1、kafka在消息传递的使用非常普遍,相对于activemq来说kafka的分布式管理和使用更加灵活。

2、activemq的搭建和使用可以参考:

  activemq搭建和springmvc的整合:http://www.cnblogs.com/ll409546297/p/6898155.html

  springboot和springboot的整合:http://www.cnblogs.com/ll409546297/p/7805072.html

3、kafka的搭建:

  http://www.cnblogs.com/ll409546297/p/7810302.html

4、下面介绍kafka和springboot的整合

  1)目录结构

  

  2)需要的基础包:pom.

<?  version="1.0" encoding="UTF-8"?><project  ns="http://maven.apache.org/POM/4.0.0"          ns:xsi="http://www.w3.org/2001/ Schema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <groupId>com.troy</groupId>    <artifactId>springbootkafka</artifactId>    <version>1.0-SNAPSHOT</version>    <parent>        <groupId>org.spring work.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>1.5.8.RELEASE</version>    </parent>    <dependencies>        <dependency>            <groupId>org.spring work.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>            <version>1.5.8.RELEASE</version>        </dependency>        <dependency>            <groupId>org.spring work.kafka</groupId>            <artifactId>spring-kafka</artifactId>            <version>1.3.0.RELEASE</version>        </dependency>    </dependencies></project>

  3)基本配置:application.yml

server:  port: 8090spring:  kafka:    bootstrap-servers: 192.168.5.10:9092 #kafka的访问地址,多个用","隔开    consumer:      enable-auto-commit: true      group-id: kafka #群组ID      auto-offset-reset: earliest #启东时接收没有接收到的数据

  如果存在集群的话,配置如下

server:  port: 8090spring:  kafka:    consumer:      enable-auto-commit: true      group-id: kafka      auto-offset-reset: earliest      bootstrap-servers: 192.168.5.11:9092    producer:      bootstrap-servers: 192.168.5.10:9092

   4)生产者:KafkaProducer.class

@Component //这个必须加入容器不然,不会执行@EnableScheduling //这里是为了测试加入定时调度public class KafkaProducer {    @Autowired    private KafkaTemplate kafkaTemplate;    @Scheduled(cron = "00/30 * * * * ?")    public void send(){        System.out.println("send data");        kafkaTemplate.send("topic","kafka data");        //发送方式很多种可以自己研究一下    }}

  5)消费者:KafkaCustomer.class

@Component //同样这里是必须的public class KafkaCustomer {    @KafkaListener(topics = {"topic"})    public void receive(String message){        System.out.println("topic========topic");        System.out.println(message);    }}

  6)测试结果:

 

收藏 打印