i have create simple publisher , consumer subscribes on queue using basic.consume. 
my consumer acknowledges messages when job runs without exception. whenever run exception don´t ack message , return early. acknowledged messages disappear queue, that´s working correctly.
 want consumer pick failed messages again, way reconsume messages restarting consumer. 
how need approach use case?
setup code
$channel = new amqpchannel($connection);  $exchange = new amqpexchange($channel);  $exchange->setname('my-exchange'); $exchange->settype('fanout'); $exchange->declare();  $queue = new amqpqueue($channel); $queue->setname('my-queue'); $queue->declare(); $queue->bind('my-exchange');   consumer code
$queue->consume(array($this, 'callback'));  public function callback(amqpenvelope $msg) {     try {         //do business logic     } catch (exception $ex) {         //log exception         return;     }     return $queue->ack($msg->getdeliverytag()); }   producer code
$exchange->publish('message');      
if message not acknowledged , application fails, redelivered automatically , redelivered property on envelope set true (unless consume them no-ack = true flag).
upd:
you have nack message redelivery flag in catch block
    try {         //do business logic     } catch (exception $ex) {         //log exception         return $queue->nack($msg->getdeliverytag(), amqp_requeue);     }   beware infinitely nacked messages while redelivery count doesn't implemented in rabbitmq , in amqp protocol @ all.
if doesn't want mess such messages , want add delay may want add sleep() or usleep() before nack method call, not idea @ all.
there multiple techniques deal cycle redeliver problem:
1. rely on dead letter exchanges
- pros: reliable, standard, clear
 - cons: require additional logic
 
2. use per message or per queue ttl
- pros: easy implement, standard, clear
 - cons: long queues may loose message
 
examples (note, queue ttl pass number , message ttl - numeric string):
2.1 per message ttl:
$queue = new amqpqueue($channel); $queue->setname('my-queue'); $queue->declarequeue(); $queue->bind('my-exchange');  $exchange->publish(     'message @ ' . microtime(true),     null,     amqp_noparam,     array(         'expiration' => '1000'     ) );   2.2. per queue ttl:
$queue = new amqpqueue($channel); $queue->setname('my-queue'); $queue->setargument('x-message-ttl', 1000); $queue->declarequeue(); $queue->bind('my-exchange');  $exchange->publish('message @ ' . microtime(true));   3. hold redelivers count or left redelivers number (aka hop limit or ttl in ip stack) in message body or headers
- pros: give control on messages lifetime on application level
 - cons: significant overhead while have modify message , publish again, application specific, not clear
 
code:
$queue = new amqpqueue($channel); $queue->setname('my-queue'); $queue->declarequeue(); $queue->bind('my-exchange');  $exchange->publish(     'message @ ' . microtime(true),     null,     amqp_noparam,     array(         'headers' => array(             'ttl' => 100         )     ) );  $queue->consume(     function (amqpenvelope $msg, amqpqueue $queue) use ($exchange) {         $headers = $msg->getheaders();         echo $msg->isredelivery() ? 'redelivered' : 'origin', ' ';         echo $msg->getdeliverytag(), ' ';         echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' ';         echo $msg->getbody(), php_eol;          try {             //do business logic             throw new exception('business logic failed');         } catch (exception $ex) {             //log exception             if (isset($headers['ttl'])) {                 // ttl logic                  if ($headers['ttl'] > 0) {                     $headers['ttl']--;                      $exchange->publish($msg->getbody(), $msg->getroutingkey(), amqp_noparam, array('headers' => $headers));                 }                  return $queue->ack($msg->getdeliverytag());             } else {                 // without ttl logic                 return $queue->nack($msg->getdeliverytag(), amqp_requeue); // or drop without requeue             }          }          return $queue->ack($msg->getdeliverytag());     } );   there may other ways better control message redelivers flow.
conclusion: there no silver bullet solution. have decide solution fit need best or find out other, don't forget share here ;)
Comments
Post a Comment