php - Consuming unacknowledged messages from RabbitMQ


I have created a simple publisher and a consumer that subscribes to a queue using basic.consume.

My consumer acknowledges a message when the job runs without an exception. Whenever I run into an exception, I don't ack the message and return early. Only the acknowledged messages disappear from the queue, so that part is working correctly.
Now I want the consumer to pick up the failed messages again, but the only way to reconsume those messages is by restarting the consumer.

How do I need to approach this use case?

Setup code

    $channel = new AMQPChannel($connection);

    $exchange = new AMQPExchange($channel);
    $exchange->setName('my-exchange');
    $exchange->setType('fanout');
    $exchange->declareExchange();

    $queue = new AMQPQueue($channel);
    $queue->setName('my-queue');
    $queue->declareQueue();
    $queue->bind('my-exchange');

Consumer code

    $queue->consume(array($this, 'callback'));

    // The consume callback receives the envelope and the queue it came from.
    public function callback(AMQPEnvelope $msg, AMQPQueue $queue)
    {
        try {
            // do business logic
        } catch (Exception $ex) {
            // log exception
            return;
        }

        return $queue->ack($msg->getDeliveryTag());
    }

Producer code

    $exchange->publish('message');

If a message is not acknowledged and the application fails, it will be redelivered automatically, and the redelivered property on the envelope will be set to true (unless you consume with the no-ack = true flag).

UPD:

You have to nack the message with the requeue flag in your catch block:

    try {
        // do business logic
    } catch (Exception $ex) {
        // log exception
        return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE);
    }

Beware of infinitely nacked messages, since a redelivery count is not implemented in RabbitMQ or in the AMQP protocol at all.

If you don't want to deal with such messages and only want to add a delay, you could call sleep() or usleep() before the nack method call, but that is not a good idea at all.

There are multiple techniques to deal with the cyclic redelivery problem:

1. Rely on dead letter exchanges

  • pros: reliable, standard, clear
  • cons: requires additional logic
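Option 1 has no example in this answer, so here is a minimal sketch, assuming the same php-amqp extension as the rest of the code. The names `my-dlx` and `my-failed-queue` and both helper functions are made up for illustration. RabbitMQ routes a message to the queue's dead letter exchange when it is nacked or rejected without requeue, or when its TTL expires.

```php
<?php
// Sketch only: 'my-dlx' and 'my-failed-queue' are hypothetical names.

/**
 * Build the optional queue arguments that switch on dead-lettering.
 */
function deadLetterArguments(string $deadLetterExchange, ?string $routingKey = null): array
{
    $arguments = ['x-dead-letter-exchange' => $deadLetterExchange];
    if ($routingKey !== null) {
        // Optional: replace the original routing key when dead-lettering.
        $arguments['x-dead-letter-routing-key'] = $routingKey;
    }
    return $arguments;
}

/**
 * Declare the work queue plus a parking queue for failed messages.
 * Needs the amqp extension and an open channel.
 */
function declareQueuesWithDeadLettering(AMQPChannel $channel): AMQPQueue
{
    // Exchange that receives the dead-lettered messages.
    $dlx = new AMQPExchange($channel);
    $dlx->setName('my-dlx');
    $dlx->setType(AMQP_EX_TYPE_FANOUT);
    $dlx->declareExchange();

    // Queue where failed messages are parked for inspection or retry.
    $failed = new AMQPQueue($channel);
    $failed->setName('my-failed-queue');
    $failed->declareQueue();
    $failed->bind('my-dlx');

    // Work queue, declared with the dead letter exchange argument.
    $queue = new AMQPQueue($channel);
    $queue->setName('my-queue');
    $queue->setArguments(deadLetterArguments('my-dlx'));
    $queue->declareQueue();
    $queue->bind('my-exchange');

    return $queue;
}
```

In the catch block you would then call `$queue->nack($msg->getDeliveryTag())` without `AMQP_REQUEUE`, so the broker moves the message to `my-dlx` instead of redelivering it. Note that redeclaring an existing queue with different arguments fails, so a queue that was already declared without this argument has to be deleted first (or the argument applied through a policy).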

2. Use per-message or per-queue TTL

  • pros: easy to implement, standard, clear
  • cons: with long queues you may lose messages

Examples (note: for the queue TTL you pass a number, and for the message TTL a numeric string; both are in milliseconds):

2.1. Per-message TTL:

    $queue = new AMQPQueue($channel);
    $queue->setName('my-queue');
    $queue->declareQueue();
    $queue->bind('my-exchange');

    $exchange->publish(
        'message at ' . microtime(true),
        null,
        AMQP_NOPARAM,
        array(
            'expiration' => '1000'
        )
    );

2.2. Per-queue TTL:

    $queue = new AMQPQueue($channel);
    $queue->setName('my-queue');
    $queue->setArgument('x-message-ttl', 1000);
    $queue->declareQueue();
    $queue->bind('my-exchange');

    $exchange->publish('message at ' . microtime(true));

3. Hold the redelivery count, or the number of redeliveries left (aka hop limit, or TTL in the IP stack), in the message body or headers

  • pros: gives you control over message lifetime at the application level
  • cons: significant overhead, since you have to modify the message and publish it again; application specific; not clear

Code:

    $queue = new AMQPQueue($channel);
    $queue->setName('my-queue');
    $queue->declareQueue();
    $queue->bind('my-exchange');

    $exchange->publish(
        'message at ' . microtime(true),
        null,
        AMQP_NOPARAM,
        array(
            'headers' => array(
                'ttl' => 100
            )
        )
    );

    $queue->consume(
        function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) {
            $headers = $msg->getHeaders();
            echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' ';
            echo $msg->getDeliveryTag(), ' ';
            echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl', ' ';
            echo $msg->getBody(), PHP_EOL;

            try {
                // do business logic
                throw new Exception('business logic failed');
            } catch (Exception $ex) {
                // log exception
                if (isset($headers['ttl'])) {
                    // with ttl logic: republish a copy with a decremented counter
                    if ($headers['ttl'] > 0) {
                        $headers['ttl']--;

                        $exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers));
                    }

                    // ack the original either way, so it leaves the queue
                    return $queue->ack($msg->getDeliveryTag());
                } else {
                    // without ttl logic
                    return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop without requeue
                }
            }

            return $queue->ack($msg->getDeliveryTag());
        }
    );
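The decision made in the catch block above can also be pulled out into a small pure function, which makes it testable without a broker. This is only a sketch: the function name `decideRetry` and the action labels are made up for illustration, and the `ttl` header is the same application-level counter used in the code above.

```php
<?php
/**
 * Decide what to do with a failed message based on its 'ttl' header.
 *
 * Returns an array with:
 *   'action'  => 'republish' | 'drop' | 'requeue'
 *   'headers' => headers to use if the message is published again
 */
function decideRetry(array $headers): array
{
    if (!isset($headers['ttl'])) {
        // No counter on the message: fall back to a plain nack with requeue.
        return ['action' => 'requeue', 'headers' => $headers];
    }

    if ($headers['ttl'] > 0) {
        // Retries left: publish a copy with the counter decremented,
        // then ack the original.
        $headers['ttl']--;
        return ['action' => 'republish', 'headers' => $headers];
    }

    // Counter exhausted: ack the original and do not publish it again.
    return ['action' => 'drop', 'headers' => $headers];
}
```

In the consumer you would call `decideRetry($msg->getHeaders())` and then publish, ack or nack according to the returned action.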

There may be other ways to better control the message redelivery flow.

Conclusion: there is no silver bullet solution. You have to decide which solution fits your needs best, or find another one, and don't forget to share it here ;)

