linux内核多队列,Linux Kernel 中 Workqueue 使用系统默认队列和创建队列的方法
關于workqueue,我們還是有很多話要說。
想必大家對workqueue相關的函數(schedule_work 、queue_work、INIT_WORK、create_singlethread_workqueue 等)都不陌生。但說起差異,可能還有許多話需要坐下來慢慢講。
對于workqueue,與之最為相關的兩個東西便是work和queue。
work是用來綁定實際執行函數使用的結構體
queue是用來鏈接work使用的隊列。
具體的結構體,可以自己到/kernel/include/linux/workqueue.h中自行查看,這里不再贅述。
我們想關注的重點在于:
1:系統中是否有default的workqueue供我們使用
2:我們能否創建自己的workqueue?如何創建?
Yes!你說對了。linux系統所提供的workqueue機制中,已經幫忙提供了一個默認的workqueue隊列“system_wq”,并提供了一套函數來方便大家直接使用。
例子來了:
static struct work_struct?work;
INIT_WORK(&work,?run);
schedule_work(&work);
static void?run(struct work_struct *work)
{
Do something here!!
}
就這么簡單的,當然,你也可以用DECLARE_WORK來完成和INIT_WORK同樣初始化work的工作。區別是DECLARE_WORK是預編譯命令,而INIT_WORK可以在code中動態初始化。
那么除了調用schedule_work直接把work放到系統default的workqueue中外,我們還有什么辦法可以初始化自己的workqueue,并且放入work呢?
我們看看函數schedule_work的定義,一切就真相大白了!
static inline bool?schedule_work(struct work_struct *work)
{
return?queue_work(system_wq, work);
}
哈哈,原來schedule_work是把傳入的work直接放入了系統的default workqueue “system_wq”中而已。
自然,我們只需要調用queue_work函數來綁定workqueue和work就ok啦!
初始化work的方法和前面一樣,只要調用DECLARE_WORK或INIT_WORK就好了。
那么我們如何去創建自己的workqueue呢?
答案是:
#define?alloc_ordered_workqueue(fmt, flags, args…)?????????????? \
alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED | (flags), 1, ##args)
#define?create_workqueue(name)????????????????????????????? \
alloc_workqueue((name), WQ_MEM_RECLAIM, 1)
#define?create_freezable_workqueue(name)??????????????????? \
alloc_workqueue((name), WQ_FREEZABLE | WQ_UNBOUND | WQ_MEM_RECLAIM, 1)
#define?create_singlethread_workqueue(name)??????????????????? \
alloc_workqueue((name), WQ_UNBOUND | WQ_MEM_RECLAIM, 1)
我們只要調用如上幾種方法的某一種,即可創建屬于自己的workqueue了。kernel中最常見到的函數是create_singlethread_workqueue。
我們只要拿這個函數舉個例子就美好了,栗子來了!
static struct workqueue_struct *time_sync_wq;
time_sync_wq?=?create_singlethread_workqueue(“timesync“); //timesync就是workqueue的名字
static?DECLARE_WORK(etr_work,?etr_work_fn);
queue_work(time_sync_wq, &etr_work);
這樣一來,我們的work和自己的workqueue就綁在一起了。
個人認為,自己新建wq最大的好處:可以避免system_wq中被掛的work過多,或者由于某個被掛上去的work處理函數質量不高導致死鎖,而導致掛在同一個queue上的我們自己work的handler因為無法被調度到而完蛋了。
當然,建太多自己的workqueue,必然會導致系統調度開銷的變大,所以需要取舍。
至于DELAYED_WORK
#define?INIT_DELAYED_WORK(_work, _func)???????????????????????? \
__INIT_DELAYED_WORK(_work, _func, 0)
queue_delayed_work
schedule_delayed_work
都和前面類似,就不再贅述了。
補充下:
咱們可以調用cancel_delayed_work來把還未執行的work給取消掉。
基本上每次?cancel_delayed_work?之后您都得調用flush_scheduled_work() 這個函數 , 特別是對于內核模塊 , 如果一個模塊使用了工作隊列機制 , 并且利用了系統的default隊列 , 那么在卸載這個模塊之前 , 您必須得調用這個函數 , 這叫做刷新一個工作隊列 , 也就是說 , 該函數會一直等待 , 直到隊列中所有對象都被執行以后才返回 ,從而避免隊列調度錯誤。
函數cancel_delayed_work_sync的出現,讓新的流程變得更加簡單了,大家參照kernel中的代碼,很容易知道應該怎么用。
最后別忘了調用destroy_workqueue等清尾的函數哦~~
問題來了:
如果我們在handler的執行過程中,同時再次調用調度函數queue_work,那么我們的handler會被執行多少次呢?(執行被調度的次數,還是就只執行一次呢?)
解答:
這個問題比較有意思,寫了這個例子來驗證答案(例子跑在android 4.4 code base中,不排除后續kernel函數被修改)
sample test code:
static struct workqueue_struct *test_wq;
static void try_to_test(struct work_struct *work){printk(“[bevis] :wq into \n”);
msleep(5*1000);? //5s
printk(“[bevis] :wq out \n”);
}
static DECLARE_WORK(mytest_work, try_to_test);
gsensor probe function end add :
test_wq =alloc_ordered_workqueue(“test_wq”, 0);//初始化一個單獨的工作隊列
int a = 0;
for(a=0 ; a<3 ; a++){
printk(“[bevis] : read func (%d) before \n”,a);
queue_work(test_wq, &mytest_work);//讓這個隊列開始被調度
printk(“[bevis] : read func (%d) msleep 2s \n”,a);
msleep(2*1000);
printk(“[bevis] : read func (%d) after \n”,a);
}
log如下:
10-16 14:10:41.940 I/KERNEL? (? 109): [??? 6.954658] [bevis] : read func (0) before
10-16 14:10:41.940 I/KERNEL? (? 109): [??? 6.954727] [bevis] : read func (0) msleep 2s
10-16 14:10:41.940 I/KERNEL? (? 109): [??? 6.954742] [bevis] :wq into
10-16 14:10:43.950 I/KERNEL? (? 109): [??? 8.960997] [bevis] : read func (0) after
10-16 14:10:43.950 I/KERNEL? (? 109): [??? 8.961085] [bevis] : read func (1) before
10-16 14:10:43.950 I/KERNEL? (? 109): [??? 8.961155] [bevis] : read func (1) msleep 2s
10-16 14:10:45.960 I/KERNEL? (? 109): [?? 10.971954] [bevis] : read func (1) after
10-16 14:10:45.960 I/KERNEL? (? 109): [?? 10.972076] [bevis] : read func (2) before
10-16 14:10:45.960 I/KERNEL? (? 109): [?? 10.972132] [bevis] : read func (2) msleep 2s
10-16 14:10:46.950 I/KERNEL? (??? 6): ?[?? 11.961884] [bevis] :wq out
10-16 14:10:46.950 I/KERNEL? (??? 6): ?[?? 11.961953] [bevis] :wq into
10-16 14:10:47.970 I/KERNEL? (? 109): [?? 12.982276] [bevis] : read func (2) after
10-16 14:10:51.960 I/KERNEL? (??? 6): ?[?? 16.973719] [bevis] :wq out
看到了吧,雖然我們使用queue_work函數調度了三次handler,但實際上wq的handler只被執行了兩次。
如果把probe函數的delay直接拿掉,你更加會發現,即使wq被調度三次,handler卻實際上只跑了一次。
結論:
如果wq被調度的時候,wq中的這個handler正在執行過程中,則這次調度會被遺棄。只有handler執行完成并返回后,下次調度才會真正的生效。
kernel這么做的原因,我猜想應該是為了防止,檔某個wq的handler在執行過程中因為資源無法獲取而暫時阻塞時,
不會因為其他進程再次調度了該wq而導致出現線程實例的不斷累加。
實際上,在絕大多數情況下,我們只需要一個handler實例來幫忙做事就夠了,例如earlysuspend的處理函數中,只要userspace進行想睡眠,那就直接調度suspend wq的handler,而不必管再關心上次的suspend過程是否有阻塞。
這樣一來,邏輯就清爽多了。
如需轉載,請注明出處。
總結
以上是生活随笔為你收集整理的linux内核多队列,Linux Kernel 中 Workqueue 使用系统默认队列和创建队列的方法的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 操作系统:Linux环境变量相关知识总结
- 下一篇: 服务器:浅谈 Nginx 性能调优,太实