I have created a simple publisher and a consumer that subscribes to a queue using basic.consume.
My consumer acknowledges a message when the job runs without an exception. Whenever I run into an exception I don't ack the message and return early. Only the acknowledged messages disappear from the queue, so that part is working correctly.
Now I want the consumer to pick up the failed messages again, but the only way I have found to reconsume those messages is to restart the consumer.
How should I approach this use case?
Setup code:
    $channel = new AMQPChannel($connection);

    $exchange = new AMQPExchange($channel);
    $exchange->setName('my-exchange');
    $exchange->setType('fanout');
    $exchange->declareExchange();

    $queue = new AMQPQueue($channel);
    $queue->setName('my-queue');
    $queue->declareQueue();
    $queue->bind('my-exchange');
Consumer code:
    $queue->consume(array($this, 'callback'));

    // php-amqp passes the queue as the second callback argument
    public function callback(AMQPEnvelope $msg, AMQPQueue $queue)
    {
        try {
            // do business logic
        } catch (Exception $ex) {
            // log exception
            return;
        }

        return $queue->ack($msg->getDeliveryTag());
    }
Producer code:
    $exchange->publish('message');
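If a message was not acknowledged and the application fails (so its channel or connection closes), the message is redelivered automatically and the redelivered property on the envelope is set to true (unless you consume with the no-ack = true flag). That is why restarting the consumer makes the failed messages show up again.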
UPD:
You have to nack the message with the requeue flag (AMQP_REQUEUE) in your catch block:
    try {
        // do business logic
    } catch (Exception $ex) {
        // log exception
        return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE);
    }
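Beware of messages that get nacked and redelivered forever: AMQP 0-9-1 has no redelivery counter at all, and RabbitMQ classic queues do not track one either.
If you don't want to deal with such messages and just want to add some delay, you could call sleep() or usleep() before the nack method call, but that is not a good idea at all, because it blocks the consumer for the whole delay.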
There are multiple techniques to deal with the redelivery loop problem:
1. Rely on Dead Letter Exchanges
- pros: reliable, standard, clear
- cons: requires additional logic
2. Use per-message or per-queue TTL
- pros: easy to implement, also standard, clear
- cons: with long queues you may lose some messages
Examples (note that the queue TTL is passed as a number, while the message TTL can be anything that is a numeric string):
2.1. Per-message TTL:
    $queue = new AMQPQueue($channel);
    $queue->setName('my-queue');
    $queue->declareQueue();
    $queue->bind('my-exchange');

    $exchange->publish(
        'message at ' . microtime(true),
        null,
        AMQP_NOPARAM,
        array(
            'expiration' => '1000' // milliseconds, passed as a numeric string
        )
    );
2.2. Per-queue TTL:
    $queue = new AMQPQueue($channel);
    $queue->setName('my-queue');
    $queue->setArgument('x-message-ttl', 1000); // milliseconds, passed as a number
    $queue->declareQueue();
    $queue->bind('my-exchange');

    $exchange->publish('message at ' . microtime(true));
3. Hold the redelivery count, or the number of redeliveries left (a.k.a. hop limit, or TTL in the IP stack), in the message body or headers
- pros: gives you extra control over message lifetime at the application level
- cons: significant overhead, since you have to modify the message and publish it again; application specific; not clear
Code:
    $queue = new AMQPQueue($channel);
    $queue->setName('my-queue');
    $queue->declareQueue();
    $queue->bind('my-exchange');

    $exchange->publish(
        'message at ' . microtime(true),
        null,
        AMQP_NOPARAM,
        array(
            'headers' => array(
                'ttl' => 100 // application-level hop limit, not a broker TTL
            )
        )
    );

    $queue->consume(
        function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) {
            $headers = $msg->getHeaders();

            echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' ';
            echo $msg->getDeliveryTag(), ' ';
            echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl', ' ';
            echo $msg->getBody(), PHP_EOL;

            try {
                // do business logic
                throw new Exception('business logic failed');
            } catch (Exception $ex) {
                // log exception
                if (isset($headers['ttl'])) {
                    // with ttl logic: republish a copy with a decremented
                    // counter, then ack the original so it leaves the queue
                    if ($headers['ttl'] > 0) {
                        $headers['ttl']--;
                        $exchange->publish(
                            $msg->getBody(),
                            $msg->getRoutingKey(),
                            AMQP_NOPARAM,
                            array('headers' => $headers)
                        );
                    }

                    return $queue->ack($msg->getDeliveryTag());
                } else {
                    // without ttl logic
                    return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue
                }
            }

            return $queue->ack($msg->getDeliveryTag());
        }
    );
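The retry decision made in the catch block above can be pulled out into a small pure function, which makes the hop-limit rule easy to check without a broker. This is only a sketch: `nextRetryHeaders` is a hypothetical helper name, not part of php-amqp, and it assumes that a message without a 'ttl' header should be treated as having no retries left (the code above nacks such messages with requeue instead).

```php
<?php
/**
 * Hypothetical helper: given the headers of a failed message, return the
 * headers to republish it with, or null when it must not be republished.
 *
 * 'ttl' is the application-level hop counter from the example above.
 */
function nextRetryHeaders(array $headers): ?array
{
    // Assumption: a missing counter counts as "no retries left".
    $remaining = isset($headers['ttl']) ? (int) $headers['ttl'] : 0;

    if ($remaining <= 0) {
        return null; // counter exhausted: do not republish
    }

    // Keep all other headers untouched, only decrement the counter.
    $headers['ttl'] = $remaining - 1;

    return $headers;
}
```

In the consumer callback you would publish a copy with the returned headers when the result is not null, and ack the original message in both cases so that it leaves the queue.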
There may be other ways to control the message redelivery flow better.
Conclusion: there is no silver bullet. You have to decide which solution fits your needs best, or find another one, but don't forget to share it here ;)
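Option 1 (Dead Letter Exchanges) has no example above. As a rough sketch, dead-lettering is switched on through queue arguments: `x-dead-letter-exchange` and the optional `x-dead-letter-routing-key` are RabbitMQ's argument names, while the helper `deadLetterArguments` and the exchange name 'my-dlx' are made up for illustration. A message that is nacked or rejected without requeue, or that expires by TTL, is then routed to that exchange instead of being dropped.

```php
<?php
/**
 * Hypothetical helper: build the queue arguments that enable dead-lettering.
 */
function deadLetterArguments(string $dlxName, ?string $routingKey = null): array
{
    $args = ['x-dead-letter-exchange' => $dlxName];

    // Optional: replace the routing key of dead-lettered messages.
    if ($routingKey !== null) {
        $args['x-dead-letter-routing-key'] = $routingKey;
    }

    return $args;
}

// Assumed usage with php-amqp (needs the amqp extension and a running broker):
// $queue = new AMQPQueue($channel);
// $queue->setName('my-queue');
// $queue->setArguments(deadLetterArguments('my-dlx'));
// $queue->declareQueue();
```

The 'my-dlx' exchange still has to be declared, with a queue bound to it, before dead-lettered messages can be collected. Redeclaring an existing queue with different arguments fails, so an already declared 'my-queue' may have to be deleted first.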