Workerman的代码分析

2018年7月17日 0 作者 筱枫

最近找工作看到许多公司都要求有workerman的使用经验,但本人对其并不熟悉,遂fork一份进行研究,其实代码本身并不复杂,很多东西都是linux里面的内容

注意:workerman是以事件形式驱动的

首先按照官方示例写一个简单的脚本,然后命令行启动
代码如下:

require_once __DIR__ .'/vendor/autoload.php';
use Workerman\Worker;
$ws_worker = new Worker("http://0.0.0.0:2356");
$ws_worker->count = 4;
$ws_worker->onConnect = function($connection)
{
    $connection->send("connection");
};
$ws_worker->onMessage = function($connection, $data)
{
    $connection->send('message');
};
Worker::runAll();

接着在命令行启动 php test.php start,然后访问ip:2356便可以看到输出了,ok,现在workerman成功跑起来,接下来我们就研究下里面的东西
如果细心的你会发现,只输出了connection,而没有message,为什么?我会在文末给出原因
首先看第五行

$ws_worker = new Worker("http://0.0.0.0:2356");

追进去看看

平淡无奇,一些初始化工作,值得注意的是其中的第2026、2027行,这里巧妙的获取了当前执行文件的路径,也就是test.php,如果直接使用echo FILE; 获取到的只是worker.php这个文件
接下来继续看test.php
当中的onConnect和onMessage很明显是回调函数,所以再次暂且略过,先不提
之后就是最重要的函数:runAll()
我们跟进去看看

大写懵逼?
不要急,慢慢来,第一个函数跟进去,很简单,就是检测是不是在命令行下启动,同时检测是linux还是windows
第二个函数:init() 初始化,这里面的东西可就多了,不要心慌,慢慢来看
开头没有什么好说的,设置错误处理函数,设置pid文件路径和log文件路径,设置权限,初始状态,进程标题,初始化每个进程的id
主要关注:

Timer::init();

public static function init($event = null)
{
    if ($event) {
        self::$_event = $event;
    } else {
        if (function_exists('pcntl_signal')) {
            pcntl_signal(SIGALRM, array('\Workerman\Lib\Timer', 'signalHandle'), false);
        }
    }
}

这个函数非常简单,主要是设置了定时器信号,用于当超时时处理任务的情况
跟着signalHandle走,进入到self::tick()这个函数

这段代码很简单,接着继续往下走

static::parseCommand();

其中的内容也很简单,就是解析字符串,再对指定的命令做指定的操作,
如果对其中的

posix_kill($master_pid, SIGUSR2);

有些头晕眼花的话,可以看下php.net里面的手册,至于后面的SIGUSR2这个信号的具体定义,可以使用

man 7 signal

来查看详细说明
接着往下

这步也很简单,如果启动时添加了-d选项,在前面解析命令时会将$daemonize这个变量设置为true,然后这里就会创建一个子进程,之后父进程会退出,这样子进程就变成孤儿了
变成孤儿可不行,万一这个孤儿到处乱跑怎么办?于是只能给他找个养父,就是init进程,这样这个子进程就会一直在后台运行了,除非用命令杀死,否则会一直存在下去,这就是守护进程
下一个函数:

static::initWorkers()

这个函数看上很多,其实也不复杂,主要就是初始化了一些东西
主要关注

// Listen.
if (!$worker->reusePort) {
    $worker->listen();
}

这行代码
reusePort是端口复用的标志,详情可见官方文档:https://github.com/walkor/workerman-manual/blob/master/chinese/src/worker-development/reuse-port.md
接着进入到listen里面
代码其实不是很复杂,只不过有些多而已

// Get the application layer communication protocol and listening address.
list($scheme, $address) = explode(':', $this->_socketName, 2);// Check application layer protocol class.if (!isset(static::$_builtinTransports[$scheme])) {
    $scheme         = ucfirst($scheme);
    $this->protocol = substr($scheme,0,1)==='\\' ? $scheme : '\\Protocols\\' . $scheme;
    if (!class_exists($this->protocol)) {
        $this->protocol = "\\Workerman\\Protocols\\$scheme";
        if (!class_exists($this->protocol)) {
            throw new Exception("class \\Protocols\\$scheme not exist");
        }
    }

官方的注释很好的解释了这段代码的意思:将http://0.0.0.0:2356给分解成数组,然后根据协议头http、websocket、ws等去实例化对应的类,进行处理协议数据
接下来的一大段代码就是常规的创建套接字操作,最后用

stream_set_blocking($this->_mainSocket, 0);

设置成非阻塞模式。
为什么要设置成非阻塞,而不是阻塞?
阻塞可以省下cpu,但是一个进程在同一时间只能处理一个请求,而非阻塞则可以同时处理多个,从性能上看,非阻塞更优
最后的

$this->resumeAccept();

则是进行了一些准备接受新连接的操作,最后将
$this->_pauseAccept 标志位置为false,开始准备接收
关于acceptConnection和acceptUdpConnection这两个函数,点进去会发现开头是接收数据,接着初始化connection,然后你会发现一个熟悉的函数
$this->onConnect
看看我们最开始写的代码,里面是不是有个onConnect的回调函数?没错,就是在这里调用!
至于是什么时候开始调用
acceptConnection
这个函数,则是由事件决定,这个稍后再说
接下来的

static::installSignal()

也是非常简单
主要就是设置了一些信号量,追到
signalHandler这个函数里面看的时候,发现也只不过是对信号量一些响应操作,像是将当前状态写入文件中、退出等等
之后两个函数

static::saveMasterPid();
static::displayUI();

都没有什么好说的,一个是保存了当前的进程号,一个就是在屏幕上打出

如果感兴趣的话,可以跟进去看看
之后激动人心的事情来了,前期做了那么多准备,现在终于要开始运行起来了
跟进
forkWorkers()这个函数,再点进相应的执行函数,笔者进入的是

static::forkWorkersForLinux();

这个函数里面最后再检测状态是否为初始化,然后初始化,接着对每一个进程进行处理
这里可以看到,其通过while语句来实现创建
$worker->count
个子进程,还记得我们最开始写的
$ws_worker->count = 4;
吗?没错,就是在这里用到了。
继续跟进

static::forkOneWorkerForLinux($worker);

最开始的这段看着可能有点迷糊

// Get available worker id.
$id = static::getId($worker->workerId, 0);
if ($id === false) {
    return;
}
$pid = pcntl_fork();
// For master process.
if ($pid > 0) {
    static::$_pidMap[$worker->workerId][$pid] = $pid;
    static::$_idMap[$worker->workerId][$id]   = $pid;
}

但其实很简单,在getId这个函数中会去寻找先前initId()中初始化的数据,之前初始化后的结构为

$_idMap[$worker->workerId][0] = 0;
$_idMap[$worker->workerId][1] = 0;
$_idMap[$worker->workerId][2] = 0;
$_idMap[$worker->workerId][3] = 0;


现在这样找到后,进行fork,然后对于父进程来说,所需要做的就是管理好子进程,所以直接将id放入$_idMap中即可,这里的$pid就是子进程的id
接着看子进程的代码

elseif (0 === $pid) {
    if ($worker->reusePort) {
        $worker->listen();
    }
    if (static::$_status === static::STATUS_STARTING) {
        static::resetStd();
    }
    static::$_pidMap  = array();
    // Remove other listener.
    foreach(static::$_workers as $key => $one_worker) {
        if ($one_worker->workerId !== $worker->workerId) {
            $one_worker->unlisten();
            unset(static::$_workers[$key]);
        }
    }
    Timer::delAll();
    static::setProcessTitle('WorkerMan: worker process  ' . $worker->name . ' ' . $worker->getSocketName());
    $worker->setUserAndGroup();
    $worker->id = $id;

注意:在这里面的worker,实际上指向的仍旧是之前传值进来的worker
接着就是有重用端口的标志,则设置相关套接字属性,之后resetStd()重新设置标准io,接着就是停止掉其他worker的监听
注意:这里的worker指的是最开始的new Worker(“http://0.0.0.0:2356”); 在有时候,可能会创建多个worker,例如
new Worker(“websocket://0.0.0.0:2356”);
new Worker(“ws://0.0.0.0:2356”);
等等

最后是停止所有的定时器,设置进程标题,设置进程的用户和组
之后是这段代码

$worker->run();

这个函数里面首先设置了事件处理器,默认是select,如果有libevent或者event扩展则会使用它们,这样可以极大的提高性能!
之后在重新安装下signal,也就是信号,可能你会有疑问,之前为什么要安装信号呢?
之前安装的原因是给其一个默认的信号处理器,防止在启动后,还没有使用对应的事情处理器时做应急处理,这里是将信号安装给事件处理器!
之后初始化定时器,调用回调函数通知worker准备工作
然后进入到事件的主循环中,开始等待事件产生
当socket准备完毕,可以读了之后,linux系统会产生一个可读信号传送给子进程,然后就会在这个loop里面进行处理,具体处理情况可以看workerman/events 里面的各个类文件
对于不同的事件处理,其实现方式不同,但都会回调对应的连接的读写操作
等等,它怎么会回调哪个函数进行读写操作呢?代码里面没有看到呀
还记得acceptConnection这个函数吗?
跟进去,看看

$connection                         = new TcpConnection($new_socket, $remote_address);

里面很明显的有一段添加读事件操作的代码

Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));

在baseRead函数中,它读取完数据后会回调onMessage函数,而我们的业务代码也都是写在这当中的

至于最后的

static::monitorWorkers();

则是主进程用于监视子进程,一旦子进程出现什么问题,主进程立马销毁,重开一个线程
是不是感觉有些懵?没关系,让我们来整理个流程图

话又说回来,select又是怎么知道有数据要读呢?
是内核告诉它的
内核又是怎么知道呢?
内核本来就知道,当数据从网卡传进来时,内核会中断,去将网卡的数据读取到缓冲区,在读取的过程中内核就知道是哪个描述符的
epoll也是基于此原理

最后,我们来解决最开始提出的问题,为什么只显示connect,而不显示message呢?
我们先来抓包看看,一切都很正常,确实是connect
那么,我们追踪代码,在TcpConnection这个文件中,有个send方法,根据之前的代码来看,我们在回调代码里面所用的connection就是它
初看之下没有什么稀奇的,就是先尝试直接发送,如果不成功,再看看连接如果没关闭,则直接放入缓冲区中
但是,注意其中这段代码

// Try to call protocol::encode($send_buffer) before sending.
        if (false === $raw && $this->protocol !== null) {
            $parser      = $this->protocol;
            $send_buffer = $parser::encode($send_buffer, $this);
            if ($send_buffer === '') {
                return null;
            }
        }

还记得这个protocol是什么吗?
翻翻看之前的代码,可以明显找到就是对应的协议类,例如http对应的就是Http的协议处理类,里面包含了字符串解码成数据的算法等
那么,我们可以打个调试信息看看
在刚刚的这段代码下面,我们添加如下代码

var_dump($send_buffer);

然后,重新启动你的workerman,先用ctrl+c杀掉,再重新开一下就行
接着,再访问一下,你会在控制台发现调试信息

可以明显看到,connection被编码成一个http标准的协议字符串了
这样,单次请求的回应就完成了,如果客户端没有其他的请求,则只能收到这个connection,后续的message虽然发送成功,但并没有实际到达客户端
其实,如果你多刷新几次,就有可能看到message
为何,仔细看响应头,里面有个
Connection: keep-alive
这个是保持连接的意思,所以,message这一步后续的数据,可能会直接接收到并响应。
如此,就是workerman的核心了,之后可能会模仿其写一个类似的程序出来,也正好加深印象。