工程结构:

\"\"

 

定义jar包依赖的版本,版本很重要,rabbit依赖spring,必须一致,否则报错:

<properties>
    <spring work.version>4.2.7.RELEASE</spring work.version>
    <spring-rabbit.version>1.6.1.RELEASE</spring-rabbit.version>
    <junit.version>4.12</junit.version>
</properties>

dependencies:

<dependencies>

    <!-- LOGGING begin -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.5</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>1.0.13</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.0.13</version>
    </dependency>
    <!-- 代码直接调用common-logging会被桥接到slf4j -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>jcl-over-slf4j</artifactId>
        <version>1.7.5</version>
    </dependency>
    <!-- LOGGING end -->

    <!--spring work-->
    <dependency>
        <groupId>org.spring work</groupId>
        <artifactId>spring-core</artifactId>
        <version>${spring work.version}</version>
    </dependency>
    <dependency>
        <groupId>org.spring work</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring work.version}</version>
    </dependency>

    <!-- rabbitmq spring依赖 -->
    <dependency>
        <groupId>org.spring work.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>${spring-rabbit.version}</version>
    </dependency>

    <!--common utils-->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.3.2</version>
    </dependency>

    <!--test begin-->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>${junit.version}</version>
        <!--<scope>test</scope>-->
    </dependency>
    <dependency>
        <groupId>org.spring work</groupId>
        <artifactId>spring-test</artifactId>
        <version>${spring work.version}</version>
        <!--<scope>test</scope>-->
    </dependency>
    <!--test end-->
</dependencies>

 

spring-applicationContext:

<?  version=\"1.0\" encoding=\"UTF-8\"?>
<beans  ns=\"http://www.spring work.org/schema/beans\"
        ns:xsi=\"http://www.w3.org/2001/ Schema-instance\"
        ns:context=\"http://www.spring work.org/schema/context\"
        ns:rabbit=\"http://www.spring work.org/schema/rabbit\"

       xsi:schemaLocation=\"http://www.spring work.org/schema/beans http://www.spring work.org/schema/beans/spring-beans-3.2.xsd
       http://www.spring work.org/schema/beans http://www.spring work.org/schema/beans/spring-beans-3.2.xsd
       http://www.spring work.org/schema/context http://www.spring work.org/schema/context/spring-context-3.1.xsd 
         http://www.spring work.org/schema/rabbit http://www.spring work.org/schema/rabbit/spring-rabbit.xsd\">

    <bean class=\"org.spring work.beans.factory.config.PropertyPlaceholderConfigurer\">
        <property name=\"fileEncoding\" value=\"UTF-8\"></property>
        <property name=\"locations\">
            <list>
                <value>classpath:applicationContext.properties</value>
            </list>
        </property>
    </bean>

    <context:annotation-config/>

    <bean class=\"org.spring work.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor\"/>
    <!-- 配置扫描路径 -->
    <context:component-scan  -package=\"demo\"></context:component-scan>

    <!--rabbit server参数 -->
    <rabbit:connection-factory id=\"connectionFactory\"
                               username=\"${paycenter.mq.user.username}\"
                               password=\"${paycenter.mq.user.password}\"
                               addresses=\"${paycenter.mq.user.host}\"></rabbit:connection-factory>

    <import resource=\"classpath:mq-applicationContext-producer. \"/>
    <import resource=\"classpath:mq-applicationContext-consumer. \"/>
</beans>

 

mq-applicationContext-producer. :

 

<?  version=\"1.0\" encoding=\"UTF-8\"?>
<beans  ns=\"http://www.spring work.org/schema/beans\"
        ns:xsi=\"http://www.w3.org/2001/ Schema-instance\"  ns:rabbit=\"http://www.spring work.org/schema/rabbit\"
       xsi:schemaLocation=\"http://www.spring work.org/schema/beans
     http://www.spring work.org/schema/beans/spring-beans-4.0.xsd
     http://www.spring work.org/schema/beans
     http://www.spring work.org/schema/beans/spring-beans-4.0.xsd
     http://www.spring work.org/schema/rabbit
     http://www.spring work.org/schema/rabbit/spring-rabbit-1.6.xsd\">

    <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
    <rabbit:admin connection-factory=\"connectionFactory\"/>

    <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列 -->
    <bean id=\"mqMessageConverter\"
          class=\"org.spring work.amqp.support.converter.SimpleMessageConverter\">
    </bean>

    <!--<bean id=\"publisherConfirmsReturns\" class=\"com.emaxcard.mq.rabbit.PublisherConfirmsReturns\"></bean>-->


    <!--========================延迟队列配置 begin =========================-->
    <rabbit:queue id=\"agentpayqueryQueue2\" durable=\"true\" auto-delete=\"true\" exclusive=\"false\"
                  name=\"agentpayqueryQueue2\"/>
    <rabbit:direct-exchange id=\"agentpayqueryExchange2\" durable=\"true\" auto-delete=\"true\" name=\"agentpayqueryExchange2\">
        <rabbit:bindings>
            <rabbit:binding queue=\"agentpayqueryQueue2\" key=\"delay\"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>


    <rabbit:queue id=\"agentpayqueryQueue1\" durable=\"true\" auto-delete=\"true\" exclusive=\"false\"
                  name=\"agentpayqueryQueue1\">
        <rabbit:queue-arguments>
            <entry key=\"x-dead-letter-exchange\" value=\"agentpayqueryExchange2\"/>
            <entry key=\"x-message-ttl\" value=\"10000\" value-type=\"java.lang.Long\"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:direct-exchange id=\"agentpayqueryExchange1\" durable=\"true\" auto-delete=\"true\" name=\"agentpayqueryExchange1\">
        <rabbit:bindings>
            <rabbit:binding queue=\"agentpayqueryQueue1\" key=\"delay\"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!--定义RabbitTemplate实例-->
    <!--confirm-callback=\"publisherConfirmsReturns\" return-callback=\"publisherConfirmsReturns\"-->
    <rabbit:template id=\"agentpayQueryMsgTemplate\"
                     exchange=\"agentpayqueryExchange1\" routing-key=\"delay\"
                     connection-factory=\"connectionFactory\" message-converter=\"mqMessageConverter\"
                     mandatory=\"true\"
    />
    <!--========================延迟队列配置 end =========================-->

</beans>

 

mq-applicationContext-consumer. :

<?  version=\"1.0\" encoding=\"UTF-8\"?>
<beans  ns=\"http://www.spring work.org/schema/beans\"
        ns:xsi=\"http://www.w3.org/2001/ Schema-instance\"  ns:rabbit=\"http://www.spring work.org/schema/rabbit\"
       xsi:schemaLocation=\"http://www.spring work.org/schema/beans
     http://www.spring work.org/schema/beans/spring-beans-4.0.xsd
     http://www.spring work.org/schema/beans
     http://www.spring work.org/schema/beans/spring-beans-4.0.xsd
     http://www.spring work.org/schema/rabbit
     http://www.spring work.org/schema/rabbit/spring-rabbit-1.6.xsd\">


    <bean id=\"agentpayQueryConsumer\" class=\"demo.TestMQConsumer\" />

    <!-- TODO 后续删除
    receive-timeout:等待接收超时时长 影响连接创建和销毁

    concurrency:消费者个数
    max-concurrency:最大消费者个数
    min-start-interval:陆续启动  减少并发环境(或是三方系统突然的网络延迟) 大量连接导致的性能耗损
    min-stop-interval:陆续销毁   减少突然的安静 导致大量可用连接被销毁
    min-consecutive-active: 连续N次没有接收发生超时  则认定为需要创建 消费者
    min-consecutive-idle: 连续N次发生了接收超时   则认定消费者需要销毁

    prefetch:每个消费者预读条数 因为异步调用三方 性能瓶颈在网络与三方系统所以预读取条数设置为1(默认为5) 只有一条消息被ACK才会接收下一条消息
    transaction-size:会影响prefetch的数量
    -->
    <!--  监听器 -->
    <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
    <rabbit:listener-container connection-factory=\"connectionFactory\" acknowledge=\"auto\"
                               max-concurrency=\"20\"
                               concurrency=\"5\"
                               prefetch=\"10\">
        <rabbit:listener ref=\"agentpayQueryConsumer\" queue-names=\"agentpayqueryQueue2\" />
    </rabbit:listener-container>
</beans>

 

 

Producer类:
package demo;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spring work.amqp.rabbit.core.RabbitTemplate;
import org.spring work.beans.factory.annotation.Autowired;
import org.spring work.test.context.ContextConfiguration;
import org.spring work.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = \"classpath:applicationContext. \")
public class TestMQProducer {

    private static Logger logger = LoggerFactory.getLogger(TestMQProducer.class.getSimpleName());

    @Autowired
    private RabbitTemplate agentpayQueryMsgTemplate;

    @Test
    public void test() throws Exception {
        for (int i = 0; i <= 100; i++) {
              data = String.valueOf(i);
            agentpayQueryMsgTemplate.convertAndSend(data);
            logger.info(\"入队:{}\", data);
        }
        Thread.sleep(12000);
    }
}

 

 

Consumer类:
package demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spring work.amqp.core.Message;
import org.spring work.amqp.core.MessageListener;

public class TestMQConsumer implements MessageListener {

    private static Logger logger = LoggerFactory.getLogger(TestMQConsumer.class.getSimpleName());

    public void onMessage(Message message) {
        String data = new String(message.getBody());

        try {
            //模拟处理慢
            Thread.sleep(1);

            logger.info(\"出队:{}\", data);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}

 

 

 至此代码就完毕了。

 

说明:上面定义队列时我把auto-delete属性设置为true, 所以,当消费者消费完并关闭连接后,队列会自动删除。exchange也如是。(通过mq控制台看,栗子中的agentpayqueryQueue2和agentpayqueryExchange2在执行完就自动消失了,agentpayqueryQueue1和agentpayqueryExchange1还存在。)

spring-rabbit-x. 里对queue和exchange的auto-delete属性的解释:

Flag indicating that an queue will be deleted when it is no longer in use, i.e. the connection that declared it is closed. Default is false.(rabbit:queue)

Flag indicating that an exchange will be deleted when no longer in use, i.e. the connection that declared it is closed. Default is false.(rabbit:exchange)

 

消费端的concurrency说明:

同样,看spring-rabbit-x. 的解释:

The number of concurrent consumers to start for each listener initially.
See also \'max-concurrency\'.

 

上面我设置的值是5,从mq控制台里看queue的consumer见下图:

\"\"

从出队日志,可以看出来,共有5个线程在消费这些消息。

\"\"

 

 

 \"\"

收藏 打印