1、简单示例
(1)RocketMQ目前指定的延时时间间隔有1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h,用等级来表示时间间隔。
public class DelayProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer(\"rmq-group\");
producer.setNamesrvAddr(\"localhost:9876\");
producer.start();
try {
for (int i = 0; i < 3; i++) {
Message msg = new Message(\"TopicA-test\",// topic
\"TagA\",// tag
(new Date() + \"Hello RocketMQ ,QuickStart 11\" + i)
.getBytes()// body
);
//1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。
// level=0,表示不延时。level=1,表示 1 级延时,对应延时 1s。level=2 表示 2 级延时,对应5s,以此类推
msg.setDelayTimeLevel(2);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
2、Broker持久化延时消息
Broker对于接收到的消息首先会判断一下是不是延时消息,如果是延时消息会将消息以SCHEDULE_TOPIC_XXXX为topic替换原有的topic名称进行持久化,实现方法在CommitLog的putMessage中。
首先会判断msg的延时标准如果大于0,则重新设置消息的topic名称和queueId,之后将消息以SCHEDULE_TOPIC_XXXX为topic,以延时时间的等级为queueId持久化到commitlog文件中。
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
3、延时消息处理
Broker将延时消息以SCHEDULE_TOPIC_XXXX为topic名称将消息进行持久化,接下来我们看看Broker是如何将消息在延时消息到达之后进行消息还原的。
RocketMQ提供了定时任务服务ScheduleMessageService,通过定时任务的方式不断的读取topic为SCHEDULE_TOPIC_XXXX何queueId为延时等级的消息进行消息还原处理,这样消息被还原之后消费者就可以拉取消息了。
每个消费等级有个定时任务DeliverDelayedMessageTimerTask:
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
在DeliverDelayedMessageTimerTask中根据SCHEDULE_TOPIC_XXXX名称和延时等级对应的queueId获取消息队列,然后从commitlog中读取消息,还原消息的原有信息(消息的原topic信息)再将消息持久化到commitlog文件中,这样消费者就可以拉取消息了。
public void executeOnTimeup() {
//获取消费者消息
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
//读取消息
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
//can\'t find ext content.So re compute tags code.
log.error(\"[BUG] can\'t find consume queue extend file content!addr={}, offsetPy={}, sizePy={}\",
tagsCode, offsetPy, sizePy);
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
if (countdown <= 0) {
//获取消息
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) {
try {
//还原消息信息topic名称等
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
//重新将消息持久化到commitlog中
PutMessageResult putMessageResult =
ScheduleMessageService.this.defaultMessageStore
.putMessage(msgInner);
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
} else {
// XXX: warn and notify me
log.error(
\"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}\",
msgExt.getTopic(), msgExt.getMsgId());
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) {
/*
* XXX: warn and notify me
*/
log.error(
\"ScheduleMessageService, messageTimeup execute error, drop it. msgExt=\"
+ msgExt + \", nextOffset=\" + nextOffset + \",offsetPy=\"
+ offsetPy + \",sizePy=\" + sizePy, e);
}
}
} else {
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
} // end of if (bufferCQ != null)
else {
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error(\"schedule CQ offset invalid. offset=\" + offset + \", cqMinOffset=\"
+ cqMinOffset + \", queueId=\" + cq.getQueueId());
}
}
} // end of if (cq != null)
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
---------------------
作者:IAMTJW
来源:CSDN
原文:https://blog.csdn.net/qq924862077/article/details/84987179
版权声明:本文为博主原创文章,转载请附上博文链接!
继续阅读与本文标签相同的文章
当自动化驾驶技术和普及后 我们还需要考驾照吗?
-
大宗货运如何实现“重去重回”
2026-05-18栏目: 教程
-
世界互联网大会就在这里 互联网之光博览中心抢先看
2026-05-18栏目: 教程
-
无尽商机!SpaceX开通“共享火箭”,小卫星入轨更便宜、能定制?
2026-05-18栏目: 教程
-
浅谈分布式计算的开发与实现(一)
2026-05-18栏目: 教程
-
打开Pdf合并的正确方式
2026-05-18栏目: 教程
