python分布式定时任务_分布式定时任务框架——python定时任务框架APScheduler扩展...
如果將定時(shí)任務(wù)部署在一臺(tái)服務(wù)器上,那么這個(gè)定時(shí)任務(wù)就是整個(gè)系統(tǒng)的單點(diǎn),這臺(tái)服務(wù)器出現(xiàn)故障的話會(huì)影響服務(wù)。對(duì)于可以冗余的任務(wù)(重復(fù)運(yùn)行不影響服務(wù)),可以部署在多臺(tái)服務(wù)器上,讓他們同時(shí)執(zhí)行,這樣就可以很簡(jiǎn)單的避免單點(diǎn)。但是如果任務(wù)不允許冗余,最多只能有一臺(tái)服務(wù)器執(zhí)行任務(wù),那么前面的方法顯然行不通。本篇文章就向大家介紹如何避免這種互斥任務(wù)的單點(diǎn)問(wèn)題,最后再介紹一下基于APScheduler的分布式定時(shí)任務(wù)框架,這個(gè)框架是通過(guò)多個(gè)項(xiàng)目的實(shí)踐總結(jié)而成的。
對(duì)于運(yùn)行在同一臺(tái)服務(wù)器上的兩個(gè)進(jìn)程,可以通過(guò)加鎖實(shí)現(xiàn)互斥執(zhí)行,而對(duì)于運(yùn)行在多個(gè)服務(wù)器上的任務(wù)仍然可以通過(guò)用加鎖實(shí)現(xiàn)互斥,不過(guò)這個(gè)鎖是分布式鎖。這個(gè)分布式鎖并沒(méi)有那么神秘,實(shí)際上只要一個(gè)提供原子性的數(shù)據(jù)庫(kù)即可。比如,在數(shù)據(jù)庫(kù)的locks表里有一個(gè)記錄(lock record),包含屬性:
name:鎖的名字,互斥的任務(wù)需要用名字相同的鎖。
active_ip:持有鎖的服務(wù)器的ip。
update_time:上次持有鎖的時(shí)間,其他非活躍的服務(wù)器通過(guò)這個(gè)屬性判斷活躍的服務(wù)器是否超時(shí),如果超時(shí),則會(huì)爭(zhēng)奪鎖。
一個(gè)持有鎖的服務(wù)器通過(guò)不斷的發(fā)送心跳,來(lái)更新這個(gè)記錄,心跳的內(nèi)容就是持有鎖的時(shí)間戳(update_time),以及本機(jī)ip。也就是說(shuō),通過(guò)發(fā)送心跳來(lái)保證當(dāng)前的服務(wù)器是活躍的,而其他服務(wù)器通過(guò)lock record中的update_time來(lái)判斷當(dāng)前活躍的服務(wù)器是否超時(shí),一旦超時(shí),其他的服務(wù)器就會(huì)去爭(zhēng)奪鎖,接管任務(wù)的執(zhí)行,并發(fā)送心跳更新active_ip。
通過(guò)上面描述,這個(gè)框架中最重要的兩個(gè)概念就是分布式鎖和心跳。下面看一下分布式定時(shí)任務(wù)框架中是如何實(shí)現(xiàn)這兩點(diǎn)的。當(dāng)然,這個(gè)框架依賴(lài)于APScheduler,所以必須安裝這個(gè)模塊,具體APScheduler的介紹見(jiàn)我的另一篇文章,因?yàn)橐蕾?lài)APScheduler,所以這個(gè)框架很簡(jiǎn)單,只有一個(gè)類(lèi):
from apscheduler.scheduler import Scheduler
import datetime
import time
import socket
import struct
import fcntl
def get_ip(ifname):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
return socket.inet_ntoa(fcntl.ioctl(
s.fileno(),
0x8915, # SIOCGIFADDR
struct.pack('256s', ifname[:15])
)[20:24])
class MutexScheduler(Scheduler):
def __init__(self, gconfig={}, **options):
Scheduler.__init__(self, gconfig, **options)
self.ip = get_ip('eth0')
def mutex(self, lock = None, heartbeat = None, lock_else = None,
unactive_interval = datetime.timedelta(seconds = 30)):
def mutex_func_gen(func):
def mtx_func():
if lock:
lock_rec = lock()
now = datetime.datetime.now()
# execute mutex job when the server is active, or the other server is timeout.
if not lock_rec or lock_rec['active_ip'] == self.ip or (lock_rec['update_time'] and now - lock_rec['update_time'] >= unactive_interval):
if lock_rec:
del lock_rec['active_ip']
del lock_rec['update_time']
if not lock_rec:
lock_rec = {}
lock_attrs = func(**lock_rec)
if not lock_attrs:
lock_attrs = {}
# send heart beat
heartbeat(self.ip, now, **lock_attrs)
else:
lock_else(lock_rec)
else:
func()
return mtx_func
self.mtx_func_gen = mutex_func_gen
def inner(func):
return func
return inner
def cron_schedule(self, **options):
def inner(func):
if hasattr(self, 'mtx_func_gen'):
func = self.mtx_func_gen(func)
func.job = self.add_cron_job(func, **options)
return func
return inner
mutex方法是核心,通過(guò)裝飾器的方式提供互斥功能。在使用時(shí):
@sched.mutex(lock = my_lock, heartbeat = my_heartbeat)
@sched.cron_schedule(second = '*')
def my_job(**attrs):
print 'my_job ticks'
mutex裝飾器必須用在cron_schedule裝飾器之前,mutex主要是組裝job。mutex的參數(shù)有:
lock:函數(shù),用于獲取鎖記錄(lock record),函數(shù)原型:lock()。lock的返回值時(shí)dict,就是鎖記錄內(nèi)容。
heartbeat:函數(shù),用于發(fā)出心跳,函數(shù)原型:heartbeat(ip, now, **attrs)。ip是本機(jī)ip;now是當(dāng)前時(shí)間戳;attrs是一個(gè)dict,用于在鎖記錄中存放一些其他用戶(hù)自定義信息。
lock_else:函數(shù),在沒(méi)有獲得鎖時(shí)執(zhí)行,函數(shù)原型:lock_else(lock_rec)。lock_rec是鎖記錄,包含active_ip,update_time以及用戶(hù)自定義的屬性。
unactive_interval:datetime.timedelta類(lèi)型,超時(shí)時(shí)間,也就是說(shuō)當(dāng)前時(shí)間減去update_time大于unactive_interval的話,就代表超時(shí)。
在使用這個(gè)類(lèi)時(shí),必須實(shí)現(xiàn)自己的lock,heartbeat以及l(fā)ock_else函數(shù)。
job的原型是job(**attrs),attrs就是存放在鎖記錄中的用戶(hù)自定義屬性,job可以有dict類(lèi)型的返回值,這個(gè)返回值會(huì)存入鎖記錄中。
下面,看一下具體使用的例子,使用的mongodb存放分布式鎖。
import apscheduler.events
import datetime
import time
import pymongo
import sys
sys.path.append('../src/')
import mtxscheduler
sched = mtxscheduler.MutexScheduler()
mongo = pymongo.Connection(host = '127.0.0.1', port = 27017)
lock_store = mongo['lockstore']['locks']
def lock():
print 'lock()'
now = datetime.datetime.now() - datetime.timedelta(seconds = 3)
lck = lock_store.find_one({'name': 't'})
return lck
def hb(ip, now, **attrs):
print 'heartbeat()'
attrs['active_ip'] = ip
attrs['update_time'] = now
lock_store.update({'name': 't'}, {'$set': attrs}, upsert = True)
def le(lock_rec):
if lock_rec:
print 'active ip', lock_rec['active_ip']
else:
print 'lock else'
i = 0
@sched.mutex(lock = lock, heartbeat = hb, lock_else = le)
@sched.cron_schedule(second = '*')
def job(**attr):
global i
i += 1
print i
def err_listener(ev):
if ev.exception:
print sys.exc_info()
sched.add_listener(err_listener, apscheduler.events.EVENT_JOB_ERROR)
sched.start()
time.sleep(10)
這里用到了mongodb的python driver,可以通過(guò)命令安裝:
easy_install pymongo
easy_install的安裝件另一篇文章。
這個(gè)任務(wù)很簡(jiǎn)單就是定時(shí)打印整數(shù)序列。同時(shí)在兩臺(tái)服務(wù)器上部署運(yùn)行,可以發(fā)現(xiàn)只有一臺(tái)服務(wù)器會(huì)輸出整數(shù)序列。
使用起來(lái)還是很方便的。源代碼見(jiàn)github,其中還有使用redis存儲(chǔ)鎖,已經(jīng)在鎖記錄中存放自定義信息的例子。
總結(jié)
以上是生活随笔為你收集整理的python分布式定时任务_分布式定时任务框架——python定时任务框架APScheduler扩展...的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: python进阶到高阶大全(强烈推荐)
- 下一篇: 一个大神的Android成长之路