Taking the topic of my previous post further, it is time to (hopefully) eliminate some drawbacks that came with the first try, such as:
Redis PubSub is mostly synchronous: according to the Redis documentation, the PUBLISH command has a time complexity of “O(N+M) where N is the number of clients subscribed to the receiving channel and M is the total number of subscribed patterns (by any client).”
Redis PubSub delivers every published message to every subscriber, so all “Daemons” (if multiple) would get the same messages and trigger the same “Workers”.
So let’s change the goal a bit and replace Redis with a real message broker: RabbitMQ.
Again, the “Caller” is a simple script that sends a message to the “Broker”, to a queue named “commands”. To be a little more verbose, we’ll pass a counter as an argument to the script, which will be the content of the message.
src/caller.php
<?php declare(strict_types = 1);
namespace hollodotme\AsyncPhp;
require(__DIR__ . '/../vendor/autoload.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
# Connect and retrieve a channel
$connection = new AMQPStreamConnection( 'localhost', 5672, 'guest', 'guest' );
$channel = $connection->channel();
# Make sure the queue 'commands' exists
$channel->queue_declare( 'commands' );
# Create and send the message
$message = new AMQPMessage( json_encode( [ 'number' => $argv[1] ], JSON_PRETTY_PRINT ) );
$channel->basic_publish( $message, '', 'commands' );
echo " [x] Message sent: {$argv[1]}\n";
# Close channel and connection
$channel->close();
$connection->close();
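The caller reads $argv[1] without checking it, so calling it without an argument publishes a message with a null number (plus a PHP notice). A minimal sketch of a guard, assuming the counter should be a non-negative integer; buildCommandPayload() is a hypothetical helper, not part of the example repository:

```php
<?php declare(strict_types = 1);

# Hypothetical helper (not part of the original src/caller.php):
# validates the counter from the command line and builds the JSON message body.
function buildCommandPayload( array $argv ): string
{
	$number = (string)( $argv[1] ?? '' );

	# Assumption: the counter must be a non-negative integer like "1", "42", ...
	if ( preg_match( '/\A[0-9]+\z/', $number ) !== 1 )
	{
		throw new InvalidArgumentException( 'Usage: php src/caller.php <number>' );
	}

	# Same payload as in src/caller.php: the counter as "number"
	return json_encode( [ 'number' => $number ], JSON_PRETTY_PRINT );
}

# In src/caller.php this would be: $payload = buildCommandPayload( $argv );
echo buildCommandPayload( [ 'src/caller.php', '42' ] ), "\n";
```

Failing early in the “Caller” keeps malformed messages out of the queue, so neither the “Daemon” nor the “Workers” have to deal with them.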
Also in the “Daemon” we replace the Redis subscription with a basic consumption of messages sent to the “commands” queue of the “Broker”.
When a message is consumed a callback function (Closure) will be invoked. This Closure will again send a request to our previously set up php-fpm pool, thus to our “Workers”.
src/daemon.php
<?php declare(strict_types = 1);
namespace hollodotme\AsyncPhp;
use hollodotme\FastCGI\Client;
use hollodotme\FastCGI\Requests\PostRequest;
use hollodotme\FastCGI\SocketConnections\UnixDomainSocket;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
require(__DIR__ . '/../vendor/autoload.php');
# Connect to the same RabbitMQ instance and get a channel
$connection = new AMQPStreamConnection( 'localhost', 5672, 'guest', 'guest' );
$channel = $connection->channel();
# Make sure the queue "commands" exists
$channel->queue_declare( 'commands' );
# Prepare the FastCGI socket connection to the php-fpm pool
$unixDomainSocket = new UnixDomainSocket( 'unix:///var/run/php/php7.1-fpm-commands.sock' );
# Define a callback function that is invoked whenever a message is consumed
$callback = function ( AMQPMessage $message ) use ( $unixDomainSocket )
{
# Decode the json message and encode it for sending to php-fpm
$messageArray = json_decode( $message->getBody(), true );
$body = http_build_query( $messageArray );
# Send an async request to php-fpm pool and receive a process ID
$fpmClient = new Client( $unixDomainSocket );
$request = new PostRequest( '/vagrant/src/worker.php', $body );
$processId = $fpmClient->sendAsyncRequest( $request );
echo " [x] Spawned process with ID {$processId} for message number {$messageArray['number']}\n";
};
# Request consumption for queue "commands" using the defined callback function
$channel->basic_consume( 'commands', '', false, true, false, false, $callback );
# Wait to finish execution as long as the channel has callbacks
while ( count( $channel->callbacks ) )
{
$channel->wait();
}
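The callback only re-encodes the message: the JSON body from the “Broker” becomes a form-encoded POST body for php-fpm, which PHP then exposes to the worker as $_POST. A sketch of that round trip without RabbitMQ or php-fpm involved, assuming the request reaches php-fpm as application/x-www-form-urlencoded (which the worker’s use of $_POST implies); parse_str() stands in for PHP’s own POST parsing:

```php
<?php declare(strict_types = 1);

# 1. "Caller": JSON body of the AMQP message (as in src/caller.php)
$amqpBody = json_encode( [ 'number' => '7' ], JSON_PRETTY_PRINT );

# 2. "Daemon": decode the JSON and re-encode it as a form body (as in the callback)
$messageArray = json_decode( $amqpBody, true );
$postBody     = http_build_query( $messageArray );

# 3. "Worker": php-fpm fills $_POST from the form body;
#    parse_str() applies the same URL decoding and stands in for it here
parse_str( $postBody, $workerPost );

echo $postBody, "\n";             # number=7
echo $workerPost['number'], "\n"; # 7
```

Any additional key added to $messageArray in the callback ends up as an additional $_POST entry in the worker.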
Note: You should not use a persistent socket connection to php-fpm here, since you’ll receive notices like this once in a while:
PHP Notice: fwrite(): send of 399 bytes failed with errno=11 Resource temporarily unavailable ...
A persistent connection will also cause the php-fpm pool to spawn only one child worker, instead of as many as needed / configured.
The worker remains the same for now. It simply logs the received number to a log file and sleeps for one second.
src/worker.php
<?php declare(strict_types = 1);
namespace hollodotme\AsyncPhp;
require(__DIR__ . '/../vendor/autoload.php');
error_log( "Processing {$_POST['number']}\n", 3, sys_get_temp_dir() . '/workers.log' );
sleep( 1 );
Restart the “Daemon” service:
sudo service php-daemon restart && sudo service php-daemon status
Watch the php-fpm pool “commands” (in a new terminal tab):
watch -n1 "sudo ps aux | grep 'php-fpm: pool commands' | grep -v grep"
Follow the worker log /tmp/workers.log (in a new terminal tab):
tail -f /tmp/workers.log
Follow the “Daemon” output in the syslog:
tail -f /var/log/syslog | grep '\[x\]'
Run the “Caller” 100 times:
for i in $(seq 1 100); do php7.1 src/caller.php $i; done
Results:
[Screenshots: output of src/caller.php, src/worker.php and src/daemon.php]
This result is already pretty much what we want, but there are still some “hidden” drawbacks:
The message queue consumption
While the test above runs, a glimpse at the RabbitMQ queue list (rabbitmqctl list_queues) shows that there is a queue named “commands” with “0” outstanding messages.
This is because our messages are consumed with automatic acknowledgement (4th parameter of basic_consume set to true): they are immediately delivered to the consumer that connects first and removed from the queue right away.
This is not what we want if we want to scale to multiple “Daemons”: currently there is no “distribution plan” for messages across multiple “Daemons”.
“Daemons” gonna die!
Occasionally consumers of messages happen to die for whatever reason. Since our messages are acknowledged automatically, they are deleted from the queue as soon as they are sent to the “Daemon”, regardless of whether they were fully processed or not.
To eliminate the aforementioned drawbacks, we should slightly change the usage of RabbitMQ: a work queue (task queue) that is durable, holds persistent instead of volatile messages and is consumed with manual acknowledgements. With manual acknowledgements, we need to tell the channel when a message (task) was fully processed and can be deleted from the queue. Thus we’ll send an acknowledgement back to the channel.
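The difference between automatic and manual acknowledgements can be modelled in a few lines. The following is a hypothetical in-memory sketch (InMemoryWorkQueue is not part of RabbitMQ or php-amqplib, and it ignores persistence, prefetch and channels): a delivered message stays “unacknowledged” until its consumer acks it, and goes back into the queue if that consumer dies first. With automatic acknowledgement, the message would already be gone at delivery time.

```php
<?php declare(strict_types = 1);

# Hypothetical in-memory model of a work queue with manual acknowledgements.
# It only illustrates the semantics; it is NOT the RabbitMQ / php-amqplib API.
final class InMemoryWorkQueue
{
	/** @var string[] Messages waiting to be delivered */
	private $ready = [];

	/** @var array Delivery tag => [ 'consumer' => ..., 'body' => ... ] of unacknowledged messages */
	private $unacked = [];

	/** @var int */
	private $nextDeliveryTag = 1;

	public function publish( string $body ): void
	{
		$this->ready[] = $body;
	}

	# Hands the next ready message to a consumer; returns [ deliveryTag, body ] or null if empty
	public function deliver( string $consumer ): ?array
	{
		if ( count( $this->ready ) === 0 )
		{
			return null;
		}

		$body                  = array_shift( $this->ready );
		$tag                   = $this->nextDeliveryTag++;
		$this->unacked[ $tag ] = [ 'consumer' => $consumer, 'body' => $body ];

		return [ $tag, $body ];
	}

	# The consumer confirms that the message was fully processed, so it is dropped for good
	public function ack( int $deliveryTag ): void
	{
		unset( $this->unacked[ $deliveryTag ] );
	}

	# The consumer vanished without acking: in this model its messages go back to the front of the queue
	public function consumerDied( string $consumer ): void
	{
		$requeued = [];

		foreach ( $this->unacked as $tag => $delivery )
		{
			if ( $delivery['consumer'] === $consumer )
			{
				$requeued[] = $delivery['body'];
				unset( $this->unacked[ $tag ] );
			}
		}

		$this->ready = array_merge( $requeued, $this->ready );
	}

	public function countReady(): int
	{
		return count( $this->ready );
	}

	public function countUnacked(): int
	{
		return count( $this->unacked );
	}
}

$queue = new InMemoryWorkQueue();
$queue->publish( 'number=1' );
$queue->publish( 'number=2' );

# D-001 receives message 1 but dies before acknowledging it
$queue->deliver( 'D-001' );
$queue->consumerDied( 'D-001' );

# D-002 receives message 1 again, processes it and acknowledges it
list( $tag, $body ) = $queue->deliver( 'D-002' );
$queue->ack( $tag );

echo $body, "\n";                  # number=1
echo $queue->countReady(), "\n";   # 1 (message 2 is still waiting)
echo $queue->countUnacked(), "\n"; # 0
```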
src/caller.php
<?php declare(strict_types = 1);
namespace hollodotme\AsyncPhp;
require(__DIR__ . '/../vendor/autoload.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
# Connect and retrieve a channel
$connection = new AMQPStreamConnection( 'localhost', 5672, 'guest', 'guest' );
$channel = $connection->channel();
# Make sure the queue 'commands' exists
# Make the queue durable (3rd parameter = true) and do not auto-delete it
# when the last consumer disconnects (5th parameter = false)
$channel->queue_declare( 'commands', false, true, false, false );
$payload = json_encode( [ 'number' => $argv[1] ], JSON_PRETTY_PRINT );
# Create and send the message
$message = new AMQPMessage(
$payload,
[
# Make message persistent
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]
);
$channel->basic_publish( $message, '', 'commands' );
echo " [x] Message sent: {$argv[1]}\n";
# Close channel and connection
$channel->close();
$connection->close();
What has changed?
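The “commands” queue was declared durable (3rd parameter set to true), so the queue survives a restart of the RabbitMQ server; together with persistent messages, this keeps queued messages from being lost if the server dies. The 5th parameter (auto_delete, which php-amqplib defaults to true) is set to false, so the queue is not deleted once the last “Daemon” disconnects.
$channel->queue_declare( 'commands', false, true, false, false ); # 3rd parameter (durable) true, 5th (auto_delete) false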
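The message was declared persistent (delivery_mode set to AMQPMessage::DELIVERY_MODE_PERSISTENT), so RabbitMQ saves it to disk and, in a durable queue, it survives a restart of the server. The ability to acknowledge when a message was processed, and to let RabbitMQ re-deliver a message that was not fully processed to another consumer (if there is one), comes from enabling acknowledgements in the “Daemon”, not from the persistence itself.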
$message = new AMQPMessage(
$payload,
[
# Make message persistent
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]
);
src/daemon.php
<?php declare(strict_types = 1);
namespace hollodotme\AsyncPhp;
use hollodotme\FastCGI\Client;
use hollodotme\FastCGI\Requests\PostRequest;
use hollodotme\FastCGI\SocketConnections\UnixDomainSocket;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
require(__DIR__ . '/../vendor/autoload.php');
# Connect to the same RabbitMQ instance and get a channel
$connection = new AMQPStreamConnection( 'localhost', 5672, 'guest', 'guest' );
$channel = $connection->channel();
# Make sure the queue "commands" exists
# Make the queue durable (3rd parameter = true) and do not auto-delete it
# when the last consumer disconnects (5th parameter = false)
$channel->queue_declare( 'commands', false, true, false, false );
# Prepare the FastCGI socket connection to the php-fpm pool
$unixDomainSocket = new UnixDomainSocket( 'unix:///var/run/php/php7.1-fpm-commands.sock' );
$daemonId = sprintf( 'D-%03d', mt_rand( 1, 100 ) );
# Define a callback function that is invoked whenever a message is consumed
$callback = function ( AMQPMessage $message ) use ( $unixDomainSocket, $daemonId )
{
# Decode the json message and encode it for sending to php-fpm
$messageArray = json_decode( $message->getBody(), true );
$messageArray['daemonId'] = $daemonId;
$body = http_build_query( $messageArray );
# Send an async request to php-fpm pool and receive a process ID
$fpmClient = new Client( $unixDomainSocket );
$request = new PostRequest( '/vagrant/src/worker.php', $body );
$processId = $fpmClient->sendAsyncRequest( $request );
echo " [x] Spawned process with ID {$processId} for message number {$messageArray['number']}\n";
# Send the ACK(nowledgement) back to the channel for this particular message
# (sendAsyncRequest() does not wait for the worker, so this acknowledges the hand-over to php-fpm)
$message->get( 'channel' )->basic_ack( $message->get( 'delivery_tag' ) );
};
# Set the prefetch count to 1 for this consumer
$channel->basic_qos( null, 1, null );
# Request consumption for queue "commands" using the defined callback function
# Enable message acknowledgement (set 4th parameter to false)
$channel->basic_consume( 'commands', '', false, false, false, false, $callback );
# Wait to finish execution as long as the channel has callbacks
while ( count( $channel->callbacks ) )
{
$channel->wait();
}
What has changed?
Just like in the “Caller”, the “commands” queue was declared durable and is not auto-deleted, so it survives a restart of the RabbitMQ server and stays in place when the last “Daemon” disconnects.
$channel->queue_declare( 'commands', false, true, false, false ); # 3rd parameter (durable) true, 5th (auto_delete) false
A $daemonId was added and set in the request array, so we can later see which “Daemon” processed which messages.
$daemonId = sprintf( 'D-%03d', mt_rand( 1, 100 ) );
# ...
$callback = function ( AMQPMessage $message ) use ( $unixDomainSocket, $daemonId )
# ...
$messageArray['daemonId'] = $daemonId;
The prefetch count was set to 1. That means the “Daemon” will only accept one message at a time and leave the rest in the queue until the message was processed and acknowledged. So RabbitMQ can distribute the remaining messages to another “Daemon”, if there is one.
$channel->basic_qos( null, 1, null );
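Message acknowledgement was enabled by setting the 4th parameter (no_ack) of basic_consume to false:
# 4th parameter set to false
$channel->basic_consume( 'commands', '', false, false, false, false, $callback );
The callback sends the acknowledgement back to the channel for each message:
$message->get( 'channel' )->basic_ack( $message->get( 'delivery_tag' ) );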
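Why the prefetch count matters can be shown with a hypothetical model in plain PHP (no RabbitMQ involved): two “Daemons” need different, made-up amounts of time per message before they acknowledge it. Without a prefetch limit, RabbitMQ hands messages out round-robin, no matter how busy a consumer is; with a prefetch count of 1, the next message goes to whichever “Daemon” is free first. The durations are assumptions for illustration only; in the daemon above, the ACK is sent right after the request is handed to php-fpm.

```php
<?php declare(strict_types = 1);

# Hypothetical model: each "Daemon" needs a fixed time per message before it sends the ACK.
# Both functions return [ messages per daemon, time at which the last message is acknowledged ].

# No prefetch limit: messages are pushed to the consumers in turn, regardless of how busy they are
function dispatchRoundRobin( int $messages, array $durations ): array
{
	$names  = array_keys( $durations );
	$counts = array_fill_keys( $names, 0 );

	for ( $i = 0; $i < $messages; $i++ )
	{
		$counts[ $names[ $i % count( $names ) ] ]++;
	}

	$finishedAt = 0;
	foreach ( $counts as $name => $count )
	{
		$finishedAt = max( $finishedAt, $count * $durations[ $name ] );
	}

	return [ $counts, $finishedAt ];
}

# Prefetch count 1: a daemon only gets the next message after it acknowledged the previous one
function dispatchWithPrefetchOne( int $messages, array $durations ): array
{
	$names     = array_keys( $durations );
	$counts    = array_fill_keys( $names, 0 );
	$busyUntil = array_fill_keys( $names, 0 );

	for ( $i = 0; $i < $messages; $i++ )
	{
		# The daemon that becomes idle first takes the next message (ties: the first daemon)
		$next = array_keys( $busyUntil, min( $busyUntil ), true )[0];

		$busyUntil[ $next ] += $durations[ $next ];
		$counts[ $next ]++;
	}

	return [ $counts, max( $busyUntil ) ];
}

# Assumed durations in seconds per message (illustrative values only)
$durations = [ 'D-001' => 1, 'D-002' => 3 ];

list( $counts, $finishedAt ) = dispatchRoundRobin( 12, $durations );
echo json_encode( $counts ), " done at t={$finishedAt}\n"; # {"D-001":6,"D-002":6} done at t=18

list( $counts, $finishedAt ) = dispatchWithPrefetchOne( 12, $durations );
echo json_encode( $counts ), " done at t={$finishedAt}\n"; # {"D-001":9,"D-002":3} done at t=9
```

In this model the faster “Daemon” picks up three times as many messages, and the whole batch is done in half the time.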
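Make sure to restart the “Daemon” after these changes. RabbitMQ does not change the properties of an existing queue, so the old, non-durable “commands” queue has to be gone before the durable one can be declared.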
src/worker.php
<?php declare(strict_types = 1);
namespace hollodotme\AsyncPhp;
require(__DIR__ . '/../vendor/autoload.php');
error_log(
" [x] Processing {$_POST['number']} from daemon {$_POST['daemonId']}\n",
3,
sys_get_temp_dir() . '/workers.log'
);
sleep( 1 );
What has changed?
The log line now also contains the ID of the “Daemon” that sent the message:
" [x] Processing {$_POST['number']} from daemon {$_POST['daemonId']}\n",
In this test we will run src/caller.php 3 times in parallel, just to simulate some traffic:
for i in 1 2 3; do for j in $(seq 1 100); do php7.1 src/caller.php $j; done & done
The maximum number of workers is set in the php-fpm pool config (/etc/php/7.1/fpm/pool.d/commands.conf):
pm.max_children = 25
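A quick back-of-the-envelope check for this test, under the simplifying assumption that each worker takes exactly the one second of sleep() and all other overhead is ignored: with at most pm.max_children workers running at once, the 3 × 100 messages are processed in “waves”. minimumRuntimeSeconds() is a hypothetical helper:

```php
<?php declare(strict_types = 1);

# Hypothetical estimate: lower bound for processing all messages when at most
# $maxChildren workers run in parallel and each needs $secondsPerMessage
function minimumRuntimeSeconds( int $messages, int $maxChildren, float $secondsPerMessage ): float
{
	# ceil(): a partially filled last "wave" still takes a full $secondsPerMessage
	return ceil( $messages / $maxChildren ) * $secondsPerMessage;
}

$messages = 3 * 100; # 3 parallel caller loops, 100 messages each

echo minimumRuntimeSeconds( $messages, 25, 1.0 ), "\n"; # 12
```

Lowering pm.max_children to 5 would push this lower bound to 60 seconds.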
Results:
[Screenshots: output of src/caller.php; systemd output of src/daemon.php; /tmp/workers.log written by all async workers (src/worker.php); output of sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged]
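Since every line in /tmp/workers.log names the “Daemon” that dispatched the message, the distribution across the “Daemons” can also be checked after a run. A small sketch, assuming the exact line format written by src/worker.php; countMessagesPerDaemon() is a hypothetical helper and the sample lines are made up:

```php
<?php declare(strict_types = 1);

# Hypothetical helper: counts the log lines per daemon ID, expecting the format
# written by src/worker.php: " [x] Processing <number> from daemon <daemonId>"
function countMessagesPerDaemon( array $logLines ): array
{
	$counts = [];

	foreach ( $logLines as $line )
	{
		if ( preg_match( '/\[x\] Processing \S+ from daemon (\S+)/', $line, $matches ) === 1 )
		{
			$counts[ $matches[1] ] = ( $counts[ $matches[1] ] ?? 0 ) + 1;
		}
	}

	ksort( $counts );

	return $counts;
}

# Made-up sample lines; for a real run use e.g.
# file( sys_get_temp_dir() . '/workers.log', FILE_IGNORE_NEW_LINES )
$lines = [
	" [x] Processing 1 from daemon D-017",
	" [x] Processing 2 from daemon D-042",
	" [x] Processing 3 from daemon D-017",
];

echo json_encode( countMessagesPerDaemon( $lines ) ), "\n"; # {"D-017":2,"D-042":1}
```

Lines in the old format (without a daemon ID) are simply skipped, so the helper also works on a log that still contains entries from the first version.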
We replaced the mostly synchronous Redis PubSub system with the real async message broker RabbitMQ and established a persistent work queue able to distribute messages to multiple consumers.
We simulated a little scaling by switching a second daemon (consumer) on and off. You can play with variants and settings of the test described above. I did, and it simply worked in all cases, with differing performance of course, but no messages were lost during the tests, and that is what matters.
This is by far a better solution than the first try, and it seems quite stable. But…
Writing the code was a bit messy, because php-amqplib is poorly documented and its object API, with a lot of boolean flags, is not very self-explanatory.
The official RabbitMQ PHP tutorials helped, but are a little outdated too.
In the end, I think it is a slim setup with a lot of potential.
The next step will be a real-world implementation and further testing.
You can find the example code of this blog post here: hollodotme/experimental-async-php-vol2
I hope you liked that post. If you’re in the mood to give me feedback, tweet me a tweet or open a discussion on GitHub.
Thank you.