多消费者(多线程)对MNS的使用
背景
在阿里云MNS消费者的使用中,阿里云提供了使用 消息服务-最佳实践-长轮询
的代码和说明,在解决方案中阿里云这么说道
在开了上百个线程同时访问的情况下,如果队列里已经没有消息了,那么其实不需要上百个线程都同时挂LongPolling。只需要有1-N个线程挂LongPolling就足够了。挂LongPolling的线程在发现队列里有消息时,可以唤醒其他线程一起来取消息以达到快速响应的目的
Receiver内部做了LongPolling的排他机制,只要有一个线程在做LongPolling,那么其他线程只需要Wait就可以了。 —— [解决方案]
但是如何启动1-N个线程,同时产生多个消费者,并没有给出说明,阿里云官方提供的demo中是使用在main方法中启用:
CloudAccount account = new CloudAccount("ACCESS_ID", "ACCESS_KEY", "ENDPOINT"); sMNSClient = account.getMNSClient(); sMNSClient.getQueueRef("TestQueue").delete(); sMNSClient.getQueueRef("TestQueue").create(); Thread thread1 = new Thread(new Runnable() { public void run() { WorkerFunc(1); } }); Thread thread2 = new Thread(new Runnable() { public void run() { WorkerFunc(2); } }); Thread thread3 = new Thread(new Runnable() { public void run() { WorkerFunc(3); } }); 这里我提供一种比较好的方法,可以利用spring IOC容器的依赖注入,来管理和启动多个消费者(多线程)。
方法展示
Spring会通过依赖注入的方式,来管理关联对象的生命周期,所以我们可以将消费者的产生管理,都由Spring IOC容器代劳,也就是说,我把消费者创建的控制权都交给Spring容器。方法如下
@Componentpublic class NormalProcessComponent { private static Logger log = LoggerFactory.getLogger(NormalProcessComponent.class); private static ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(30); public NormalProcessComponent(){ for(int i = 0; i < 50; i++){ threadPool.execute(new Runnable() { @Override public void run() { try { process(); } catch (Exception e) { e.printStackTrace(); } } }); } } public void process() throws Exception { //使用阿里云官方提供的方法 MessageReceiver receiver = new MessageReceiver(workerId, sMNSClient, "TestQueue"); while (true) { Message message = receiver.receiveMessage(); try { //取出Que中的信息 result = message .getMessageBodyAsString(); JSON params = JSON .parse (result); if(params!=null){ //处理数据的方法 } else { log.info("取出的数据为空!"); Thread.sleep(Constant.SLEEP_SECONDS); } } catch (Exception e) { e.printStackTrace(); log.error("fail to sleep"+message); break; } } }}我们将消费者的产生方法,在类中的构造函数中定义,使用一个固定大小的线程池,来管理消费者(线程),同时加上Component注解,在项目启动时,Spring 的就会实例化这个类,注入到容器中,这个时候构造方法中的,多个消费者就会启动开始工作。
拓展
- 阿里云官方MessageReceiver的解析 :长连接轮询,以及死锁和线程安全性问题的避免
- 消费者的监控 :观察消费者的数量,避免消费者全部死亡,造成队列积压;
阿里云官方MessageReceiver的解析
在阿里云MNS消费者的使用中,阿里云提供了使用 消息服务-最佳实践-长轮询 ,官方已经提供了源代码和详细说明,我在这里就不贴代码了,主要说明其中的原理。
在MessageReceiver中,官方定义了一个
static final Map<String, > sLockObjMap从而保证了,无论new出多少个MessageReceiver,都是从同一个Map,取出的lockObj。在使用lockObj中,均使用同步锁synchronized,从而实现了LongPolling的排他机制,只有一个线程在做LongPolling,其他线程都会Wait。避免了上百个线程同时访问MNS Server,一个Group只会产生,一条长连接进行长轮询。
可以将图中的Group,比作一台台服务器,而里面的多个Consumer,实际就是启动的多个消费线程。
消费者的监控
在上面代码中,使用了一个固定大小的线程池来管理多个线程(消费者),但是一旦子线程死亡,这个线程(消费者),并不会重启,这种情况就会产生队列积压。产生线程死亡一定是不正常,程序中的Bug存在。比如,有异常没有捕获到,或者在子线程中将异常throw出,就会使当前子线程死亡掉。这种情况一定是会有的,因为没有人写出的代码是完美无缺的,程序员只能尽可能避免bug的产生,所以我们需要用完善的日志和监控来完善,我们的项目。
这里我们可以利用监控线程池中的存活线程数量从而来,进行报警。
//当消费者低于一定阈值触发报警 if(threadPool.getActiveCount()<threshold){ //报警 }可以将这个封装成一个API接口,通过监控这个API来进行报警。
继续阅读与本文标签相同的文章
起底养老机器人产业:有人出货猛增 有人项目叫停
更多关于渐进式图片加载的实现
-
大前端时代前端监控的最佳实践
2026-06-02栏目: 教程
-
阿里巴巴 Java 编码规范最佳实践
2026-06-02栏目: 教程
-
前端你应该了解的数据结构与算法
2026-06-02栏目: 教程
-
真的,移动端尺寸自适应与dpr无关
2026-06-02栏目: 教程
-
【招聘启事】阿里巴巴招前端开发啦!周四晚7点前端大牛直播,还有内推机会!
2026-06-02栏目: 教程
