python服务端编程_Python WebSocket服务端编程代码完成gtalk机器人
本文的python源碼是使用Python WebSocket服務端編程實現gtalk機器人的全部代碼段。需要用到python的sys、sleekxmpp、twisted(reactor)、autobahn等模塊及方法,在代碼的導入部分可以查看到詳細內容。
關於python websocket庫的運作機制及客戶端和服務端的 API 實現,以及Python WebSocket的具體操作方法和工作原理,也可以查看相關文章來學習。
import sys
from twisted.internet import reactor
from twisted.python import log
from autobahn.websocket import WebSocketServerFactory, WebSocketServerProtocol
import sleekxmpp
# Python 2-only workaround: reload(sys) makes sys.setdefaultencoding available
# again so the process-wide default codec for implicit str/unicode conversions
# can be switched to UTF-8 (the str(...) calls below rely on it for non-ASCII
# text). Neither reload() as a builtin nor setdefaultencoding exists on Python 3.
reload(sys)
sys.setdefaultencoding('utf8')
class gtalkBot(sleekxmpp.ClientXMPP):
    """
    SleekXMPP client that relays chat between one XMPP contact and the
    WebSocket clients of this server.

    Messages from ``self.recipient`` are broadcast to every connected
    WebSocket client; messages from anyone else (and the ``-qsbk`` /
    ``-help`` commands) get a canned reply. The latest presence status text
    of ``self.recipient`` is kept in ``self.status_text``.
    """

    def __init__(self, jid, password):
        """Log-in credentials are passed straight to ``ClientXMPP``."""
        sleekxmpp.ClientXMPP.__init__(self, jid, password)
        # The single contact whose messages are forwarded to WebSocket
        # clients and who receives everything WebSocket clients send.
        self.recipient = 'wxg4net@gmail.com'
        # Last presence status text seen from ``self.recipient``.
        self.status_text = ''
        self.add_event_handler("session_start", self.start)
        self.add_event_handler("message", self.muc_message)
        self.add_event_handler('presence_available',
                               self.handle_presence_available)
        self.add_event_handler('presence_unavailable',
                               self.handle_presence_unavailable)
        self.add_event_handler('presence_dnd',
                               self.handle_presence_dnd)

    def start(self, event):
        """Announce presence and fetch the roster once the session starts."""
        self.send_presence()
        self.get_roster()

    def _record_recipient_status(self, presence):
        """Store the status text if *presence* comes from the recipient."""
        if presence['from'].bare == self.recipient:
            self.status_text = presence['status']

    def handle_presence_unavailable(self, presence):
        """Track the recipient's status text when they go unavailable."""
        self._record_recipient_status(presence)

    def handle_presence_available(self, presence):
        """Track the recipient's status text when they become available."""
        self._record_recipient_status(presence)

    def handle_presence_dnd(self, presence):
        """Track the recipient's status text when they switch to DND."""
        self._record_recipient_status(presence)

    def muc_message(self, msg):
        """
        Handle an incoming chat message.

        ``-qsbk`` and ``-help`` are answered with fixed replies regardless
        of sender; any other message from ``self.recipient`` is broadcast to
        the WebSocket clients; everything else gets the "in development"
        reply.
        """
        fwho = msg['from'].bare
        body = msg['body']
        if body == '-qsbk':
            reply = '當前服務已關閉'
        elif body == '-help' or fwho != self.recipient:
            reply = '當前服務開發中...'
        else:
            # ``factory`` is the module-level BroadcastServerFactory created
            # by the script entry point. XMPP handlers may run outside the
            # Twisted reactor thread (the script starts processing with
            # threaded=True), and Twisted transports are not thread-safe, so
            # hand the broadcast over to the reactor thread.
            reactor.callFromThread(factory.broadcast, str(body))
            return
        self.send_message(mto=fwho,
                          mbody=reply,
                          mtype='chat')

    def sendMessageTome(self, message):
        """Send *message* as a chat message to ``self.recipient``."""
        self.send_message(mto=self.recipient,
                          mbody=message,
                          mtype='chat')
#python websocket
class BroadcastServerProtocol(WebSocketServerProtocol):
    """
    Per-connection WebSocket protocol.

    Every message received from the browser is forwarded to the XMPP bot
    (module-level ``xmpp``); the first message on a connection is also
    answered with a one-time greeting containing the bot's current
    ``status_text``.
    """

    def __init__(self):
        # True until the one-time greeting has been sent on this connection.
        self.istip = True
        # Handle of the pending heartbeat timer, so it can be cancelled.
        self._heartbeat_call = None
        self.Heartbeat()

    def Heartbeat(self):
        """Re-arm a 1-second reactor timer that calls this method again."""
        self._heartbeat_call = reactor.callLater(1, self.Heartbeat)

    def onOpen(self):
        """Register this connection with the factory for broadcasts."""
        self.factory.register(self)

    def onMessage(self, msg, binary):
        """Forward *msg* to the XMPP recipient; greet on the first message."""
        xmpp.sendMessageTome(msg)
        if self.istip:
            self.sendMessage('你好,websocket python的當前狀態是:'+str(xmpp.status_text)+'。---祝你生活愉快!')
            self.istip = False

    def connectionLost(self, reason):
        """Stop the heartbeat and unregister from the factory."""
        # Without this the timer would keep rescheduling itself forever and
        # keep this protocol instance alive after the socket is gone.
        pending, self._heartbeat_call = self._heartbeat_call, None
        if pending is not None and pending.active():
            pending.cancel()
        WebSocketServerProtocol.connectionLost(self, reason)
        self.factory.unregister(self)
class BroadcastServerFactory(WebSocketServerFactory):
    """WebSocket factory that tracks open connections for broadcasting."""

    protocol = BroadcastServerProtocol

    def __init__(self):
        WebSocketServerFactory.__init__(self)
        # Currently registered protocol instances, in registration order.
        self.clients = []

    def register(self, client):
        """Add *client* to the broadcast list unless it is already there."""
        if client in self.clients:
            return
        self.clients.append(client)

    def unregister(self, client):
        """Remove *client* from the broadcast list if it is present."""
        if client not in self.clients:
            return
        self.clients.remove(client)

    def broadcast(self, message):
        """Send *message* to every registered client."""
        for peer in self.clients:
            peer.sendMessage(message)
#www.iplaypy.com
if __name__ == '__main__':
    # Script entry point: start the XMPP bot and serve WebSockets on
    # TCP port 9090. ``xmpp`` and ``factory`` are deliberately module-level
    # globals because the protocol and bot classes look them up by name.
    log.startLogging(sys.stdout)

    # Create the factory before XMPP processing starts, so a chat message
    # arriving early cannot hit an undefined ``factory`` in the bot.
    factory = BroadcastServerFactory()

    xmpp = gtalkBot("phzggzs@gmail.com", "*******")
    xmpp.register_plugin('xep_0030')  # Service Discovery
    xmpp.register_plugin('xep_0199')  # XMPP Ping
    if xmpp.connect():
        xmpp.process(threaded=True)
    else:
        # Keep serving WebSockets as before, but make the failure visible.
        log.msg("XMPP connection failed; running without the gtalk bridge")

    reactor.listenTCP(9090, factory)
    reactor.run()
玩蛇網文章,轉載請注明出處和文章網址:https://www.iplaypy.com/code/c2700.html
相關文章 Recommend
總結
以上是生活随笔為你收集整理的python服务端编程_Python WebSocket服务端编程代码完成gtalk机器人的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何使用Docker搭建Heimdall
- 下一篇: Jmeter教程(18)--GT性能监控