
Asynchronous Request Processing

Jan Gregor Emge-Triebel

Hello everyone

  • Jan Gregor Emge-Triebel
  • Head of Development
  • at Lillydoo GmbH, Frankfurt
  • Consultant
  • Symfony, Silex, Sylius, Grav

Before we start

Why asynchronous processing?

  • Cut down reaction time
  • Move long-running logic out of the request
  • Exchange data/messages with other (sub-)systems
  • Transfer load
  • Microservices
  • Scaling
  • Parallelization

Why talk (only) about requests?

"Symfony is a request based framework."

~ Fabien Potencier
SymfonyLive Berlin 2014

Queuing for beginners

Requests

fastcgi_finish_request()

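$response = new FancyResponseClass();
echo $response->getOutput();

// Flush the response to the client and close the connection (PHP-FPM only)
fastcgi_finish_request();

// The client no longer waits for anything below this line
$service = new HeavyLiftingService();
$service->doStuff();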
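
fastcgi_finish_request() only exists when PHP runs under PHP-FPM, so a guard is usually a good idea. The following is a minimal sketch of that guard; the helper name respondThenWork() is hypothetical and not part of the talk.

```php
<?php

/**
 * Hypothetical helper: send the response body first, then run the slow work.
 */
function respondThenWork(string $output, callable $work): void
{
    echo $output;

    if (function_exists('fastcgi_finish_request')) {
        // PHP-FPM: flush the response and close the client connection.
        fastcgi_finish_request();
    } else {
        // Other SAPIs (CLI, mod_php): only flush; the client keeps waiting.
        flush();
    }

    // Runs after the response has been handed over (under PHP-FPM).
    $work();
}

$log = [];
respondThenWork("Hello\n", function () use (&$log) {
    $log[] = 'heavy lifting done';
});
```

Note that the PHP-FPM worker stays busy until the work is finished, so this moves the wait away from the client but not away from the server.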

Architecture

A tale of three queuing systems

Gearman, Resque and RabbitMQ

https://gitlab.com/JanGregor/async-processing-slides/tags/symfony-live-london-2017

Video recording

Message Queuing

PHP-Resque

Queuing

$resque = $this->get('bcc_resque.resque');
$job    = new HeavyLiftingJob([
    'name' => $name
]);
$resque->enqueue($job);

RabbitMQ

Queuing

$msg = ['name' => $name];
$this->get('old_sound_rabbit_mq.heavy_lifting_job_producer')
    ->publish(json_encode($msg));

Bernard

Queuing

$message = new PlainMessage('HeavyLifting', [
    'name' => $name,
]);
$producer->produce($message);

PHP-Enqueue

Queuing

$msg = ['name' => $name];
$producer->sendCommand('HeavyLiftingProcessor', json_encode($msg));

Symfony Messenger (>= 4.1)

Queuing

$message = new HeavyLiftingMessage($name);
$bus->dispatch($message);

They are all the same!

Abstraction

class CustomPublisher
{
    public function publish($message)
    {
        $payload = $this->serializer->serialize($message, 'json');
        $this->producer->sendCommand('HeavyLiftingProcessor', $payload);

        // or (e.g. RabbitMQ bundle)
        // $this->producer->publish($payload)
    }
}

Just like: HTTP-Clients, Repositories, ...
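
One possible way to keep the publisher independent of the queuing library is a small transport interface with one adapter per library. This is only a sketch: the names Transport and InMemoryTransport are hypothetical, and a real adapter would wrap the producer of the library in use.

```php
<?php

// Hypothetical contract; each queue library gets one small adapter.
interface Transport
{
    public function send(string $queue, string $payload): void;
}

// In-memory adapter for tests; it simply records what was sent.
final class InMemoryTransport implements Transport
{
    /** @var array<string, string[]> */
    public $sent = [];

    public function send(string $queue, string $payload): void
    {
        $this->sent[$queue][] = $payload;
    }
}

final class CustomPublisher
{
    private $transport;
    private $queue;

    public function __construct(Transport $transport, string $queue)
    {
        $this->transport = $transport;
        $this->queue = $queue;
    }

    public function publish(array $message): void
    {
        // JSON_THROW_ON_ERROR needs PHP 7.3 or newer.
        $payload = json_encode($message, JSON_THROW_ON_ERROR);
        $this->transport->send($this->queue, $payload);
    }
}

$transport = new InMemoryTransport();
$publisher = new CustomPublisher($transport, 'HeavyLiftingProcessor');
$publisher->publish(['name' => 'Jan']);
```

Switching from one library to another then means writing a new adapter, while every caller of publish() stays untouched.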

Message Consuming

PHP-Resque

Consuming

class HeavyLiftingJob implements PerformantJobInterface
{
    public function perform($args)
    {
        echo $args['name'];
    }
}

RabbitMQ

Consuming

class HeavyLiftingService implements ConsumerInterface
{
    public function execute(AMQPMessage $msg)
    {
        $arguments = json_decode($msg->body);
        echo $arguments->name;

        return ConsumerInterface::MSG_ACK;
    }
}

Bernard

Consuming

class HeavyLiftingReceiver
{
    public function heavyLifting(Message $message)
    {
        $arguments = $message->all();
        echo $arguments['name'];
    }
}

PHP-Enqueue

Consuming

class HeavyLiftingProcessor implements PsrProcessor
{
    public function process(PsrMessage $message, PsrContext $session)
    {
        $arguments = json_decode($message->getBody());
        echo $arguments->name;

        return self::ACK;
    }
}

Symfony Messenger (>= 4.1)

Consuming

class MyMessageHandler
{
    public function __invoke(HeavyLiftingMessage $message)
    {
        echo $message->name;
    }
}

They are all the same!

Abstraction

class MyMessageHandler
{
    public function __invoke(HeavyLiftingMessage $message)
    {
        $this->echoService->print($message);
    }
}

Just like: Controller, Listener, Commands, ...
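
The same idea on the consuming side, as a rough sketch: the handler stays thin, the logic lives in a plain service, and only a small adapter knows the wire format. EchoService::format() and consumeJson() are hypothetical names chosen for this illustration.

```php
<?php

// Message class mirroring the one used on the slides.
final class HeavyLiftingMessage
{
    public $name;

    public function __construct(string $name)
    {
        $this->name = $name;
    }
}

// Business logic without any dependency on a queue library.
final class EchoService
{
    public function format(HeavyLiftingMessage $message): string
    {
        return 'Hello ' . $message->name;
    }
}

// Library-agnostic handler that only delegates.
final class MyMessageHandler
{
    private $echoService;

    public function __construct(EchoService $echoService)
    {
        $this->echoService = $echoService;
    }

    public function __invoke(HeavyLiftingMessage $message): string
    {
        return $this->echoService->format($message);
    }
}

// Thin adapter: the only place that knows the payload is JSON.
function consumeJson(string $body, MyMessageHandler $handler): string
{
    $data = json_decode($body, true, 512, JSON_THROW_ON_ERROR);

    return $handler(new HeavyLiftingMessage($data['name']));
}

$result = consumeJson('{"name":"Jan"}', new MyMessageHandler(new EchoService()));
```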

Benefits from Abstraction

Logging / UUID

class CustomPublisher
{
    public function publish($message)
    {
        $message->uuid = uniqid();

        $payload = $this->serializer->serialize($message, 'json');
        $this->producer->sendCommand(
            'HeavyLiftingProcessor',
            $payload
        );

        $this->logger->info('Published message.', [
            'id' => $message->uuid,
            'message' => $message,
        ]);
    }
}
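
Note that uniqid() is derived from the current time and is not a UUID. If a real random (version 4) UUID is wanted, it can be built from random_bytes(); the function name uuidV4() below is a hypothetical helper, and a maintained library would be a reasonable alternative.

```php
<?php

/**
 * Generates a random (version 4) UUID in the layout described by RFC 4122.
 */
function uuidV4(): string
{
    $bytes = random_bytes(16);

    // Set the four version bits to 0100 (version 4).
    $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40);
    // Set the two variant bits to 10.
    $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80);

    // 32 hex characters, split into 8 groups of 4, joined as 8-4-4-4-12.
    return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($bytes), 4));
}

$id = uuidV4();
```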

Events

class CustomPublisher
{
    public function publish($message)
    {
        $event = new GenericEvent($message);
        $this->dispatcher->dispatch('app.publisher.pre_publish', $event);

        $payload = $this->serializer->serialize($message, 'json');
        $this->producer->sendCommand('HeavyLiftingProcessor', $payload);

        $event = new GenericEvent($message);
        $this->dispatcher->dispatch('app.publisher.post_publish', $event);
    }
}

Dead Letter Exchange

class CustomConsumer
{
    public function consume($message)
    {
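        // Count this attempt; the property is missing on the first delivery
        $message->tries = is_int($message->tries ?? null) ? $message->tries + 1 : 1;

        try {
            $this->echoService->print($message);
        } catch (\Exception $e) {
            if ($message->tries >= $this->maxTries) {
                // Give up: park the message for later inspection
                $this->deadLetterPublisher->publish($message);
            } else {
                // Re-queue the message for another attempt
                $this->publisher->publish($message);
            }
        }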
        }
    }
}
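
The retry logic above can be tried out without a broker. The following sketch uses array messages and a hypothetical InMemoryQueue class in place of the real publishers; RetryingConsumer is likewise an illustrative name.

```php
<?php

// Hypothetical in-memory stand-in for a real publisher.
final class InMemoryQueue
{
    public $messages = [];

    public function publish(array $message): void
    {
        $this->messages[] = $message;
    }
}

final class RetryingConsumer
{
    private $handler;
    private $retryQueue;
    private $deadLetterQueue;
    private $maxTries;

    public function __construct(
        callable $handler,
        InMemoryQueue $retryQueue,
        InMemoryQueue $deadLetterQueue,
        int $maxTries
    ) {
        $this->handler = $handler;
        $this->retryQueue = $retryQueue;
        $this->deadLetterQueue = $deadLetterQueue;
        $this->maxTries = $maxTries;
    }

    public function consume(array $message): void
    {
        // Count this attempt; the key is missing on the first delivery.
        $message['tries'] = ($message['tries'] ?? 0) + 1;

        try {
            ($this->handler)($message);
        } catch (\Throwable $e) {
            // Keep the reason so the dead letter can be diagnosed later.
            $message['last_error'] = $e->getMessage();
            $target = $message['tries'] >= $this->maxTries
                ? $this->deadLetterQueue
                : $this->retryQueue;
            $target->publish($message);
        }
    }
}

$retry = new InMemoryQueue();
$dead = new InMemoryQueue();
$alwaysFails = function (array $message) {
    throw new \RuntimeException('boom');
};
$consumer = new RetryingConsumer($alwaysFails, $retry, $dead, 2);

$consumer->consume(['name' => 'Jan']);    // first failure: re-queued
$consumer->consume($retry->messages[0]);  // second failure: dead-lettered
```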

Graceful Shutdown

class CustomConsumer
{
    public function consume($message)
    {
        $this->echoService->print($message);

        if ($this->shouldShutdown()) {
            exit();
        }
    }
}
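
The slides leave shouldShutdown() open. One common approach, sketched here under the assumption that the worker runs on the CLI with the pcntl extension available, is to set a flag from a signal handler and check it between messages. ShutdownFlag is a hypothetical class name.

```php
<?php

// Hypothetical flag object; the consumer asks it after every message.
final class ShutdownFlag
{
    private $requested = false;

    public function request(): void
    {
        $this->requested = true;
    }

    public function shouldShutdown(): bool
    {
        return $this->requested;
    }

    public function listenForSignals(): void
    {
        // pcntl is an optional extension; do nothing when it is missing.
        if (!function_exists('pcntl_async_signals')) {
            return;
        }

        pcntl_async_signals(true); // available since PHP 7.1
        foreach ([SIGTERM, SIGINT] as $signal) {
            pcntl_signal($signal, function () {
                $this->request();
            });
        }
    }
}

$flag = new ShutdownFlag();
$flag->listenForSignals();

$processed = [];
foreach (['a', 'b', 'c'] as $message) {
    $processed[] = $message;      // always finish the current message

    if ($message === 'b') {
        $flag->request();         // simulates SIGTERM arriving during "b"
    }
    if ($flag->shouldShutdown()) {
        break;                    // stop between messages, never mid-message
    }
}
```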

Lessons learned

Lessons learned - Worker

  • PHP processes are not meant to be long-running (memory leaks)
  • Use Supervisor to monitor and restart consumers
  • Booting consumers takes rather long
  • Open database connections, timeouts
  • Better quit when inactive

Lessons learned - Worker

  • Cap maximum runtime per process
  • Automatically shut down after processing x messages
  • Memory limit: RabbitMQ-Bundle utilizes "soft-quota"
  • Prefetching (Warning: Order!)
  • Custom classes (common logging etc.)
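
The runtime, message-count and memory caps can live in one small object that the worker loop consults after each message. This is a sketch only; WorkerLimits and its constructor parameters are hypothetical names, and the limits shown are arbitrary.

```php
<?php

final class WorkerLimits
{
    private $startedAt;
    private $processed = 0;
    private $maxMessages;
    private $maxSeconds;
    private $maxMemoryBytes;

    public function __construct(int $maxMessages, int $maxSeconds, int $maxMemoryBytes)
    {
        $this->startedAt = time();
        $this->maxMessages = $maxMessages;
        $this->maxSeconds = $maxSeconds;
        $this->maxMemoryBytes = $maxMemoryBytes;
    }

    public function messageProcessed(): void
    {
        $this->processed++;
    }

    // True as soon as any one of the three caps is hit.
    public function reached(): bool
    {
        return $this->processed >= $this->maxMessages
            || (time() - $this->startedAt) >= $this->maxSeconds
            || memory_get_usage(true) >= $this->maxMemoryBytes;
    }
}

// Stop after 3 messages, 1 hour, or 1 GiB, whichever comes first.
$limits = new WorkerLimits(3, 3600, 1024 * 1024 * 1024);

$handled = 0;
while (!$limits->reached()) {
    $handled++;                   // stands in for handling one message
    $limits->messageProcessed();
}
```

When the loop ends the process exits normally and the supervisor starts a fresh one.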

Lessons learned - Messages

  • Compact/short messages
  • Queues eat up a lot of storage / RAM
  • Common format such as JSON or XML
    • Do not serialize PHP objects or arrays
    • Ensure interoperability with other systems and languages

Lessons learned - Message Transformation

  • Use JMS/Serializer to convert objects to JSON
    • Deserialize JSON to PHP objects in consumer
    • Objects can and should be validated (with Symfony forms and asserts)
    • Reload entities / documents to keep them up to date (Doctrine unit of work)
  • Alternative approach: PHP's JsonSerializable interface
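
A minimal sketch of the JsonSerializable approach, with hand-written validation on the consuming side. The fromJson() factory and the validation rules are assumptions made for this example; JsonSerializable itself is part of PHP core.

```php
<?php

final class HeavyLiftingMessage implements JsonSerializable
{
    private $name;

    public function __construct(string $name)
    {
        if ($name === '') {
            throw new InvalidArgumentException('name must not be empty');
        }
        $this->name = $name;
    }

    public function getName(): string
    {
        return $this->name;
    }

    // Called by json_encode(); only plain data leaves the application.
    public function jsonSerialize(): array
    {
        return ['name' => $this->name];
    }

    // Hypothetical factory for the consumer: decode, validate, rebuild.
    public static function fromJson(string $json): self
    {
        $data = json_decode($json, true);
        if (!is_array($data) || !isset($data['name']) || !is_string($data['name'])) {
            throw new InvalidArgumentException('invalid payload');
        }

        return new self($data['name']);
    }
}

$json = json_encode(new HeavyLiftingMessage('Jan'));
$restored = HeavyLiftingMessage::fromJson($json);
```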

Lessons learned - Message Header

  • Use header for content-type and version
    • Message formats can change
    • Consumers must remain compatible
  • Unique IDs make logging / debugging / life easier
  • Meta data (timestamp, session, user, app)
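
Where the transport has no native headers, the same information can travel in a JSON envelope. The sketch below assumes a hypothetical envelope layout (headers plus body) and hypothetical helper names wrapMessage() and unwrapMessage(); with AMQP, for instance, message properties could carry these values instead.

```php
<?php

// Wrap the payload together with version, ID and metadata.
function wrapMessage(array $body, string $type, int $version, string $id): string
{
    return json_encode([
        'headers' => [
            'content_type' => 'application/json',
            'type' => $type,
            'version' => $version,
            'id' => $id,
            'timestamp' => time(),
        ],
        'body' => $body,
    ]);
}

// Reject versions this consumer does not understand instead of guessing.
function unwrapMessage(string $json, array $supportedVersions): array
{
    $envelope = json_decode($json, true);
    $version = $envelope['headers']['version'] ?? null;

    if (!in_array($version, $supportedVersions, true)) {
        throw new UnexpectedValueException('Unsupported message version');
    }

    return $envelope['body'] ?? [];
}

$json = wrapMessage(['name' => 'Jan'], 'HeavyLifting', 2, 'abc-123');
$body = unwrapMessage($json, [1, 2]);
```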

Lessons learned - Deployment

  • Workers need an up to date code base
  • Workers need a graceful shutdown
  • Workers must only be shut down when idling
  • Otherwise: loss of messages / data

Other challenges

  • Significant change of infrastructure
  • Choose your libraries and dependencies carefully!
  • Moving load is not removing load
  • Rethinking necessary
    • Display of status / intermediate result
    • Websockets / Push vs. Polling
  • Interdependencies of messages
    • Especially when running in parallel
  • Performance (Doctrine ORM / ODM)

Before we end

Feedback is important!

Thank you!

Any questions?

Questions ...

What's the right tool for my job?

  • Well, that depends ...
  • Bernard and PHP-Enqueue/-Resque are good starting points
  • Only use RabbitMQ when you really need it