php协程实现mysql异步_swoole与php协程实现异步非阻塞IO开发
“協(xié)程可以在遇到阻塞的時(shí)候中斷主動(dòng)讓渡資源,調(diào)度程序選擇其他的協(xié)程運(yùn)行。從而實(shí)現(xiàn)非阻塞IO”
然而php是不支持原生協(xié)程的,遇到阻塞時(shí)如不交由異步進(jìn)程來執(zhí)行是沒有任何意義的,代碼還是同步執(zhí)行的,如下所示:
function foo()
{
$db=new Db();
$result=(yield $db->query());
yield $result;
}
上面的數(shù)據(jù)庫查詢操作是阻塞的,當(dāng)調(diào)度器調(diào)度該協(xié)程到這一步時(shí)發(fā)現(xiàn)執(zhí)行了阻塞操作,此時(shí)調(diào)度器該怎么辦?選擇其余協(xié)程執(zhí)行?那該協(xié)程的阻塞操作又該何時(shí)執(zhí)行,交由誰執(zhí)行呢?所以說在php協(xié)程中拋開異步調(diào)用談非阻塞IO屬于耍流氓。
而swoole的異步task提供了一個(gè)實(shí)現(xiàn)異步的解決方案,關(guān)于swoole_task可以參考官方文檔
核心功能實(shí)現(xiàn)
將一次請(qǐng)求形成一個(gè)協(xié)程
首先創(chuàng)建一個(gè)swoole_server并設(shè)置回調(diào)
class HttpServer implements Server
{
private $swooleHttpServer;
public function __construct(\swoole_http_server $swooleHttpServer)
{
$this->swooleHttpServer = $swooleHttpServer;
}
public function start()
{
$this->swooleHttpServer->on('start', [$this, 'onStart']);
$this->swooleHttpServer->on('shutdown', [$this, 'onShutdown']);
$this->swooleHttpServer->on('workerStart', [$this, 'onWorkerStart']);
$this->swooleHttpServer->on('workerStop', [$this, 'onWorkerStop']);
$this->swooleHttpServer->on('workerError', [$this, 'onWorkerError']);
$this->swooleHttpServer->on('task', [$this, 'onTask']);
$this->swooleHttpServer->on('finish', [$this, 'onFinish']);
$this->swooleHttpServer->on('request', [$this, 'onRequest']);
$this->swooleHttpServer->start();
}
onRequest方法:
public function onRequest(\swoole_http_request $request, \swoole_http_response $response)
{
$requestHandler = new RequestHandler($request, $response);
$requestHandler->handle();
}
在ReqeustHandler中執(zhí)行handle方法,來解析請(qǐng)求的路由,并創(chuàng)建控制器,調(diào)用相應(yīng)的方法,相
public function handle()
{
$this->context = new Context($this->request, $this->response, $this->getFd());
$this->router = new Router($this->request);
try {
if (false === $this->router->parse()) {
$this->response->output('');
return;
}
$coroutine = $this->doRun();
$task = new Task($coroutine, $this->context);
$task->run();
} catch (\Exception $e) {
PcsExceptionHandler::handle($e, $this->response);
}
}
private function doRun()
{
$ret = (yield $this->dispatch());
yield $this->response->send($ret);
}
上面代碼中的ret是action()的調(diào)用結(jié)果,yield $this->response->send($ret);是向?qū)蛻舳苏?qǐng)求的應(yīng)答。
$coroutine是這一次請(qǐng)求形成的一個(gè)協(xié)程(Genetator對(duì)象),包含了整個(gè)請(qǐng)求的流程,接下來就要對(duì)這個(gè)協(xié)程進(jìn)行調(diào)度來獲取真正的執(zhí)行結(jié)果。
協(xié)程調(diào)度
namespace Pcs\Coroutine;
use Pcs\Network\Context\Context;
class Task
{
private $coroutine;
private $context;
private $status;
private $scheduler;
private $sendValue;
public function __construct(\Generator $coroutine, Context $context)
{
$this->coroutine = $coroutine;
$this->context = $context;
$this->scheduler = new Scheduler($this);
}
public function run()
{
while (true) {
try {
$this->status = $this->scheduler->schedule();
switch ($this->status) {
case TaskStatus::TASK_WAIT:
echo "task status: TASK_WAIT\n";
return null;
case TaskStatus::TASK_DONE:
echo "task status: TASK_DONE\n";
return null;
case TaskStatus::TASK_CONTINUE;
echo "task status: TASK_CONTINUE\n";
break;
}
} catch (\Exception $e) {
$this->scheduler->throwException($e);
}
}
}
public function setCoroutine($coroutine)
{
$this->coroutine = $coroutine;
}
public function getCoroutine()
{
return $this->coroutine;
}
public function valid()
{
if ($this->coroutine->valid()) {
return true;
} else {
return false;
}
}
public function send($value)
{
$this->sendValue = $value;
$ret = $this->coroutine->send($value);
return $ret;
}
public function getSendVal()
{
return $this->sendValue;
}
}
Task依賴于Generator對(duì)象$coroutine,在Task類中定義了一些get/set方法,以及一些Generator的方法,Task::run()方法用來執(zhí)行對(duì)協(xié)程的調(diào)度,調(diào)度行為由Schedule來執(zhí)行,每次調(diào)度都會(huì)返回當(dāng)前這次調(diào)度的狀態(tài)。多個(gè)協(xié)程共用一個(gè)調(diào)度器,而這里run方法會(huì)為每個(gè)協(xié)程創(chuàng)建一個(gè)調(diào)度器,原因是每個(gè)協(xié)程都是一個(gè)客戶端的請(qǐng)求,使用一個(gè)單獨(dú)的調(diào)度器能減少相互間的影響,而且多個(gè)協(xié)程之間的調(diào)度順序是swoole來處理的,這里的調(diào)度器不用關(guān)心。下面給出調(diào)度的代碼:
namespace Pcs\Coroutine;
class Scheduler
{
private $task;
private $stack;
const SCHEDULE_CONTINUE = 10;
public function __construct(Task $task)
{
$this->task = $task;
$this->stack = new \SplStack();
}
public function schedule()
{
$coroutine = $this->task->getCoroutine();
$value = $coroutine->current();
$status = $this->handleSystemCall($value);
if ($status !== self::SCHEDULE_CONTINUE) return $status;
$status = $this->handleStackPush($value);
if ($status !== self::SCHEDULE_CONTINUE) return $status;
$status = $this->handleAsyncJob($value);
if ($status !== self::SCHEDULE_CONTINUE) return $status;
$status = $this->handelYieldValue($value);
if ($status !== self::SCHEDULE_CONTINUE) return $status;
$status = $this->handelStackPop();
if ($status !== self::SCHEDULE_CONTINUE) return $status;
return TaskStatus::TASK_DONE;
}
public function isStackEmpty()
{
return $this->stack->isEmpty();
}
private function handleSystemCall($value)
{
if (!$value instanceof SystemCall) {
return self::SCHEDULE_CONTINUE;
}
}
private function handleStackPush($value)
{
if (!$value instanceof \Generator) {
return self::SCHEDULE_CONTINUE;
}
$coroutine = $this->task->getCoroutine();
$this->stack->push($coroutine);
$this->task->setCoroutine($value);
return TaskStatus::TASK_CONTINUE;
}
private function handleAsyncJob($value)
{
if (!is_subclass_of($value, Async::class)) {
return self::SCHEDULE_CONTINUE;
}
$value->execute([$this, 'asyncCallback']);
return TaskStatus::TASK_WAIT;
}
public function asyncCallback($response, $exception = null)
{
if ($exception !== null
&& $exception instanceof \Exception
) {
$this->throwException($exception, true);
} else {
$this->task->send($response);
$this->task->run();
}
}
private function handelYieldValue($value)
{
if (!$this->task->valid()) {
return self::SCHEDULE_CONTINUE;
}
$ret = $this->task->send($value);
return TaskStatus::TASK_CONTINUE;
}
private function handelStackPop()
{
if ($this->isStackEmpty()) {
return self::SCHEDULE_CONTINUE;
}
$coroutine = $this->stack->pop();
$this->task->setCoroutine($coroutine);
$value = $this->task->getSendVal();
$this->task->send($value);
return TaskStatus::TASK_CONTINUE;
}
public function throwException($e, $isFirstCall = false)
{
if ($this->isStackEmpty()) {
$this->task->getCoroutine()->throw($e);
return;
}
try {
if ($isFirstCall) {
$coroutine = $this->task->getCoroutine();
} else {
$coroutine = $this->stack->pop();
}
$this->task->setCoroutine($coroutine);
$coroutine->throw($e);
$this->task->run();
} catch (\Exception $e) {
$this->throwException($e);
}
}
}
Scheduler中的schedule方法會(huì)獲取當(dāng)前Task的協(xié)程,并通過current()方法獲取當(dāng)前中斷點(diǎn)的返回值,接著依次調(diào)用5個(gè)方法來對(duì)返回值進(jìn)行處理。
1:handleSystemCall
如果返回的值是SystemCall類型的對(duì)象,則執(zhí)行系統(tǒng)調(diào)用,如killTask之類的操作,systemCall是第一優(yōu)先級(jí)。
2:handleStackPush
在A函數(shù)中調(diào)用B函數(shù),則B函數(shù)稱為A函數(shù)的子例程(子函數(shù)),然而在協(xié)程中卻不能像普通函數(shù)那樣調(diào)用。
function funcA()
{
return funcB();
}
function genA()
{
yield genB();
}
在funcA中funcB();會(huì)返回funcB的執(zhí)行結(jié)果,但是在genA中,yield genB();會(huì)返回一個(gè)Generator對(duì)象,而不是genB的最終執(zhí)行結(jié)果。想得到genB的執(zhí)行結(jié)果需要對(duì)genB進(jìn)行調(diào)度,而genB中又可能有g(shù)enC()genD()的協(xié)程嵌套,所以為了讓協(xié)程像函數(shù)一眼正常調(diào)用,這里使用協(xié)程棧來實(shí)現(xiàn)。
如上圖,當(dāng)調(diào)度器獲取到GenA(父協(xié)程)的返回值is instance of Generator時(shí),調(diào)度器會(huì)把父協(xié)程push到stack中,然后把子協(xié)程分配給Task,繼續(xù)調(diào)度子協(xié)程。如此反復(fù)直到最后一個(gè)子協(xié)程返回,然后開始pop,將stack中的協(xié)程依次取出
3:handleAsyncJob
handleAsyncJob是整個(gè)協(xié)程調(diào)度的核心
private function handleAsyncJob($value)
{
if (!is_subclass_of($value, Async::class)) {
return self::SCHEDULE_CONTINUE;
}
$value->execute([$this, 'asyncCallback']);
return TaskStatus::TASK_WAIT;
}
public function asyncCallback($response, $exception = null)
{
if ($exception !== null
&& $exception instanceof \Exception
) {
$this->throwException($exception, true);
} else {
$this->task->send($response);
$this->task->run();
}
}
當(dāng)協(xié)程調(diào)度的返回值是繼承了Async的子類或者是實(shí)現(xiàn)了Asycn接口的實(shí)例的時(shí)候,會(huì)執(zhí)行Async的execute方法。這里用mysqli數(shù)據(jù)庫查詢類舉例。
public function execute(callable $callback)
{
$this->callback = $callback;
$serv = ServerHolder::getServer();
$serv->task($this->sql, -1, [$this, 'queryReady']);
}
public function queryReady(\swoole_http_server $serv, $task_id, $data)
{
$queryResult = unserialize($data);
$exception = null;
if ($queryResult->errno != 0) {
$exception = new \Exception($queryResult->error);
}
call_user_func_array($this->callback, [$queryResult, $exception]);
}
execute方法接收一個(gè)函數(shù)作為該異步操作完成之后的回調(diào)函數(shù),在Mysqli類中的execute方法中,啟動(dòng)了一個(gè)異步swoole_task,將sql操作交給swoole_task異步執(zhí)行,在執(zhí)行結(jié)束后會(huì)執(zhí)行queryReady方法,該方法在解析異步返回?cái)?shù)據(jù)之后執(zhí)行$this->callback()也就是之前在調(diào)度器中傳入的 asyncCallback方法,該方法在檢測(cè)異常之后會(huì)執(zhí)行send()方法將異步執(zhí)行的結(jié)果發(fā)送到中斷處,繼續(xù)執(zhí)行。
handleAsyncJob不會(huì)等待異步操作的返回結(jié)果,而是直接返回TASK_WAIT信號(hào),回到上面的Task->run()方法可以看到TASK_WAIT信號(hào)會(huì)導(dǎo)致run()方法返回null,釋放當(dāng)前worker,調(diào)度流程圖如下圖所示,
4:handleYieldValue
private function handelYieldValue($value)
{
if (!$this->task->valid()) {
return self::SCHEDULE_CONTINUE;
}
$ret = $this->task->send($value);
return TaskStatus::TASK_CONTINUE;
}
如果某次yield的返回值既不是異步調(diào)用也不是Generator,那么判斷當(dāng)前的generator是否是valid(是否執(zhí)行完)如果執(zhí)行完畢,繼續(xù)調(diào)度,執(zhí)行下面的handleStackPush方法,否則的話返回Task_Continue繼續(xù)調(diào)度,也就是說在一個(gè)generator中多次yield,最后只會(huì)取最后一次yield的返回值。
5:handleStackPush
當(dāng)上一步中判斷!$this->task->valid()也就是當(dāng)前生成器執(zhí)行完畢的時(shí)候,會(huì)執(zhí)行本方法來控制之前的協(xié)程stack進(jìn)行pop操作,首先檢查Stac是否是非空,非空的話pop出一個(gè)父協(xié)程,并將當(dāng)前協(xié)程的返回值send()到父協(xié)程中斷出繼續(xù)執(zhí)行。
協(xié)程優(yōu)勢(shì)在哪里
當(dāng)一次請(qǐng)求遇到IO的時(shí)候,同步操作會(huì)導(dǎo)致當(dāng)前請(qǐng)求阻塞在IO處等待IO返回,體現(xiàn)在swoole上就是一個(gè)請(qǐng)求一直占用一個(gè)worker。
但是當(dāng)使用了協(xié)程調(diào)度之后,用戶可以在阻塞的地方通過yield手動(dòng)中斷,交由swoole_task去異步操作,同時(shí)釋放worker占用來處理其他請(qǐng)求。
當(dāng)異步處理執(zhí)行結(jié)束后再繼續(xù)調(diào)度。
注意 php的協(xié)程只負(fù)責(zé)中斷,異步操作是Swoole_task做的
總結(jié)
以上是生活随笔為你收集整理的php协程实现mysql异步_swoole与php协程实现异步非阻塞IO开发的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql最大述_mysql最大字段数量
- 下一篇: mysql中xml类型_使用 SQLXM