1、简单示例
(1)RocketMQ目前指定的延时时间间隔有1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h,用等级来表示时间间隔。

public class DelayProducer {
 
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer(\"rmq-group\");
        producer.setNamesrvAddr(\"localhost:9876\");
        producer.start();
        try {
            for (int i = 0; i < 3; i++) {
                Message msg = new Message(\"TopicA-test\",// topic
                        \"TagA\",// tag
                        (new Date() + \"Hello RocketMQ ,QuickStart 11\" + i)
                                .getBytes()// body
                );
                //1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。
                // level=0,表示不延时。level=1,表示 1 级延时,对应延时 1s。level=2 表示 2 级延时,对应5s,以此类推
                msg.setDelayTimeLevel(2);
 
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
       producer.shutdown();
    }
 
}
2、Broker持久化延时消息
         Broker对于接收到的消息首先会判断一下是不是延时消息,如果是延时消息会将消息以SCHEDULE_TOPIC_XXXX为topic替换原有的topic名称进行持久化,实现方法在CommitLog的putMessage中。

        首先会判断msg的延时标准如果大于0,则重新设置消息的topic名称和queueId,之后将消息以SCHEDULE_TOPIC_XXXX为topic,以延时时间的等级为queueId持久化到commitlog文件中。

            // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
 
                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
 
                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
 
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
3、延时消息处理
        Broker将延时消息以SCHEDULE_TOPIC_XXXX为topic名称将消息进行持久化,接下来我们看看Broker是如何将消息在延时消息到达之后进行消息还原的。

        RocketMQ提供了定时任务服务ScheduleMessageService,通过定时任务的方式不断的读取topic为SCHEDULE_TOPIC_XXXX何queueId为延时等级的消息进行消息还原处理,这样消息被还原之后消费者就可以拉取消息了。

   每个消费等级有个定时任务DeliverDelayedMessageTimerTask:

for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }
 
            if (timeDelay != null) {
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
在DeliverDelayedMessageTimerTask中根据SCHEDULE_TOPIC_XXXX名称和延时等级对应的queueId获取消息队列,然后从commitlog中读取消息,还原消息的原有信息(消息的原topic信息)再将消息持久化到commitlog文件中,这样消费者就可以拉取消息了。

public void executeOnTimeup() {
            //获取消费者消息
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));
 
            long failScheduleOffset = offset;
 
            if (cq != null) {
                //读取消息
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) {
                    try {
                        long nextOffset = offset;
                        int i = 0;
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong();
 
                            if (cq.isExtAddr(tagsCode)) {
                                if (cq.getExt(tagsCode, cqExtUnit)) {
                                    tagsCode = cqExtUnit.getTagsCode();
                                } else {
                                    //can\'t find ext content.So re compute tags code.
                                    log.error(\"[BUG] can\'t find consume queue extend file content!addr={}, offsetPy={}, sizePy={}\",
                                        tagsCode, offsetPy, sizePy);
                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                                }
                            }
 
                            long now = System.currentTimeMillis();
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
 
                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
 
                            long countdown = deliverTimestamp - now;
 
                            if (countdown <= 0) {
                                //获取消息
                                MessageExt msgExt =
                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                        offsetPy, sizePy);
 
                                if (msgExt != null) {
                                    try {
                                        //还原消息信息topic名称等
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                        //重新将消息持久化到commitlog中
                                        PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.defaultMessageStore
                                                .putMessage(msgInner);
 
                                        if (putMessageResult != null
                                            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                            continue;
                                        } else {
                                            // XXX: warn and notify me
                                            log.error(
                                                \"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}\",
                                                msgExt.getTopic(), msgExt.getMsgId());
                                            ScheduleMessageService.this.timer.schedule(
                                                new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                    nextOffset), DELAY_FOR_A_PERIOD);
                                            ScheduleMessageService.this.updateOffset(this.delayLevel,
                                                nextOffset);
                                            return;
                                        }
                                    } catch (Exception e) {
                                        /*
                                         * XXX: warn and notify me
                                         */
                                        log.error(
                                            \"ScheduleMessageService, messageTimeup execute error, drop it. msgExt=\"
                                                + msgExt + \", nextOffset=\" + nextOffset + \",offsetPy=\"
                                                + offsetPy + \",sizePy=\" + sizePy, e);
                                    }
                                }
                            } else {
                                ScheduleMessageService.this.timer.schedule(
                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                                    countdown);
                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                return;
                            }
                        } // end of for
 
                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    } finally {
 
                        bufferCQ.release();
                    }
                } // end of if (bufferCQ != null)
                else {
 
                    long cqMinOffset = cq.getMinOffsetInQueue();
                    if (offset < cqMinOffset) {
                        failScheduleOffset = cqMinOffset;
                        log.error(\"schedule CQ offset invalid. offset=\" + offset + \", cqMinOffset=\"
                            + cqMinOffset + \", queueId=\" + cq.getQueueId());
                    }
                }
            } // end of if (cq != null)
 
            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);

--------------------- 
作者:IAMTJW 
来源:CSDN 
原文:https://blog.csdn.net/qq924862077/article/details/84987179 
版权声明:本文为博主原创文章,转载请附上博文链接!

收藏 打印