异步IO\数据库\队列\缓存
Select\Poll\Epoll異步IO
select 多并發socket 例子
1 import select 2 import socket 3 import sys 4 import queue 5 6 7 server = socket.socket() 8 server.setblocking(0) 9 10 server_addr = ('localhost',10000) 11 12 print('starting up on %s port %s' % server_addr) 13 server.bind(server_addr) 14 15 server.listen(5) 16 17 18 inputs = [server, ] #自己也要監測呀,因為server本身也是個fd 19 outputs = [] 20 21 message_queues = {} 22 23 while True: 24 print("waiting for next event...") 25 26 readable, writeable, exeptional = select.select(inputs,outputs,inputs) #如果沒有任何fd就緒,那程序就會一直阻塞在這里 27 28 for s in readable: #每個s就是一個socket 29 30 if s is server: #別忘記,上面我們server自己也當做一個fd放在了inputs列表里,傳給了select,如果這個s是server,代表server這個fd就緒了, 31 #就是有活動了, 什么情況下它才有活動? 當然 是有新連接進來的時候 呀 32 #新連接進來了,接受這個連接 33 conn, client_addr = s.accept() 34 print("new connection from",client_addr) 35 conn.setblocking(0) 36 inputs.append(conn) #為了不阻塞整個程序,我們不會立刻在這里開始接收客戶端發來的數據, 把它放到inputs里, 下一次loop時,這個新連接 37 #就會被交給select去監聽,如果這個連接的客戶端發來了數據 ,那這個連接的fd在server端就會變成就續的,select就會把這個連接返回,返回到 38 #readable 列表里,然后你就可以loop readable列表,取出這個連接,開始接收數據了, 下面就是這么干 的 39 40 message_queues[conn] = queue.Queue() #接收到客戶端的數據后,不立刻返回 ,暫存在隊列里,以后發送 41 42 else: #s不是server的話,那就只能是一個 與客戶端建立的連接的fd了 43 #客戶端的數據過來了,在這接收 44 data = s.recv(1024) 45 if data: 46 print("收到來自[%s]的數據:" % s.getpeername()[0], data) 47 message_queues[s].put(data) #收到的數據先放到queue里,一會返回給客戶端 48 if s not in outputs: 49 outputs.append(s) #為了不影響處理與其它客戶端的連接 , 這里不立刻返回數據給客戶端 50 51 52 else:#如果收不到data代表什么呢? 代表客戶端斷開了呀 53 print("客戶端斷開了",s) 54 55 if s in outputs: 56 outputs.remove(s) #清理已斷開的連接 57 58 inputs.remove(s) #清理已斷開的連接 59 60 del message_queues[s] ##清理已斷開的連接 61 62 63 for s in writeable: 64 try : 65 next_msg = message_queues[s].get_nowait() 66 67 except queue.Empty: 68 print("client [%s]" %s.getpeername()[0], "queue is empty..") 69 outputs.remove(s) 70 71 else: 72 print("sending msg to [%s]"%s.getpeername()[0], next_msg) 73 s.send(next_msg.upper()) 74 75 76 for s in exeptional: 77 print("handling exception for ",s.getpeername()) 78 inputs.remove(s) 79 if s in outputs: 80 outputs.remove(s) 81 s.close() 82 83 del message_queues[s] server 1 import socket 2 import sys 3 4 messages = [ b'This is the message. ', 5 b'It will be sent ', 6 b'in parts.', 7 ] 8 server_address = ('localhost', 10000) 9 10 # Create a TCP/IP socket 11 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM), 12 socket.socket(socket.AF_INET, socket.SOCK_STREAM), 13 ] 14 15 # Connect the socket to the port where the server is listening 16 print('connecting to %s port %s' % server_address) 17 for s in socks: 18 s.connect(server_address) 19 20 for message in messages: 21 22 # Send messages on both sockets 23 for s in socks: 24 print('%s: sending "%s"' % (s.getsockname(), message) ) 25 s.send(message) 26 27 # Read responses on both sockets 28 for s in socks: 29 data = s.recv(1024) 30 print( '%s: received "%s"' % (s.getsockname(), data) ) 31 if not data: 32 print(sys.stderr, 'closing socket', s.getsockname() ) clientselectors模塊
1 import selectors 2 import socket 3 4 sel = selectors.DefaultSelector() 5 6 def accept(sock, mask): 7 conn, addr = sock.accept() # Should be ready 8 print('accepted', conn, 'from', addr) 9 conn.setblocking(False) 10 sel.register(conn, selectors.EVENT_READ, read) 11 12 def read(conn, mask): 13 data = conn.recv(1000) # Should be ready 14 if data: 15 print('echoing', repr(data), 'to', conn) 16 conn.send(data) # Hope it won't block 17 else: 18 print('closing', conn) 19 sel.unregister(conn) 20 conn.close() 21 22 sock = socket.socket() 23 sock.bind(('localhost', 10000)) 24 sock.listen(100) 25 sock.setblocking(False) 26 sel.register(sock, selectors.EVENT_READ, accept) 27 28 while True: 29 events = sel.select() 30 for key, mask in events: 31 callback = key.data 32 callback(key.fileobj, mask) selectors堡壘機前戲
開發堡壘機之前,先來學習Python的paramiko模塊,該模塊機遇SSH用于連接遠程服務器并執行相關操作
SSHClient
用于連接遠程服務器并執行基本命令
基于用戶名密碼連接:
1 import paramiko 2 3 # 創建SSH對象 4 ssh = paramiko.SSHClient() 5 # 允許連接不在know_hosts文件中的主機 6 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 7 # 連接服務器 8 ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', password='123') 9 10 # 執行命令 11 stdin, stdout, stderr = ssh.exec_command('df') 12 # 獲取命令結果 13 result = stdout.read() 14 15 # 關閉連接 16 ssh.close() 1 import paramiko 2 3 transport = paramiko.Transport(('hostname', 22)) 4 transport.connect(username='wupeiqi', password='123') 5 6 ssh = paramiko.SSHClient() 7 ssh._transport = transport 8 9 stdin, stdout, stderr = ssh.exec_command('df') 10 print stdout.read() 11 12 transport.close()基于公鑰密鑰連接:
1 import paramiko 2 3 private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa') 4 5 # 創建SSH對象 6 ssh = paramiko.SSHClient() 7 # 允許連接不在know_hosts文件中的主機 8 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 9 # 連接服務器 10 ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', key=private_key) 11 12 # 執行命令 13 stdin, stdout, stderr = ssh.exec_command('df') 14 # 獲取命令結果 15 result = stdout.read() 16 17 # 關閉連接 18 ssh.close() 1 import paramiko 2 3 private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa') 4 5 transport = paramiko.Transport(('hostname', 22)) 6 transport.connect(username='wupeiqi', pkey=private_key) 7 8 ssh = paramiko.SSHClient() 9 ssh._transport = transport 10 11 stdin, stdout, stderr = ssh.exec_command('df') 12 13 transport.close() SSHClient 封裝 Transport 1 import paramiko 2 from io import StringIO 3 4 key_str = """-----BEGIN RSA PRIVATE KEY----- 5 MIIEpQIBAAKCAQEAq7gLsqYArAFco02/55IgNg0r7NXOtEM3qXpb/dabJ5Uyky/8 6 NEHhFiQ7deHIRIuTW5Zb0kD6h6EBbVlUMBmwJrC2oSzySLU1w+ZNfH0PE6W6fans 7 H80whhuc/YgP+fjiO+VR/gFcqib8Rll5UfYzf5H8uuOnDeIXGCVgyHQSmt8if1+e 8 7hn1MVO1Lrm9Fco8ABI7dyv8/ZEwoSfh2C9rGYgA58LT1FkBRkOePbHD43xNfAYC 9 tfLvz6LErMnwdOW4sNMEWWAWv1fsTB35PAm5CazfKzmam9n5IQXhmUNcNvmaZtvP 10 c4f4g59mdsaWNtNaY96UjOfx83Om86gmdkKcnwIDAQABAoIBAQCnDBGFJuv8aA7A 11 ZkBLe+GN815JtOyye7lIS1n2I7En3oImoUWNaJEYwwJ8+LmjxMwDCtAkR0XwbvY+ 12 c+nsKPEtkjb3sAu6I148RmwWsGncSRqUaJrljOypaW9dS+GO4Ujjz3/lw1lrxSUh 13 IqVc0E7kyRW8kP3QCaNBwArYteHreZFFp6XmtKMtXaEA3saJYILxaaXlYkoRi4k8 14 S2/K8aw3ZMR4tDCOfB4o47JaeiA/e185RK3A+mLn9xTDhTdZqTQpv17/YRPcgmwz 15 zu30fhVXQT/SuI0sO+bzCO4YGoEwoBX718AWhdLJFoFq1B7k2ZEzXTAtjEXQEWm6 16 01ndU/jhAasdfasdasdfasdfa3eraszxqwefasdfadasdffsFIfAsjQb4HdkmHuC 17 OeJrJOd+CYvdEeqJJNnF6AbHyYHIECkj0Qq1kEfLOEsqzd5nDbtkKBte6M1trbjl 18 HtJ2Yb8w6o/q/6Sbj7wf/cW3LIYEdeVCjScozVcQ9R83ea05J+QOAr4nAoGBAMaq 19 UzLJfLNWZ5Qosmir2oHStFlZpxspax/ln7DlWLW4wPB4YJalSVovF2Buo8hr8X65 20 lnPiE41M+G0Z7icEXiFyDBFDCtzx0x/RmaBokLathrFtI81UCx4gQPLaSVNMlvQA 21 539GsubSrO4LpHRNGg/weZ6EqQOXvHvkUkm2bDDJAoGATytFNxen6GtC0ZT3SRQM 22 WYfasdf3xbtuykmnluiofasd2sfmjnljkt7khghmghdasSDFGQfgaFoKfaawoYeH 23 C2XasVUsVviBn8kPSLSVBPX4JUfQmA6h8HsajeVahxN1U9e0nYJ0sYDQFUMTS2t8 24 RT57+WK/0ONwTWHdu+KnaJECgYEAid/ta8LQC3p82iNAZkpWlGDSD2yb/8rH8NQg 25 9tjEryFwrbMtfX9qn+8srx06B796U3OjifstjJQNmVI0qNlsJpQK8fPwVxRxbJS/ 26 pMbNICrf3sUa4sZgDOFfkeuSlgACh4cVIozDXlR59Z8Y3CoiW0uObEgvMDIfenAj 27 98pl3ZkCgYEAj/UCSni0dwX4pnKNPm6LUgiS7QvIgM3H9piyt8aipQuzBi5LUKWw 28 DlQC4Zb73nHgdREtQYYXTu7p27Bl0Gizz1sW2eSgxFU8eTh+ucfVwOXKAXKU5SeI 29 +MbuBfUYQ4if2N/BXn47+/ecf3A4KgB37Le5SbLDddwCNxGlBzbpBa0= 30 -----END RSA PRIVATE KEY-----""" 31 32 private_key = paramiko.RSAKey(file_obj=StringIO(key_str)) 33 transport = paramiko.Transport(('10.0.1.40', 22)) 34 transport.connect(username='wupeiqi', pkey=private_key) 35 36 ssh = paramiko.SSHClient() 37 ssh._transport = transport 38 39 stdin, stdout, stderr = ssh.exec_command('df') 40 result = stdout.read() 41 42 transport.close() 43 44 print(result) 基于私鑰字符串進行連接SFTPClient
用于連接遠程服務器并執行上傳下載
基于用戶名密碼上傳下載
1 import paramiko 2 3 transport = paramiko.Transport(('hostname',22)) 4 transport.connect(username='wupeiqi',password='123') 5 6 sftp = paramiko.SFTPClient.from_transport(transport) 7 # 將location.py 上傳至服務器 /tmp/test.py 8 sftp.put('/tmp/location.py', '/tmp/test.py') 9 # 將remove_path 下載到本地 local_path 10 sftp.get('remove_path', 'local_path') 11 12 transport.close()基于公鑰密鑰上傳下載
1 import paramiko 2 3 private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa') 4 5 transport = paramiko.Transport(('hostname', 22)) 6 transport.connect(username='wupeiqi', pkey=private_key ) 7 8 sftp = paramiko.SFTPClient.from_transport(transport) 9 # 將location.py 上傳至服務器 /tmp/test.py 10 sftp.put('/tmp/location.py', '/tmp/test.py') 11 # 將remove_path 下載到本地 local_path 12 sftp.get('remove_path', 'local_path') 13 14 transport.close()RabbitMQ隊列
安裝 rabbitMA
http://www.cnblogs.com/ericli-ericli/p/5902270.html
http://blog.csdn.net/zyz511919766/article/details/41946521
Work Queues
1 import pika 2 connection = pika.BlockingConnection(pika.ConnectionParameters( 3 host='localhost')) 4 channel = connection.channel() 5 6 channel.queue_declare(queue='hello') 7 8 channel.basic_publish(exchange='', 9 routing_key='hello', 10 body='Hello World!') 11 print(" [x] Sent 'Hello World!'") 12 connection.close() 生產者 1 import pika 2 connection = pika.BlockingConnection(pika.ConnectionParameters( 3 host='localhost')) 4 channel = connection.channel() 5 6 channel.queue_declare(queue='hello') 7 8 def callback(ch, method, properties, body): 9 print(" [x] Received %r" % body) 10 11 channel.basic_consume(callback, 12 queue='hello', 13 no_ack=True) 14 15 print(' [*] Waiting for messages. To exit press CTRL+C') 16 channel.start_consuming() 消費者1、acknowledgment 消息不丟失
no-ack = False,如果消費者遇到情況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那么,RabbitMQ會重新將該任務添加到隊列中。
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 host='10.211.55.4')) 5 channel = connection.channel() 6 7 channel.queue_declare(queue='hello') 8 9 def callback(ch, method, properties, body): 10 print(" [x] Received %r" % body) 11 import time 12 time.sleep(10) 13 print 'ok' 14 ch.basic_ack(delivery_tag = method.delivery_tag) 15 16 channel.basic_consume(callback, 17 queue='hello', 18 no_ack=False) 19 20 print(' [*] Waiting for messages. To exit press CTRL+C') 21 channel.start_consuming() 消費者2、durable ? 消息不丟失
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) channel = connection.channel()# make message persistent channel.queue_declare(queue='hello', durable=True)channel.basic_publish(exchange='',routing_key='hello',body='Hello World!',properties=pika.BasicProperties(delivery_mode=2, # make message persistent )) print(" [x] Sent 'Hello World!'") connection.close() 生產者 1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) 4 channel = connection.channel() 5 6 # make message persistent 7 channel.queue_declare(queue='hello', durable=True) 8 9 10 def callback(ch, method, properties, body): 11 print(" [x] Received %r" % body) 12 import time 13 time.sleep(10) 14 print 'ok' 15 ch.basic_ack(delivery_tag = method.delivery_tag) 16 17 channel.basic_consume(callback, 18 queue='hello', 19 no_ack=False) 20 21 print(' [*] Waiting for messages. To exit press CTRL+C') 22 channel.start_consuming() 消費者3、消息獲取順序
默認消息隊列里的數據是按照順序被消費者拿走,但是在消費者端,配置prefetch_count=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) channel = connection.channel()# make message persistent channel.queue_declare(queue='hello')def callback(ch, method, properties, body):print(" [x] Received %r" % body)import timetime.sleep(10)print 'ok'ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_qos(prefetch_count=1)channel.basic_consume(callback,queue='hello',no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() 消費者發布訂閱
發布訂閱和簡單的消息隊列區別在于,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。
?exchange type = fanout
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host='localhost')) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange='logs', 9 type='fanout') 10 11 message = ' '.join(sys.argv[1:]) or "info: Hello World!" 12 channel.basic_publish(exchange='logs', 13 routing_key='', 14 body=message) 15 print(" [x] Sent %r" % message) 16 connection.close() 發布者 import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.exchange_declare(exchange='logs',type='fanout')result = channel.queue_declare(exclusive=True) queue_name = result.method.queuechannel.queue_bind(exchange='logs',queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r" % body)channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming() 訂閱者有選擇的接收消息
?
?exchange type = direct
之前事例,發送消息時明確指定某個隊列并向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host='localhost')) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange='direct_logs', 9 type='direct') 10 11 result = channel.queue_declare(exclusive=True) 12 queue_name = result.method.queue 13 14 severities = sys.argv[1:] 15 if not severities: 16 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) 17 sys.exit(1) 18 19 for severity in severities: 20 channel.queue_bind(exchange='direct_logs', 21 queue=queue_name, 22 routing_key=severity) 23 24 print(' [*] Waiting for logs. To exit press CTRL+C') 25 26 def callback(ch, method, properties, body): 27 print(" [x] %r:%r" % (method.routing_key, body)) 28 29 channel.basic_consume(callback, 30 queue=queue_name, 31 no_ack=True) 32 33 channel.start_consuming() 消費者 1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host='localhost')) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange='direct_logs', 9 type='direct') 10 11 severity = sys.argv[1] if len(sys.argv) > 1 else 'info' 12 message = ' '.join(sys.argv[2:]) or 'Hello World!' 13 channel.basic_publish(exchange='direct_logs', 14 routing_key=severity, 15 body=message) 16 print(" [x] Sent %r:%r" % (severity, message)) 17 connection.close() 生產者模糊匹配
?
exchange type = topic
在topic類型下,可以讓隊列綁定幾個模糊的關鍵字,之后發送者將數據發送到exchange,exchange將傳入”路由值“和 ”關鍵字“進行匹配,匹配成功,則將數據發送到指定隊列。
- # 表示可以匹配 0 個 或 多個 單詞
- * ?表示只能匹配 一個 單詞
- 1 發送者路由值 隊列中
2 old.boy.python old.* -- 不匹配
3 old.boy.python old.# -- 匹配 1 import pika
2 import sys
3
4 connection = pika.BlockingConnection(pika.ConnectionParameters(
5 host='localhost'))
6 channel = connection.channel()
7
8 channel.exchange_declare(exchange='topic_logs',
9 type='topic')
10
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13
14 binding_keys = sys.argv[1:]
15 if not binding_keys:
16 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
17 sys.exit(1)
18
19 for binding_key in binding_keys:
20 channel.queue_bind(exchange='topic_logs',
21 queue=queue_name,
22 routing_key=binding_key)
23
24 print(' [*] Waiting for logs. To exit press CTRL+C')
25
26 def callback(ch, method, properties, body):
27 print(" [x] %r:%r" % (method.routing_key, body))
28
29 channel.basic_consume(callback,
30 queue=queue_name,
31 no_ack=True)
32
33 channel.start_consuming() 消費者 import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='topic_logs',type='topic')routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',routing_key=routing_key,body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close() 生產者
memcached?
http://www.cnblogs.com/wupeiqi/articles/5132791.html?
?
redis 使用
http://www.cnblogs.com/alex3714/articles/6217453.html
?
轉載于:https://www.cnblogs.com/nikitapp/p/6722783.html
總結
以上是生活随笔為你收集整理的异步IO\数据库\队列\缓存的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Codeforces第一次rated比赛
- 下一篇: 每日求一录~20170704