前言

在 Server 程序中如果需要执行很耗时的操作,比如一个聊天服务器发送广播,Web 服务器中发送邮件。如果直接去执行这些函数就会阻塞当前进程,导致服务器响应变慢。

Swoole 提供了异步任务处理的功能,可以投递一个异步任务到 TaskWorker 进程池中执行,不影响当前请求的处理速度。

参考文献

task异步任务初实现
task异步任务解释:https://wiki.swoole.com/#/start/start_task
异步服务器实现:https://wiki.swoole.com/#/start/start_tcp_server
task异步任务数量计算:https://wiki.swoole.com/#/server/setting?id=task_worker_num
测试代码下载:https://github.com/mailjobblog/dev_swoole/tree/master/210524_task

task异步消息丢失问题解决
swoole用 sysvmsg 消息队列通信:https://wiki.swoole.com/#/server/setting?id=task_ipc_mode
swoole 设置linux消息队列的key:https://wiki.swoole.com/#/server/setting?id=message_queue_key
php实现IPC消息队列:http://rango.swoole.com/archives/103
linux的IPC消息队列:https://www.linuxidc.com/Linux/2018-05/152191.htm

图例
worker和task关系图:https://www.kdocs.cn/view/l/sjVNT7NoZs3G
task处理大量数据图:https://www.kdocs.cn/view/l/scsd4tcZHaqg

Task异步任务的实现解析

异步任务 task id 计算方式

假设 Worker id 的范围是
min:0,max:$serv->setting[‘worker_num’]

则 Task 任务 id 计算范围如下
min:serv>setting[workernum]maxserv->setting['worker_num’],`max:`serv->setting[‘worker_num’] + $serv->setting['task_worker_num’]

Task如何产生

当 worker 进程得到任务后, fork 出来一个 task 进程。然后把任务投递到 taskWorker 进程池中执行。

Worker 和 Task 投递实现

image-20210527135121857

测试代码

<?php
$serv = new Swoole\Server('0.0.0.0', 9601);

//设置异步任务的工作进程数量
$serv->set([
    'worker_num'      => 4,
    'task_worker_num' => 6
]);

//此回调函数在worker进程中执行
$serv->on('Receive', function ($serv, $fd, $reactor_id, $data) {
    //投递异步任务
    $task_id = $serv->task($data);
    echo "代码继续执行中: id={$task_id}\n";
});

//处理异步任务(此回调函数在task进程中执行)
$serv->on('Task', function ($serv, $task_id, $reactor_id, $data) {
    echo "正在处理异步任务[id={$task_id}]" . PHP_EOL;
    sleep(6);// 睡眠中,模拟任务处理
    //返回任务执行的结果
    $serv->finish("{$data} -> OK");
});

//处理异步任务的结果(此回调函数在worker进程中执行)
$serv->on('Finish', function ($serv, $task_id, $data) {
    echo "异步任务执行完成咯[{$task_id}] Finish: {$data}" . PHP_EOL;
});

$serv->start();

测试说明

  • 服务端启动以上代码,设置4个worker和6个task,启动服务
  • 客户端在服务器执行 telnet 127.0.0.1 9601 进行消息投递。
  • 然后查看 task 进程树。可以发现 4 个worker进程,6个task进程。
# 查看进程树
pstree -ap | grep -v grep | grep task

# 树状展示
pstree -p 25658

image-20210527133841965

Task异步任务消息丢失问题

问题描述:task 异步任务,处理大量数据的时候。当 worker 进程不断的向 task 进程投递任务,然而当所有的 task 任务都处于忙碌状态的时候,此时的 task 异步任务会被暂时存放到 linux 服务器的临时目录 tmp 中。当 tmp 临时目录里存在一些数据的并且服务器宕机后,重启服务器后,然后再启动server服务,socket 连接会被重置,导致新的 server 服务中的 task “不认识” tmp 中的数据,由此产生丢失问题。

image-20210527144237618

解决方案:Linux底层实现了 IPC 消息机制,对于默认task消息投递,加入了sysvms消息队列中间层。一个简单的流程表达是 Worker -> Sysvms -> tmp -> Task。在投递任务的时候,先定义一个key,然后基于这个key生成一个消息队列。worker 投递任务的时候到达 Sysvms 中间者,如果所有 task 都处于忙碌状态,则此任务会存储在 linux Sysvms 消息队列中。如果发送宕机,基于之前定义的 key ,就可以得到还没处理完毕的task任务,然后接着处理。

image-20210527144254975

查看 linux sysvmsg 消息队列:

ipcs -qa

image-20210527203009809

swoole 解决问题方案:

image-20210527144313867