§1 RabbitMQ延迟队列

RabbitMQ延迟队列,主要是借助消息的TTL(Time to Live)和死信exchange(Dead Letter Exchanges)来实现。

涉及到2个队列,一个用于发送消息,一个用于消息过期后的转发目标队列。

\"\"

 

本例中, 定义2组exchange和queue。

agentpayquery1exchange		agentpayquery1queue(routingkey为delay)
agentpayquery2exchange		agentpayquery2queue(routingkey为delay)
agentpayquery1queue是缓冲队列,消息过期路由到agentpayquery2queue

 

 

§2 生产者

生产者配置:

<!-- 连接服务配置 -->
<rabbit:connection-factory
        id=\"connectionFactoryProducer\"
        addresses=\"${mq.ip}\"    //192.168.40.40:5672
        username=\"${username}\"
        password=\"${password}\"
        channel-cache-size=\"${cache.size}\"
        publisher-confirms=\"${publisher.confirms}\"
        publisher-returns=\"${publisher.returns}\"
        virtual-host=\"/\"
/>

<!--========================出款查询 延迟队列配置 begin =========================-->
<rabbit:queue id=\"agentpayquery2queue\" durable=\"true\" auto-delete=\"false\" exclusive=\"false\" name=\"agentpayquery2queue\"/>
<rabbit:direct-exchange name=\"agentpayquery2exchange\" durable=\"true\" auto-delete=\"false\" id=\"agentpayquery2exchange\">
    <rabbit:bindings>
        <rabbit:binding queue=\"agentpayquery2queue\" key=\"delay\" />
    </rabbit:bindings>
</rabbit:direct-exchange>


<rabbit:queue id=\"agentpayquery1queue\" durable=\"true\" auto-delete=\"false\" exclusive=\"false\" name=\"agentpayquery1queue\" >
    <rabbit:queue-arguments>
        <entry key=\"x-dead-letter-exchange\" value=\"agentpayquery2exchange\"/>
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:direct-exchange name=\"agentpayquery1exchange\" durable=\"true\" auto-delete=\"false\" id=\"agentpayquery1exchange\">
    <rabbit:bindings>
        <rabbit:binding queue=\"agentpayquery1queue\" key=\"delay\" />
    </rabbit:bindings>
</rabbit:direct-exchange>

<!--定义RabbitTemplate实例-->
<rabbit:template id=\"agentpayQueryMsgTemplate\"
                 exchange=\"agentpayquery1exchange\"  routing-key=\"delay\"
                 queue=\"agentpayquery1queue\"
                 connection-factory=\"connectionFactoryProducer\" message-converter=\"mqMessageConverter\"
                 mandatory=\"true\"
                 confirm-callback=\"publisherConfirmsReturns\" return-callback=\"publisherConfirmsReturns\"/>
<!--========================出款查询 延迟队列配置 end =========================-->

 

 

生产者消息入队:

import org.spring work.amqp.AmqpException;
import org.spring work.amqp.core.Message;
import org.spring work.amqp.core.MessageDeliveryMode;
import org.spring work.amqp.core.MessagePostProcessor;
import org.spring work.amqp.rabbit.core.RabbitTemplate;
import org.spring work.beans.factory.annotation.Autowired;
import org.spring work.stereotype.Service;

@Service
public class AgentpayQueryProducer {

    private static final Logger log = LogManager.getLogger(AgentpayQueryProducer.class.getSimpleName());

    @Autowired
    private RabbitTemplate agentpayQueryMsgTemplate;

    public void sendDelay(String message, int delaySeconds) {
        String expiration = String.valueOf(delaySeconds * 1000);
        agentpayQueryMsgTemplate.convertAndSend(( ) message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message)
                    throws AmqpException {
                message.getMessageProperties().setExpiration(expiration);
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                log.info(\"出款查询数据入队:{}\", new String(message.getBody()));
                return message;
            }
        });
    }
}

 

 

§3消费者

消费端的配置无他:

<!-- 连接服务配置  channel-cache-size=\"25\" -->
<rabbit:connection-factory id=\"connectionFactory\"
                           addresses=\"${mq.ip}\"
                           username=\"${username}\"
                           password=\"${password}\" />

<bean id=\"agentpayQueryConsumer\" class=\"com.emaxcard.rpc.payment.service.impl.batchpay.AgentpayQueryConsumer\" />

<!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
<rabbit:queue id=\"agentpayquery2queue\" durable=\"true\" auto-delete=\"false\" exclusive=\"false\" name=\"agentpayquery2queue\" />

<rabbit:listener-container connection-factory=\"connectionFactory\" acknowledge=\"auto\"

                           max-concurrency=\"20\"
                           concurrency=\"10\"
                           prefetch=\"10\">
    <rabbit:listener ref=\"agentpayQueryConsumer\" queues=\"agentpayquery2queue\" />
</rabbit:listener-container>

 

消息消费:

import com.alibaba.fastjson.JSON;
import com.emaxcard.enums.BatchPayStatus;
import com.emaxcard.exceptions.ResponseException;
import com.emaxcard.payment.vo.PaymentRecord;
import com.emaxcard.rpc.payment.model.PaymentRecordModel;
import org.spring work.amqp.core.Message;
import org.spring work.amqp.core.MessageListener;
import org.spring work.beans.factory.annotation.Autowired;

public class AgentpayQueryConsumer implements MessageListener {

    private static final Logger log = LogManager.getLogger();

    @Autowired
    QueryGatewayService queryGatewayService;
    @Autowired
    AgentpayQueryProducer agentpayQueryProducer;

    @Override
    public void onMessage(Message message) {
        String mqMsg = new String(message.getBody());
        log.info(\"出款查询数据出队:{}\", mqMsg);
        PaymentRecord paymentRecordModel;
        try {
            paymentRecordModel = JSON.parse (mqMsg, PaymentRecord.class);
        } catch (Exception ex) {
            log.info(\"消息格式不是PaymentRecordModel,结束。\");
            return;
        }

        try {
            BatchPayStatus payStatus = queryGatewayService.queryGateway(paymentRecordModel);

            // 非终态,继续放入延迟队列
            if (BatchPayStatus.SUCCESS != payStatus && BatchPayStatus.FAILED != payStatus) {
                if (BatchPayStatus.NOTEXIST == payStatus) {
                    log.info(\"查询结果是{},不再处理\", payStatus);
                } else {
                    agentpayQueryProducer.sendDelay(mqMsg, 10);
                }
            }
        } catch (Exception ex) {
            if (ex instanceof ResponseException) {
                log.info(\"转账查询{},paymentId{},处理错误:{}\",
                        paymentRecordModel.getTransNo(), paymentRecordModel.getPaymentId(), ex.getMessage());
            } else {
                log.error(\"处理消息异常:\", ex);
            }
        }

    }
}

 

 

§4 使用延迟队列要注意

1. 因为是队列,所以即使一个消息比在同一队列中的其他消息提前过期,提前过期的也不会优先进入死信队列,它们还是按照入库的顺序让消费者消费。如果第一进去的消息过期时间是1小时,那么死信队列的消费者也许等1小时才能收到第一个消息。

2. 当缓冲队列里一旦出现未设置过期时间的消息,那么就会造成整个队列堵塞。消费端也无法消费到消息。通过日志可以看到,打印出来的都是 BlockingQueueConsumer。

 

 

\"\"

 

Get messages Ack Mode选择“Ack message requeue false”,可以将消息消费掉

\"\"

 

收藏 打印