php进程间通讯

2016-05-13 16:42  1197人阅读  评论 (0)

php单进程单线程处理批量任务太慢了,受不鸟了,但是php不能多线程,最终选择了多进程处理批量任务.

php多进程主要使用for进行分裂,然后利用的unix/linux的信号量进行进程间通讯.

本例使用的是,生产者=>消费者=>收集器,的模式.

<?php
// ===== 全局变量 =====

// ipc进程间通讯
$key = ftok(__FILE__, "a");
$queue = msg_get_queue($key);

// 进程ID
$producer_pid = 0;
$consumers_pid = array();
$collector_pid = posix_getpid();

// ===== 消费者 =====
for ($i=0; $i < 2; $i++) {
    $consumer_pid = pcntl_fork();
    if ($consumer_pid == -1) {
        exit("could not fork!\n");
    } else if ($consumer_pid) {
        // pcntl_wait($status);
        echo "consumer_pid: $consumer_pid\n";
        $consumers_pid[] = $consumer_pid;
    } else {
        $pid = posix_getpid();
        echo "consumer_pid: $pid start\n";

        while (true) {
            msg_receive($queue, $pid, $msgtype, 1024, $message);

            if ($message == "exit") {
                break;
            }

            // 数据处理
            $n = intval($message);

            msg_send($queue, $collector_pid, $n * $n);
        }

        exit("consumer ok!\n");
    }
}

// ===== 产生者 =====
$producer_pid = pcntl_fork();
if ($producer_pid == -1) {
    exit("could not fork!\n");
} else if ($producer_pid) {
    // pcntl_wait($status);
    echo "producer_pid: $producer_pid\n";
} else {
    $pid = posix_getpid();
    echo "producer_pid: $pid start\n";

    $n = 0;

    for ($i=0; $i < 10; $i++) {
        foreach ($consumers_pid as $consumer_pid) {
            $n++;
            msg_send($queue, $consumer_pid, $n);
        }

        sleep(1);
    }

    foreach ($consumers_pid as $consumer_pid) {
        msg_send($queue, $consumer_pid, "exit");
    }

    sleep(1);

    msg_send($queue, $collector_pid, "exit");

    exit("producer ok!\n");
}

// ===== 收集器 =====
while (true) {
    msg_receive($queue, $collector_pid, $msgtype, 1024, $message);

    if ($message == "exit") {
        break;
    }

    echo sprintf("% 5d: %d\n", $msgtype, $message);
}

exit("collector ok!\n");