以5个数据库为例,用Python实现数据的提取、转换和加载(ETL)
導讀:每個數據科學專業人員都必須從不同的數據源中提取、轉換和加載(Extract-Transform-Load,ETL)數據。
本文將討論如何使用Python為選定的流行數據庫實現數據的ETL。對于關系數據庫,選擇MySQL,并將Elasticsearch作為文檔數據庫的例子展開。對于圖形數據庫,選擇Neo4j。對于NoSQL,可參考此前文章中介紹的MongoDB。
作者:薩揚·穆霍帕迪亞(Sayan Mukhopadhyay)
如需轉載請聯系大數據(ID:hzdashuju)
ElasticSearch是一個基于Lucene的搜索服務器。它提供了一個分布式多用戶能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java開發的,并作為Apache許可條款下的開放源碼發布,是當前流行的企業級搜索引擎。
Neo4j是一個高性能的,NOSQL圖形數據庫,它將結構化數據存儲在網絡上(從數學角度叫做圖)而不是表中,是一個嵌入式的、基于磁盤的、具備完全的事務特性的Java持久化引擎。
01 MySQL
MySQLdb是在MySQL C接口上面開發的Python API。
1. 如何安裝MySQLdb
首先,需要在計算機上安裝Python MySQLdb模塊。然后運行以下腳本:
import?MySQLdb
如果出現導入錯誤,則表示模塊未正確安裝。
以下是安裝MySQL Python模塊的說明:
$tar?–xvf?MySQL-python-1.2.2.tar
$cd?MySQL-python-1.2.2
$python?setup.py?build
$python?setup.py?install
2. 數據庫連接
在連接到MySQL數據庫之前,請確保有以下內容。
有一個名為TEST的數據庫。
在TEST數據庫中有一個表STUDENT。
STUDENT表有三個字段:NAME、SUR_NAME和ROLL_NO。
用戶對TEST數據庫具有完全訪問權限。
3. INSERT操作
以下代碼執行SQL INSERT語句,以便在STUDENT表中創建記錄:
import?MySQLdb
#?Open?database?connection
db?=?MySQLdb.connect("localhost","user","passwd","TEST"?)
#?prepare?a?cursor?object?using?cursor()?method
cursor?=?db.cursor()
#?Prepare?SQL?query?to?INSERT?a?record?into?the?database.
sql?=?"""INSERT?INTO?STUDENT(NAME,
?????????SUR_NAME,?ROLL_NO)
?????????VALUES?('Sayan',?'Mukhopadhyay',?1)"""
try:
???#?Execute?the?SQL?command
cursor.execute(sql)
???#?Commit?your?changes?in?the?database
???db.commit()
except:
???#?Rollback?in?case?there?is?any?error
???db.rollback()
#?disconnect?from?server
db.close()
4. READ操作
以下代碼從STUDENT表中提取數據并打印出來:
import?MySQLdb
#?Open?database?connection
db?=?MySQLdb.connect("localhost","user","passwd","TEST"?)
#?prepare?a?cursor?object?using?cursor()?method
cursor?=?db.cursor()
#?Prepare?SQL?query?to?INSERT?a?record?into?the?database.
sql?=?"SELECT?*?FROM?STUDENT?"
try:
???#?Execute?the?SQL?command
cursor.execute(sql)
???#?Fetch?all?the?rows?in?a?list?of?lists.
results?=?cursor.fetchall()
for?row?in?results:
fname?=?row[0]
lname?=?row[1]
id?=?row[2]
??????#?Now?print?fetched?result
print?"name=%s,surname=%s,id=%d"?%?\
?????????????(fname,?lname,?id?)
except:
print?"Error:?unable?to?fecth?data"
#?disconnect?from?server
db.close()
5. DELETE操作
以下代碼從TEST中刪除id=1的一行數據:
import?MySQLdb
#?Open?database?connection
db?=?MySQLdb.connect("localhost","test","passwd","TEST")
#prepare?a?cursor?object?using?cursor()?method
cursor?=?db.cursor()
#?PrepareSQL?query?to?DELETE?required?records
sql="DELETE?FROM?STUDENT?WHERE?ROLL_NO=1"
try:
#Execute?the?SQL?command?
cursor.execute(sql)
#Commit?your?changes?in?the?database
db.commit()
except:
#Roll?back?in?case?there?is?any?error
db.rollback()
#disconnect?from?server?
db.close()
6. UPDATE操作
以下代碼將lastname為Mukhopadhyay的記錄更改為Mukherjee:
import?MySQLdb
#?Open?database?connection
db?=?MySQLdb.connect("localhost","user","passwd","TEST"?)
#?prepare?a?cursor?object?using
cursor()?method?cursor?=?db.cursor()
#?Prepare?SQL?query?to?UPDATE?required?records
sql?=?"UPDATE?STUDENT?SET?SUR_NAME="Mukherjee"
??????????????????????????WHERE?SUR_NAME="Mukhopadhyay""
try:
???#?Execute?the?SQL?command
cursor.execute(sql)
???#?Commit?your?changes?in?the?database
db.commit()
except:
???#?Rollback?in?case?there?is?any?error
db.rollback()
#?disconnect?from?server
db.close()
7. COMMIT操作
提交操作提供對數據庫完成修改命令,并且在此操作之后,無法將其還原。
8. ROLL-BACK操作
如果不能確認對數據的修改同時想要撤回操作,可以使用roll-back()方法。
以下是通過Python訪問MySQL數據的完整示例。它將提供將數據存儲為CSV文件或MySQL數據庫中的數據的完整描述。
import?sys
out?=?open('Config1.txt','w')
print?"Enter?the?Data?Source?Type:"
print?"1.?MySql"
print?"2.?Text"
print?"3.?Exit"
while(1):
???????data1?=?sys.stdin.readline().strip()
???????if(int(data1)?==?1):
?????????????out.write("source?begin"+"\n"+"type=mysql\n")
?????????????print?"Enter?the?ip:"
?????????????ip?=?sys.stdin.readline().strip()
?????????????out.write("host="?+?ip?+?"\n")
?????????????print?"Enter?the?database?name:"
?????????????db?=?sys.stdin.readline().strip()
?????????????out.write("database="?+?db?+?"\n")
?????????????print?"Enter?the?user?name:"
?????????????usr?=?sys.stdin.readline().strip()
?????????????out.write("user="?+?usr?+?"\n")
?????????????print?"Enter?the?password:"
?????????????passwd?=?sys.stdin.readline().strip()
?????????????out.write("password="?+?passwd?+?"\n")
?????????????connection?=?MySQLdb.connect(ip,?usr,?passwd,?db)
?????????????cursor?=?connection.cursor()
?????????????query?=?"show?tables"
?????????????cursor.execute(query)
?????????????data?=?cursor.fetchall()
?????????????tables?=?[]
?????????????for?row?in?data:
????????????????????for?field?in?row:
???????????????????????????tables.append(field.strip())
?????????????for?i?in?range(len(tables)):
????????????????????print?i,?tables[i]
?????????????tb?=?tables[int(sys.stdin.readline().strip())]
?????????????out.write("table="?+?tb?+?"\n")
?????????????query?=?"describe?"?+?tb
?????????????cursor.execute(query)
?????????????data?=?cursor.fetchall()
?????????????columns?=?[]
?????????????for?row?in?data:
????????????????????columns.append(row[0].strip())
?????????????for?i?in?range(len(columns)):
????????????????????print?columns[i]?
?????????????print?"Not?index?choose?the?exact?column?names?seperated?by?coma"
?????????????cols?=?sys.stdin.readline().strip()
?????????????out.write("columns="?+?cols?+?"\n")
?????????????cursor.close()
?????????????connection.close()
?????????????out.write("source?end"+"\n")
?????????????print?"Enter?the?Data?Source?Type:"
?????????????print?"1.?MySql"
?????????????print?"2.?Text"
?????????????print?"3.?Exit"
???????if(int(data1)?==?2):
?????????????print?"path?of?text?file:"
?????????????path?=?sys.stdin.readline().strip()
?????????????file?=?open(path)
?????????????count?=?0
?????????????for?line?in?file:
????????????????????print?line
????????????????????count?=?count?+?1
????????????????????if?count?>?3:
??????????????????????????break
?????????????file.close()
?????????????out.write("source?begin"+"\n"+"type=text\n")
?????????????out.write("path="?+?path?+?"\n")
?????????????print?"enter?delimeter:"
?????????????dlm?=?sys.stdin.readline().strip()
?????????????out.write("dlm="?+?dlm?+?"\n")
?????????????print?"enter?column?indexes?seperated?by?comma:"
?????????????cols?=?sys.stdin.readline().strip()
?????????????out.write("columns="?+?cols?+?"\n")
?????????????out.write("source?end"+"\n")
?????????????print?"Enter?the?Data?Source?Type:"
?????????????print?"1.?MySql"
?????????????print?"2.?Text"
?????????????print?"3.?Exit"
???????if(int(data1)?==?3):
?????????????out.close()
?????????????sys.exit()
02 Elasticsearch
Elasticsearch(ES)低級客戶端提供從Python到ES REST端點的直接映射。Elasticsearch的一大優勢是為數據分析提供了全棧解決方案。Elasticsearch作為數據庫,有可配置前端Kibana、數據收集工具Logstash以及企業安全工具Shield。
下例具有稱為cat、cluster、indices、ingest、nodes、snapshot和tasks的特征,根據任務分別轉換為CatClient、ClusterClient、IndicesClient、IngestClient、NodesClient、SnapshotClient和TasksClient實例。這些實例是訪問這些類及其方法的唯一方式。
你可以指定自己的連接類,可以通過提供的connection_class參數來使用。
Es1=Elasticsearch(connection_class=ThriftConnection)
如果你想打開sniffing,那么有幾個選擇:
#?the?list?of?active?nodes.?Start?with?nodes?running?on?'esnode1'?and
#?'esnode2'
Es1=Elasticsearch(
????['esnode1',?'esnode2'],
#?sniff?before?doing?anything
sniff_on_start=True,
#?refresh?nodes?after?a?node?fails?to?respond
sniff_on_connection_fail=True,
#?and?also?every?30?seconds
sniffer_timeout=30
)
不同的主機可以有不同的參數,你可以為每個節點使用一個字典來指定它們。
#?and?an?url_prefix.?Note?that?``port``?needs?to?be?an?int.
Es1=Elasticsearch([
{'host':'localhost'},
{'host':'othernode','port':443,'url_prefix':'es','use_ssl':True},
])
還支持SSL客戶端身份驗證(有關選項的詳細說明,請參閱Urllib3HttpConnection)。
['localhost:443','other_host:443'],
#?turn?on?SSL
use_ssl=True,
#?make?sure?we?verify?SSL?certificates?(off?by?default)
verify_certs=True,
#?provide?a?path?to?CA?certs?on?disk
ca_certs='path?to?CA_certs',
#?PEM?formatted?SSL?client?certificate
client_cert='path?to?clientcert.pem',
#?PEM?formatted?SSL?client?key
client_key='path?to?clientkey.pem'
)
連接層API
許多類負責處理Elasticsearch集群。這里可以通過將參數傳遞給Elasticsearch類來忽略正在使用的默認子類。屬于客戶端的每個參數都將添加到Transport、ConnectionPool和Connection上。
例如,如果你要使用定制的ConnectionSelector類,只需傳入selector_class參數即可。
整個API以很高的精確度包裝了原始REST API,其中包括區分調用必需參數和可選參數。這意味著代碼區分了按排位的參數和關鍵字參數。建議讀者使用關鍵字參數來保證所有調用的一致性和安全性。
如果Elasticsearch返回2XX,則API調用成功(并將返回響應)。否則,將引發TransportError(或更具體的子類)的實例。你可以在異常中查看其他異常和錯誤狀態。如果你不希望引發異常,可以通過傳入ignore參數忽略狀態代碼或狀態代碼列表。
es=Elasticsearch()
#?ignore?400?cause?by?IndexAlreadyExistsException?when?creating?an?index
es.indices.create(index='test-index',ignore=400)
#?ignore?404?and?400
es.indices.delete(index='test-index',ignore=[400,404])
03 Neo4j Python驅動
Neo4j支持Neo4j Python驅動,并通過二進制協議與數據庫連接。它試圖保持簡約及Python的慣用方式。
from?neo4j.v1?import?GraphDatabase,?basic_auth
driver11?=?GraphDatabase.driver("bolt://localhost",?auth=basic_auth("neo4j",?"neo4j"))
session11?=?driver11.session()
session11.run("CREATE?(a:Person?{name:'Sayan',title:'Mukhopadhyay'})")
result11=?session11.run("MATCH?(a:Person)?WHERE?a.name?='Sayan'?RETURN?a.name?AS?name,?a.title?AS?title")
for?recordi?n?result11:
print("%s?%s"%?(record["title"],?record["name"]))
session11.close()
04 neo4j-rest-client
neo4j-rest-client的主要目標是確保已經使用本地Neo4j的Python程序員通過python-embedded的方式也能夠訪問Neo4j REST服務器。因此,neo4j-rest-client API的結構與python-embedded完全同步。但是引入了一種新的結構,以達到更加Python化的風格,并通過Neo4j團隊引入的新特性來增強API。
05 內存數據庫
另一個重要的數據庫類是內存數據庫。它在RAM中存儲和處理數據。因此,對數據庫的操作非常快,并且數據是靈活的。SQLite是內存數據庫的一個流行范例。在Python中,需要使用sqlalchemy庫來操作SQLite。在第1章的Flask和Falcon示例中,展示了如何從SQLite中選擇數據。以下將展示如何在SQLite中存儲Pandas數據框架:
import?sqlite3
conn?=?sqlite3.connect('multiplier.db')
conn.execute('''CREATE?TABLE?if?not?exists?multiplier
???????(domain????????CHAR(50),
????????low????????REAL,
????????high????????REAL);''')
conn.close()
db_name?=?"sqlite:///"?+?prop?+?"_"?+?domain?+?str(i)?+?".db"
disk_engine?=?create_engine(db_name)
df.to_sql('scores',?disk_engine,?if_exists='replace')
06 Python版本MongoDB
這部分內容請見此前的文章數據處理入門干貨:MongoDB和pandas極簡教程。
關于作者:Sayan Mukhopadhyay擁有超過13年的行業經驗,并與瑞信、PayPal、CA Technologies、CSC和Mphasis等公司建立了聯系。他對投資銀行、在線支付、在線廣告、IT架構和零售等領域的數據分析應用有著深刻的理解。他的專業領域是在分布式和數據驅動的環境(如實時分析、高頻交易等)中,實現高性能計算。
本文摘編自《Python高級數據分析:機器學習、深度學習和NLP實例》,經出版方授權發布。
延伸閱讀《Python高級數據分析》
點擊上圖了解及購買
轉載請聯系微信:DoctorData
推薦語:本書介紹高級數據分析概念的廣泛基礎,以及最近的數據庫革命,如Neo4j、彈性搜索和MongoDB。本書討論了如何實現包括局部爬取在內的ETL技術,并應用于高頻算法交易和目標導向的對話系統等領域。還有一些機器學習概念的例子,如半監督學習、深度學習和NLP。本書還涵蓋了重要的傳統數據分析技術,如時間序列和主成分分析等。
你想免費讀這本《Python高級數據分析:機器學習、深度學習和NLP實例》嗎?
「大數據」內容合伙人之「鑒書小分隊」上線啦!
最近,你都在讀什么書?有哪些心得體會想要跟大家分享?
數據叔最近搞了個大事——聯合優質圖書出版商機械工業出版社華章公司發起鑒書活動。
簡單說就是:你可以免費讀新書,你可以免費讀新書的同時,順手碼一篇讀書筆記就行。詳情請在大數據公眾號后臺對話框回復合伙人查看。
京東圖書大促開始啦!
每滿100-50!滿200-100!滿300-150!
長按二維碼進入唯一領券入口,領取大數據粉絲專屬“200減35”疊加滿減券。
是的!你沒看錯!相當于165塊買400塊的書!!!(打了多少折你敲敲旁邊的計算器……)
長按下方二維碼或點擊閱讀原文
發現更多好書
據統計,99%的大咖都完成了這個神操作
▼
更多精彩
在公眾號后臺對話框輸入以下關鍵詞
查看更多優質內容!
PPT?|?報告?|?讀書?|?書單?|?干貨?
大數據?|?揭秘?|?Python?|?可視化
人工智能?|?機器學習?|?深度學習?|?神經網絡
AI?|?1024?|?段子?|?區塊鏈?|?數學
猜你想看
如果你不想長期996,看看這個
數據采集技術揭秘:手把手教你全埋點技術解決方案
那些“反人類”的用戶體驗,都錯在哪了?
一文看懂數據挖掘:哪一種方法最好?都需要哪些技術?
Q: 你常用哪些數據庫?
歡迎留言并贏取福利
覺得不錯,請把這篇文章分享給你的朋友
轉載 / 投稿請聯系:baiyu@hzbook.com
更多精彩,請在后臺點擊“歷史文章”查看
點擊閱讀原文,了解更多
總結
以上是生活随笔為你收集整理的以5个数据库为例,用Python实现数据的提取、转换和加载(ETL)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 什么是架构?网络架构中都有什么?终于有人
- 下一篇: OpenCV入门及应用案例:手把手教你做