swoole 连接池php fpm,【转】swoole4实现数据库连接池
前言
在寫這篇文章之前,看了好幾篇實現(xiàn)連接池的文章,都是寫的很不好的。擺明忽略了連接池的很多特性,很多都不具有抗高并發(fā)和連接復(fù)用。所以自己覺得有必須把最近幾天,實現(xiàn)一個比較完整的php數(shù)據(jù)庫連接池的點滴記錄下來,望能幫助各位,感激者望多點贊和打賞。
一、數(shù)據(jù)庫連接池基本概念
所謂的數(shù)據(jù)庫連接池,一般指的就是程序和數(shù)據(jù)庫保持一定數(shù)量的數(shù)據(jù)庫連接不斷開,并且各請求的連接可以相互復(fù)用,減少重復(fù)新建數(shù)據(jù)庫連接的消耗和避免在高并發(fā)的情況下出現(xiàn)數(shù)據(jù)庫max connections等錯誤。自己總結(jié)了一下,如果要實現(xiàn)一個數(shù)據(jù)庫連接池,一般有幾個特點:
連接復(fù)用,不同的請求連接,可以放回池中,等待下個請求發(fā)分配和調(diào)用
連接數(shù)量一般維持min-max的最大最少值之間
對于空閑連接的回收
可以抗一定程度的高并發(fā),也就是說當(dāng)一次并發(fā)請求完池中所有的連接時,獲取不到連接的請求可等待其他連接的釋放
總結(jié)幾個特性后,一個基本連接池,大致要實現(xiàn)下圖功能:
創(chuàng)建連接:連接池啟動后,初始化一定的空閑連接,指定為最少的連接min。當(dāng)連接池為空,不夠用時,創(chuàng)建新的連接放到池里,但不能超過指定的最大連接max數(shù)量。
連接釋放:每次使用完連接,一定要調(diào)用釋放方法,把連接放回池中,給其他程序或請求使用。
連接分配:連接池中用pop和push的方式對等入隊和出隊分配與回收。能實現(xiàn)阻塞分配,也就是在池空并且已創(chuàng)建數(shù)量大于max,阻塞一定時間等待其他請求的連接釋放,超時則返回null。
連接管理:對連接池中的連接,定時檢活和釋放空閑連接等
二、Fpm+數(shù)據(jù)庫長連接的實現(xiàn)
利用fpm實現(xiàn):例如你要實例一個100連接數(shù)的池,開啟100個空閑fpm,然后每個fpm的連接都是數(shù)據(jù)庫長連接。一般pm.max_spare_servers = 8這個配置項就是維持連接池的空閑數(shù)量,然后pm.max_children = 50就是最大的連接數(shù)量。和fpm的進程數(shù)量一致。
三、基于swoole的實現(xiàn)
swoole簡單介紹(更多參閱swoole官網(wǎng))
swoole是一個PHP實現(xiàn)異步網(wǎng)絡(luò)通信的引擎或者擴展,其中實現(xiàn)了很多傳統(tǒng)PHP-fpm沒有的東西,例如異步的客戶端,異步Io,常駐內(nèi)存,協(xié)程等等,一個個優(yōu)秀的擴展,其中異步和協(xié)程等概念能應(yīng)用于高并發(fā)場景。缺點是文檔和入門的門檻都比較高,需要排坑。附上swoole的運行流程和進程結(jié)構(gòu)圖:
運行流程圖
進程/線程架構(gòu)圖
基于swoole現(xiàn)實時的注意事項
首先,為了減少大家對之后運行示例代碼產(chǎn)生不必要的天坑,先把注意事項和場景問題放前面:
1、程序中使用了協(xié)程的通信管道channel(與go的chan差不多的),其中swoole2是不支持chan->pop($timeout)中timeout超時等待的,所以必須用swoole4版本
3、筆者使用的環(huán)境為:PHP 7.1.18和swoole4作為此次開發(fā)的環(huán)境
基于swoole現(xiàn)實連接池的方法
首先,此次利用swoole實現(xiàn)連接池,運用到swoole以下技術(shù)或者概念
1、連接變量池,這里可以看做一個數(shù)組或者隊列,利用swoole全局變量的常駐內(nèi)存特性,只要變量沒主動unset掉,數(shù)組或隊列中的連接對象可以一直保持,不釋放。主要參考:https://wiki.swoole.com/wiki/page/p-zend_mm.html
2、協(xié)程。協(xié)程是純用戶狀態(tài)的線程,通過協(xié)作的方式而不是搶占的方式來切換。首先此次的連接池兩處用到協(xié)程:
一個是mysql的協(xié)程客戶端,為什么要用協(xié)程客戶端,因為如果是用同步客戶端PDO,在一個進程處理內(nèi),就算有幾百個連接池,swoole worker進程中用普通的PDO方式,隨便并發(fā)多少個請求,每一個請求都只能等上一個請求執(zhí)行完畢,woker才處理下一個請求,這里就算阻塞了。為了讓一個worker支持阻塞切換出cpu去處理其他請求,所以要用到協(xié)程的協(xié)助切換,或者異步客戶端也可以,但是異步客戶端使用起來嵌套太多,很不方便。swoole協(xié)程可以無感知的用同步的代碼編寫方式達到異步IO的效果和性能。
第二個是底層實現(xiàn)了協(xié)程切換和調(diào)度的channel,以下詳述什么是channel
3、Coroutine/channel通道,類似于go語言的chan,支持多生產(chǎn)者協(xié)程和多消費者協(xié)程。底層自動實現(xiàn)了協(xié)程的切換和調(diào)度。高并發(fā)時,容易出連接池為空時,如果用一般的array或者splqueue()作為介質(zhì)存儲連接對象變量,不能產(chǎn)生阻塞等待其他請求釋放的效果,也就是說只能直接返回null.。所以這里用了一個swoole4協(xié)程中很牛逼的channel通過管道作為存儲介質(zhì),它的出隊方法pop($timeout)可以指定阻塞等待指定時間后返回。注意,是swoole2是沒有超時timeout的參數(shù),不適用此場景。在go語言中,如果chan等待或者push了沒有消費或者生產(chǎn)一對一的情況,是會發(fā)生死鎖。所以swoole4的timeout應(yīng)該是為了避免無限等待為空channel情況而產(chǎn)生。主要參考:
channel切換的例子:
use \Swoole\Coroutine\Channel;
$chan = new Channel();
go(function () use ($chan) {
echo "我是第一個協(xié)程,等待3秒內(nèi)有push就執(zhí)行返回" . PHP_EOL;
$p = $chan->pop(2);#1
echo "pop返回結(jié)果" . PHP_EOL;
var_dump($p);
});
go(function () use ($chan) {
co::sleep(1);#2
$chan->push(1);
});
echo "main" . PHP_EOL;
#1處代碼會首先執(zhí)行,然后遇到pop(),因為channel還是空,會等待2s。此時協(xié)程會讓出cpu,跳到第二個協(xié)程執(zhí)行,然后#2出睡眠1秒,push變量1進去channel后返回#1處繼續(xù)執(zhí)行,成功取車通過中剛push的值1.運行結(jié)果為:
如果把#2處的睡眠時間換成大于pop()的等待時間,結(jié)果是:
根據(jù)這些特性最終實現(xiàn)連接池的抽象封裝類為:
/**
* 連接池封裝.
* User: user
* Date: 2018/9/1
* Time: 13:36
*/
use Swoole\Coroutine\Channel;
abstract class AbstractPool
{
private $min;//最少連接數(shù)
private $max;//最大連接數(shù)
private $count;//當(dāng)前連接數(shù)
private $connections;//連接池組
protected $spareTime;//用于空閑連接回收判斷
//數(shù)據(jù)庫配置
protected $dbConfig = array(
'host' => '10.0.2.2',
'port' => 3306,
'user' => 'root',
'password' => 'root',
'database' => 'test',
'charset' => 'utf8',
'timeout' => 2,
);
private $inited = false;
protected abstract function createDb();
public function __construct()
{
$this->min = 10;
$this->max = 100;
$this->spareTime = 10 * 3600;
$this->connections = new Channel($this->max + 1);
}
protected function createObject()
{
$obj = null;
$db = $this->createDb();
if ($db) {
$obj = [
'last_used_time' => time(),
'db' => $db,
];
}
return $obj;
}
/**
* 初始換最小數(shù)量連接池
* @return $this|null
*/
public function init()
{
if ($this->inited) {
return null;
}
for ($i = 0; $i < $this->min; $i++) {
$obj = $this->createObject();
$this->count++;
$this->connections->push($obj);
}
return $this;
}
public function getConnection($timeOut = 3)
{
$obj = null;
if ($this->connections->isEmpty()) {
if ($this->count < $this->max) {//連接數(shù)沒達到最大,新建連接入池
$this->count++;
$obj = $this->createObject();
} else {
$obj = $this->connections->pop($timeOut);//timeout為出隊的最大的等待時間
}
} else {
$obj = $this->connections->pop($timeOut);
}
return $obj;
}
public function free($obj)
{
if ($obj) {
$this->connections->push($obj);
}
}
/**
* 處理空閑連接
*/
public function gcSpareObject()
{
//大約2分鐘檢測一次連接
swoole_timer_tick(120000, function () {
$list = [];
/*echo "開始檢測回收空閑鏈接" . $this->connections->length() . PHP_EOL;*/
if ($this->connections->length() < intval($this->max * 0.5)) {
echo "請求連接數(shù)還比較多,暫不回收空閑連接\n";
}#1
while (true) {
if (!$this->connections->isEmpty()) {
$obj = $this->connections->pop(0.001);
$last_used_time = $obj['last_used_time'];
if ($this->count > $this->min && (time() - $last_used_time > $this->spareTime)) {//回收
$this->count--;
} else {
array_push($list, $obj);
}
} else {
break;
}
}
foreach ($list as $item) {
$this->connections->push($item);
}
unset($list);
});
}
}
同步PDO客戶端下實現(xiàn)
/**
* 數(shù)據(jù)庫連接池PDO方式
* User: user
* Date: 2018/9/8
* Time: 11:30
*/
require "AbstractPool.php";
class MysqlPoolPdo extends AbstractPool
{
protected $dbConfig = array(
'host' => 'mysql:host=10.0.2.2:3306;dbname=test',
'port' => 3306,
'user' => 'root',
'password' => 'root',
'database' => 'test',
'charset' => 'utf8',
'timeout' => 2,
);
public static $instance;
public static function getInstance()
{
if (is_null(self::$instance)) {
self::$instance = new MysqlPoolPdo();
}
return self::$instance;
}
protected function createDb()
{
return new PDO($this->dbConfig['host'], $this->dbConfig['user'], $this->dbConfig['password']);
}
}
$httpServer = new swoole_http_server('0.0.0.0', 9501);
$httpServer->set(
['worker_num' => 1]
);
$httpServer->on("WorkerStart", function () {
MysqlPoolPdo::getInstance()->init();
});
$httpServer->on("request", function ($request, $response) {
$db = null;
$obj = MysqlPoolPdo::getInstance()->getConnection();
if (!empty($obj)) {
$db = $obj ? $obj['db'] : null;
}
if ($db) {
$db->query("select sleep(2)");
$ret = $db->query("select * from guestbook limit 1");
MysqlPoolPdo::getInstance()->free($obj);
$response->end(json_encode($ret));
}
});
$httpServer->start();
代碼調(diào)用過程詳解:
1、server啟動時,調(diào)用init()方法初始化最少數(shù)量(min指定)的連接對象,放進類型為channelle的connections對象中。在init中循環(huán)調(diào)用中,依賴了createObject()返回連接對象,而createObject()
中是調(diào)用了本來實現(xiàn)的抽象方法,初始化返回一個PDO db連接。所以此時,連接池connections中有min個對象。
2、server監(jiān)聽用戶請求,當(dāng)接收發(fā)請求時,調(diào)用連接數(shù)的getConnection()方法從connections通道中pop()一個對象。此時如果并發(fā)了10個請求,server因為配置了1個worker,所以再pop到一個對象返回時,遇到sleep()的查詢,因為用的連接對象是pdo的查詢,此時的woker進程只能等待,完成后才能進入下一個請求。因此,池中的其余連接其實是多余的,同步客戶端的請求速度只能和woker的數(shù)量有關(guān)。
3、查詢結(jié)束后,調(diào)用free()方法把連接對象放回connections池中。
ab -c 10 -n 10運行的結(jié)果,單個worker處理,select sleep(2) 查詢睡眠2s,同步客戶端方式總共運行時間為20s以上,而且mysql的連接始終維持在一條。結(jié)果如下:
協(xié)程客戶端Coroutine\MySQL方式的調(diào)用
/**
* 數(shù)據(jù)庫連接池協(xié)程方式
* User: user
* Date: 2018/9/8
* Time: 11:30
*/
require "AbstractPool.php";
class MysqlPoolCoroutine extends AbstractPool
{
protected $dbConfig = array(
'host' => '10.0.2.2',
'port' => 3306,
'user' => 'root',
'password' => 'root',
'database' => 'test',
'charset' => 'utf8',
'timeout' => 10,
);
public static $instance;
public static function getInstance()
{
if (is_null(self::$instance)) {
self::$instance = new MysqlPoolCoroutine();
}
return self::$instance;
}
protected function createDb()
{
$db = new Swoole\Coroutine\Mysql();
$db->connect(
$this->dbConfig
);
return $db;
}
}
$httpServer = new swoole_http_server('0.0.0.0', 9501);
$httpServer->set(
['worker_num' => 1]
);
$httpServer->on("WorkerStart", function () {
//MysqlPoolCoroutine::getInstance()->init()->gcSpareObject();
MysqlPoolCoroutine::getInstance()->init();
});
$httpServer->on("request", function ($request, $response) {
$db = null;
$obj = MysqlPoolCoroutine::getInstance()->getConnection();
if (!empty($obj)) {
$db = $obj ? $obj['db'] : null;
}
if ($db) {
$db->query("select sleep(2)");
$ret = $db->query("select * from guestbook limit 1");
MysqlPoolCoroutine::getInstance()->free($obj);
$response->end(json_encode($ret));
}
});
$httpServer->start();
代碼調(diào)用過程詳解
1、同樣的,協(xié)程客戶端方式下的調(diào)用,也是實現(xiàn)了之前封裝好的連接池類AbstractPool.php。只是createDb()的抽象方法用了swoole內(nèi)置的協(xié)程客戶端去實現(xiàn)。
2、server啟動后,初始化都和同步一樣。不一樣的在獲取連接對象的時候,此時如果并發(fā)了10個請求,同樣是配置了1個worker進程在處理,但是在第一請求到達,pop出池中的一個連接對象,執(zhí)行到query()方法,遇上sleep阻塞時,此時,woker進程不是在等待select的完成,而是切換到另外的協(xié)程去處理下一個請求。完成后同樣釋放對象到池中。當(dāng)中有重點解釋的代碼段中g(shù)etConnection()中。
public function getConnection($timeOut = 3)
{
$obj = null;
if ($this->connections->isEmpty()) {
if ($this->count < $this->max) {//連接數(shù)沒達到最大,新建連接入池
$this->count++;
$obj = $this->createObject();#1
} else {
$obj = $this->connections->pop($timeOut);#2
}
} else {
$obj = $this->connections->pop($timeOut);#3
}
return $obj;
}
當(dāng)調(diào)用到getConnection()時,如果此時由于大量并發(fā)請求過多,連接池connections為空,而沒達到最大連接max數(shù)量時時,代碼運行到#1處,調(diào)用了createObject(),新建連接返回;但如果連接池connections為空,而到達了最大連接數(shù)max時,代碼運行到了#2處,也就是$this->connections->pop($timeOut),此時會阻塞$timeOut的時間,如果期間有鏈接釋放了,會成功獲取到,然后協(xié)程返回。超時沒獲取到,則返回false。
3、最后說一下協(xié)程Mysql客戶端一項重要配置,那就是代碼里$dbConfig中timeout值的配置。這個配置是意思是最長的查詢等待時間。可以看一個例子說明下:
go(function () {
$start = microtime(true);
$db = new Swoole\Coroutine\MySQL();
$db->connect([
'host' => '10.0.2.2',
'port' => 3306,
'user' => 'root',
'password' => 'root',
'database' => 'test',
'timeout' => 4#1
]);
$db->query("select sleep(5)");
echo "我是第一個sleep五秒之后\n";
$ret = $db->query("select user from guestbook limit 1");#2
var_dump($ret);
$use = microtime(true) - $start;
echo "協(xié)程mysql輸出用時:" . $use . PHP_EOL;
});
#1處代碼,如果timeout配了4s查詢超時,而第一條查詢select sleep(5)阻塞后,協(xié)程切換到下一條sql的執(zhí)行,其實$db并不能執(zhí)行成功,因為用一個連接,同一個協(xié)程中,其實執(zhí)行是同步的,所以此時第二條查詢在等待4s超時后,沒獲取到db的連接執(zhí)行,就會執(zhí)行失敗。而如果第一條查詢執(zhí)行的時間少于這個timeout,那么會執(zhí)行查詢成功。猜猜上面執(zhí)行用時多少?結(jié)果如下:
如果把timeout換成6s呢,結(jié)果如下:
所以要注意的是,協(xié)程的客戶端內(nèi)執(zhí)行其實是同步的,不要理解為異步,它只是遇到IO阻塞時能讓出執(zhí)行權(quán),切換到其他協(xié)程而已,不能和異步混淆。
ab -c 10 -n 10運行的結(jié)果,單個worker處理,select sleep(2) 查詢睡眠2s,協(xié)程客戶端方式總共運行時間為2s多。結(jié)果如下:
數(shù)據(jù)庫此時的連接數(shù)為10條(show full PROCESSLIST):
再嘗試 ab -c 200 -n 1000 http://127.0.0.1:9501/,200多個并發(fā)的處理,時間是20多秒,mysql連接數(shù)達到指定的最大值100個。結(jié)果如下:
四、后言
現(xiàn)在連接池基本實現(xiàn)了高并發(fā)時的連接分配和控制,但是還有一些細節(jié)要處理,例如:
并發(fā)時,建立了max個池對象,不能一直在池中維護這么多,要在請求空閑時,把連接池的數(shù)量維持在一個空閑值內(nèi)。這里是簡單做了gcSpareObject()的方法實現(xiàn)空閑處理。直接在初始化woker的時候調(diào)用:MysqlPoolCoroutine::getInstance()->init()->gcSpareObject();就會定時檢測回收。問題是如何判斷程序比較空閑,值得再去優(yōu)化。
定時檢測連接時候是活的,剔除死鏈
假如程序忘記調(diào)用free()釋放對象到池,是否有更好方法避免這種情況?
對于以上,希望各大神看到后,能提供不錯的意見!
總結(jié)
以上是生活随笔為你收集整理的swoole 连接池php fpm,【转】swoole4实现数据库连接池的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python求反余弦_python 反余
- 下一篇: Introduction to Wir