前言
在 Server 程序中如果需要执行很耗时的操作,比如一个聊天服务器发送广播,Web 服务器中发送邮件。如果直接去执行这些函数就会阻塞当前进程,导致服务器响应变慢。
Swoole 提供了异步任务处理的功能,可以投递一个异步任务到 TaskWorker
进程池中执行,不影响当前请求的处理速度。
参考文献
task异步任务初实现
task异步任务解释:https://wiki.swoole.com/#/start/start_task
异步服务器实现:https://wiki.swoole.com/#/start/start_tcp_server
task异步任务数量计算:https://wiki.swoole.com/#/server/setting?id=task_worker_num
测试代码下载:https://github.com/mailjobblog/dev_swoole/tree/master/210524_task
task异步消息丢失问题解决
swoole用 sysvmsg
消息队列通信:https://wiki.swoole.com/#/server/setting?id=task_ipc_mode
swoole 设置linux消息队列的key:https://wiki.swoole.com/#/server/setting?id=message_queue_key
php实现IPC消息队列:http://rango.swoole.com/archives/103
linux的IPC消息队列:https://www.linuxidc.com/Linux/2018-05/152191.htm
图例:
worker和task关系图:https://www.kdocs.cn/view/l/sjVNT7NoZs3G
task处理大量数据图:https://www.kdocs.cn/view/l/scsd4tcZHaqg
Task异步任务的实现解析
异步任务 task id 计算方式
假设 Worker id 的范围是:
min:
0,max:
$serv->setting[‘worker_num’]
则 Task 任务 id 计算范围如下:
min:
serv->setting[‘worker_num’] + $serv->setting['task_worker_num’]
Task如何产生
当 worker 进程得到任务后, fork 出来一个 task 进程。然后把任务投递到 taskWorker 进程池中执行。
Worker 和 Task 投递实现
测试代码
<?php
$serv = new Swoole\Server('0.0.0.0', 9601);
//设置异步任务的工作进程数量
$serv->set([
'worker_num' => 4,
'task_worker_num' => 6
]);
//此回调函数在worker进程中执行
$serv->on('Receive', function ($serv, $fd, $reactor_id, $data) {
//投递异步任务
$task_id = $serv->task($data);
echo "代码继续执行中: id={$task_id}\n";
});
//处理异步任务(此回调函数在task进程中执行)
$serv->on('Task', function ($serv, $task_id, $reactor_id, $data) {
echo "正在处理异步任务[id={$task_id}]" . PHP_EOL;
sleep(6);// 睡眠中,模拟任务处理
//返回任务执行的结果
$serv->finish("{$data} -> OK");
});
//处理异步任务的结果(此回调函数在worker进程中执行)
$serv->on('Finish', function ($serv, $task_id, $data) {
echo "异步任务执行完成咯[{$task_id}] Finish: {$data}" . PHP_EOL;
});
$serv->start();
测试说明
- 服务端启动以上代码,设置4个worker和6个task,启动服务
- 客户端在服务器执行
telnet 127.0.0.1 9601
进行消息投递。 - 然后查看 task 进程树。可以发现 4 个worker进程,6个task进程。
# 查看进程树
pstree -ap | grep -v grep | grep task
# 树状展示
pstree -p 25658
Task异步任务消息丢失问题
问题描述:task 异步任务,处理大量数据的时候。当 worker
进程不断的向 task
进程投递任务,然而当所有的 task
任务都处于忙碌状态的时候,此时的 task
异步任务会被暂时存放到 linux
服务器的临时目录 tmp
中。当 tmp
临时目录里存在一些数据的并且服务器宕机后,重启服务器后,然后再启动server
服务,socket
连接会被重置,导致新的 server
服务中的 task
“不认识” tmp
中的数据,由此产生丢失问题。
解决方案:Linux底层实现了 IPC 消息机制
,对于默认task
消息投递,加入了sysvms
消息队列中间层。一个简单的流程表达是 Worker -> Sysvms -> tmp -> Task
。在投递任务的时候,先定义一个key
,然后基于这个key
生成一个消息队列。worker
投递任务的时候到达 Sysvms
中间者,如果所有 task
都处于忙碌
状态,则此任务会存储在 linux Sysvms
消息队列中。如果发送宕机,基于之前定义的 key
,就可以得到还没处理完毕的task
任务,然后接着处理。
查看 linux sysvmsg 消息队列:
ipcs -qa
swoole 解决问题方案: