Experimental async PHP - Volume 2

Second try to run a PHP daemon receiving messages from RabbtiMQ and executing commands async on the php-fpm socket

Updates

Preamble

Taking the topic of my previous post further, it is time to (hopefully) eliminate some drawbacks that came along in the first try, such as:

So let's change the goal a bit and replace Redis with a real message broker: RabbitMQ.

Goal

Caller-RabbitMQ-Daemon-Socket-Worker


Used environment


The "Caller" version #1

Again, the "Caller" is a simple script, that sends a message to the "Broker", to a queue named "commands". To be a little more verbose we'll provide a counter as an argument to the script that will be the content of the message.

src/caller.php

<?php declare(strict_types = 1);

namespace hollodotme\AsyncPhp;

require(__DIR__ . '/../vendor/autoload.php');

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

# Connect and retrieve a channel
$connection = new AMQPStreamConnection( 'localhost', 5672, 'guest', 'guest' );
$channel    = $connection->channel();

# Make sure the queue 'commands' exist
$channel->queue_declare( 'commands' );

# Create and send the message
$message = new AMQPMessage( json_encode( [ 'number' => $argv[1] ], JSON_PRETTY_PRINT ) );
$channel->basic_publish( $message, '', 'commands' );

echo " [x] Message sent: {$argv[1]}\n";

# Close channel and connection
$channel->close();
$connection->close();

The "Daemon" version #1

Also in the "Daemon" we replace the Redis subscription with a basic consumption of messages sent to the "commands" queue of the "Broker".

When a message is consumed a callback function (Closure) will be invoked. This Closure will again send a request to our previously set up php-fpm pool, thus to our "Workers".

src/daemon.php

<?php declare(strict_types = 1);

namespace hollodotme\AsyncPhp;

use hollodotme\FastCGI\Client;
use hollodotme\FastCGI\Requests\PostRequest;
use hollodotme\FastCGI\SocketConnections\UnixDomainSocket;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

require(__DIR__ . '/../vendor/autoload.php');

# Connect to the same RabbitMP instance and get a channel
$connection = new AMQPStreamConnection( 'localhost', 5672, 'guest', 'guest' );
$channel    = $connection->channel();

# Make sure the queue "commands" exists
$channel->queue_declare( 'commands' );

# Prepare the Fast CGI Client
$unixDomainSocket = new UnixDomainSocket( 'unix:///var/run/php/php7.1-fpm-commands.sock' );

# Define a callback function that is invoked whenever a message is consumed
$callback = function ( AMQPMessage $message ) use ( $unixDomainSocket )
{
    # Decode the json message and encode it for sending to php-fpm
    $messageArray = json_decode( $message->getBody(), true );
    $body         = http_build_query( $messageArray );

    # Send an async request to php-fpm pool and receive a process ID
    $fpmClient = new Client( $unixDomainSocket );

    $request = new PostRequest( '/vagrant/src/worker.php', $body );

    $processId = $fpmClient->sendAsyncRequest( $request );

    echo " [x] Spawned process with ID {$processId} for message number {$messageArray['number']}\n";
};

# Request consumption for queue "commands" using the defined callback function
$channel->basic_consume( 'commands', '', false, true, false, false, $callback );

# Wait to finish execution as long as the channel has callbacks
while ( count( $channel->callbacks ) )
{
    $channel->wait();
}

Note: You should not a use a persistent socket connection to php-fpm here, since you'll receive notices like this once in a while: PHP Notice: fwrite(): send of 399 bytes failed with errno=11 Resource temporarily unavailable .... And a persistent connection will also cause the php-fpm pool to spawn only one child worker, instead of as much as needed / configured.


The "Worker"

The worker remains the same for now. It simply logs the received number to a log file and sleeps for one second.

src/worker.php

<?php declare(strict_types = 1);

namespace hollodotme\AsyncPhp;

require(__DIR__ . '/../vendor/autoload.php');

error_log( "Processing {$_POST['number']}\n", 3, sys_get_temp_dir() . '/workers.log' );

sleep( 1 );

First test

  1. (Re)start the "Daemon" and check if it's running properly:
    sudo service php-daemon restart && sudo service php-daemon status
  2. Watch the process list for spawning php-fpm pool commands (in a new terminal tab):
    watch -n1 "sudo ps aux | grep 'php-fpm: pool commands' | grep -v grep"
  3. Watch the logs/worker.log (in a new terminal tab):
    tailf "cat /tmp/workers.log"
  4. Watch the syslog for spawned process by "Daemon" (in a new terminal tab):
    tailf /var/log/syslog | grep '\[x\]'
  5. Execute the "Caller" 100 times:
    for i in $(seq 1 100); do php7.1 src/caller.php $i; done

Results:


This result is already pretty much what we want, but there are still some "hidden" drawbacks:

  1. The message queue consumption
    While the test above runs, a glimpse at the RabbitMQ queue list (rabbitmqctl list_queues) shows that there is a queue named "commands" with "0" outstanding messages. This is because our messages are not persistent and are immediately delivered to the consumer, that connects first. This is not what we want if we want to scale to multiple "Daemons". Currently there is no "distribution plan" for messages for multiple "Daemons".

  2. "Daemons" gonna die!
    Occasionally consumers of messages happen to die for whatever reason. Since our messages are not persistent yet, they will be deleted from the queue as soon as they were sent to the "Daemon", regardless if they were fully processed or not.

Persist and acknowledge

To eliminate the before mentioned drawbacks we should slightly change the usage of RabbitMQ to have work queues (task queues) with persistent messages instead of volatile messages. So if messages are persistent, we also need to tell the channel when a message (task) was fully processed and can be deleted from the queue. Thus we'll send an acknowledgement back to the channel.

The "Caller" version #2

src/caller.php

<?php declare(strict_types = 1);

namespace hollodotme\AsyncPhp;

require(__DIR__ . '/../vendor/autoload.php');

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

# Connect and retrieve a channel
$connection = new AMQPStreamConnection( 'localhost', 5672, 'guest', 'guest' );
$channel    = $connection->channel();

# Make sure the queue 'commands' exist
# Make the queue persistent (set 3rd parameter to true)
$channel->queue_declare( 'commands', false, true );

$payload = json_encode( [ 'number' => $argv[1] ], JSON_PRETTY_PRINT );

# Create and send the message
$message = new AMQPMessage(
    $payload,
    [
        # Make message persistent
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    ]
);

$channel->basic_publish( $message, '', 'commands' );

echo " [x] Message sent: {$argv[1]}\n";

# Close channel and connection
$channel->close();
$connection->close();

What has changed?


The "Daemon" version #2

src/daemon.php

<?php declare(strict_types = 1);

namespace hollodotme\AsyncPhp;

use hollodotme\FastCGI\Client;
use hollodotme\FastCGI\Requests\PostRequest;
use hollodotme\FastCGI\SocketConnections\UnixDomainSocket;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

require(__DIR__ . '/../vendor/autoload.php');

# Connect to the same RabbitMP instance and get a channel
$connection = new AMQPStreamConnection( 'localhost', 5672, 'guest', 'guest' );
$channel    = $connection->channel();

# Make sure the queue "commands" exists
# Make the queue persistent (set 3rd parameter to true)
$channel->queue_declare( 'commands', false, true );

# Prepare the Fast CGI Client
$unixDomainSocket = new UnixDomainSocket( 'unix:///var/run/php/php7.1-fpm-commands.sock' );

$daemonId = sprintf( 'D-%03d', mt_rand( 1, 100 ) );

# Define a callback function that is invoked whenever a message is consumed
$callback = function ( AMQPMessage $message ) use ( $unixDomainSocket, $daemonId )
{
    # Decode the json message and encode it for sending to php-fpm
    $messageArray             = json_decode( $message->getBody(), true );
    $messageArray['daemonId'] = $daemonId;
    $body                     = http_build_query( $messageArray );

    # Send an async request to php-fpm pool and receive a process ID
    $fpmClient = new Client( $unixDomainSocket );

    $request = new PostRequest( '/vagrant/src/worker.php', $body );

    $processId = $fpmClient->sendAsyncRequest($request);

    echo " [x] Spawned process with ID {$processId} for message number {$messageArray['number']}\n";

    # Send the ACK(nowledgement) back to the channel for this particular message
    $message->get( 'channel' )->basic_ack( $message->get( 'delivery_tag' ) );
};

# Set the prefetch count to 1 for this consumer
$channel->basic_qos( null, 1, null );

# Request consumption for queue "commands" using the defined callback function
# Enable message acknowledgement (set 4th parameter to false)
$channel->basic_consume( 'commands', '', false, false, false, false, $callback );

# Wait to finish execution as long as the channel has callbacks
while ( count( $channel->callbacks ) )
{
    $channel->wait();
}

What has changed?

Make sure to restart the "Daemon" after these changes, otherwise the queue won't be persistent.


The "Worker" version #2

src/worker.php

<?php declare(strict_types = 1);

namespace hollodotme\AsyncPhp;

require(__DIR__ . '/../vendor/autoload.php');

error_log(
    " [x] Processing {$_POST['number']} from daemon {$_POST['daemonId']}\n",
    3,
    sys_get_temp_dir() . '/workers.log'
);

sleep( 1 );

What has changed?


Second test

In this test we will...

Results:


Summary

The next step will be a real-world implementation and further testing.

You can find the example code of this blog post here hollodotme/experimental-async-php-vol2

I hope you liked that post. If you're in the mood to give me feedback, tweet me a tweet or open a discussion on GitHub.

Thank you.


[░░░░░░░░░░░░░░░░░░] 2 days | 01/11/2017