SpringCloud学习之SpringCloudStream&集成kafka

小编 2026-06-04 阅读:860 评论:0
一、关于Spring-Cloud-Stream  Spring Cloud Stream本质上就...

一、关于Spring-Cloud-Stream

  Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。通过使用Spring Cloud Stream,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。

  在这里我先放一张官网的图:

SpringCloud学习之SpringCloudStream&集成kafka

  应用程序通过Spring Cloud Stream注入到输入和输出通道与外界进行通信。根据此规则我们很容易的实现消息传递,订阅消息与消息中转。并且当需要切换消息中间件时,几乎不需要修改代码,只需要变更配置就行了。

  在用例图中 Inputs代表了应用程序监听消息 、outputs代表发送消息、binder的话大家可以理解为将应用程序与消息中间件隔离的抽象,类似于三层架构下利用dao屏蔽service与数据库的实现的原理。

  springcloud默认提供了rabbitmq与kafka的实现。

 

二、springcloud集成kafka

1、添加gradle依赖:

SpringCloud学习之SpringCloudStream&集成kafkaSpringCloud学习之SpringCloudStream&集成kafka
dependencies{    compile('org.springframework.cloud:spring-cloud-stream')    compile('org.springframework.cloud:spring-cloud-stream-binder-kafka')    compile('org.springframework.kafka:spring-kafka')}
View Code

2、定义一个接口:

  spring-cloud-stream已经给我们定义了最基本的输入与输出接口,他们分别是 Source,Sink, Processor

  Sink接口:

SpringCloud学习之SpringCloudStream&集成kafkaSpringCloud学习之SpringCloudStream&集成kafka
package org.springframework.cloud.stream.messaging;import org.springframework.cloud.stream.annotation.Input;import org.springframework.messaging.SubscribableChannel;public interface Sink {    String INPUT = "input";    @Input("input")    SubscribableChannel input();}
View Code

  Source接口:

SpringCloud学习之SpringCloudStream&集成kafkaSpringCloud学习之SpringCloudStream&集成kafka
package org.springframework.cloud.stream.messaging;import org.springframework.cloud.stream.annotation.Output;import org.springframework.messaging.MessageChannel;public interface Source {    String OUTPUT = "output";    @Output("output")    MessageChannel output();}
View Code

  Processor接口:

SpringCloud学习之SpringCloudStream&集成kafkaSpringCloud学习之SpringCloudStream&集成kafka
package org.springframework.cloud.stream.messaging;public interface Processor extends Source, Sink {}
View Code

  这里面Processor这个接口既定义输入通道又定义了输出通道。同时我们也可以自己定义通道接口,代码如下:

SpringCloud学习之SpringCloudStream&集成kafkaSpringCloud学习之SpringCloudStream&集成kafka
package com.bdqn.lyrk.shop.channel;import org.springframework.cloud.stream.annotation.Input;import org.springframework.cloud.stream.annotation.Output;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.SubscribableChannel;public interface ShopChannel {    /**     * 发消息的通道名称     */    String SHOP_OUTPUT = "shop_output";    /**     * 消息的订阅通道名称     */    String SHOP_INPUT = "shop_input";    /**     * 发消息的通道     *     * @return     */    @Output(SHOP_OUTPUT)    MessageChannel sendShopMessage();    /**     * 收消息的通道     *     * @return     */    @Input(SHOP_INPUT)    SubscribableChannel recieveShopMessage();}
View Code

 

3、定义服务类

SpringCloud学习之SpringCloudStream&集成kafkaSpringCloud学习之SpringCloudStream&集成kafka
package com.bdqn.lyrk.shop.server;import com.bdqn.lyrk.shop.channel.ShopChannel;import org.springframework.cloud.stream.annotation.StreamListener;import org.springframework.messaging.Message;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.support.MessageBuilder;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RestControllerpublic class ShopService {    @Resource(name = ShopChannel.SHOP_OUTPUT)    private MessageChannel sendShopMessageChannel;    @GetMapping("/sendMsg")    public String sendShopMessage(String content) {        boolean isSendSuccess = sendShopMessageChannel.                send(MessageBuilder.withPayload(content).build());        return isSendSuccess ? "发送成功" : "发送失败";    }    @StreamListener(ShopChannel.SHOP_INPUT)    public void receive(Message<String> message) {        System.out.println(message.getPayload());    }}
View Code

  这里面大家注意 @StreamListener。这个注解可以监听输入通道里的消息内容,注解里面的属性指定我们刚才定义的输入通道名称,而MessageChannel则可以通过

输出通道发送消息。使用@Resource注入时需要指定我们刚才定义的输出通道名称

 

4、定义启动类

SpringCloud学习之SpringCloudStream&集成kafkaSpringCloud学习之SpringCloudStream&集成kafka
package com.bdqn.lyrk.shop;import com.bdqn.lyrk.shop.channel.ShopChannel;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.cloud.stream.annotation.EnableBinding;@SpringBootApplication@EnableBinding(ShopChannel.class)public class ShopServerApplication {    public static void main(String[] args) {        SpringApplication.run(ShopServerApplication.class, args);    }}
View Code

  注意@EnableBinding注解,这个注解指定刚才我们定义消息通道的接口名称,当然这里也可以传多个相关的接口

5、定义application.yml文件

SpringCloud学习之SpringCloudStream&集成kafkaSpringCloud学习之SpringCloudStream&集成kafka
spring:  application:    name: shop-server  cloud:    stream:      bindings:        #配置自己定义的通道与哪个中间件交互        shop_input: #ShopChannel里Input和Output的值          destination: zhibo #目标主题        shop_output:          destination: zhibo      default-binder: kafka #默认的binder是kafka  kafka:    bootstrap-servers: localhost:9092 #kafka服务地址    consumer:      group-id: consumer1    producer:      key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer      client-id: producer1server:  port: 8100
View Code

  这里是重头戏,我们必须指定所有通道对应的消息主题,同时指定默认的binder为kafka,紧接着定义Spring-kafka的外部化配置,在这里指定producer的序列化类为ByteArraySerializer

 

启动程序成功后,我们访问 http://localhost:8100/sendMsg?content=2 即可得到如下结果SpringCloud学习之SpringCloudStream&集成kafka

 

版权声明

本文仅代表作者观点,不代表百度立场。
本文系作者授权百度百家发表,未经许可,不得转载。

热门文章
  • 机房智能化温湿度解决方式之POE供电以太网温湿度传感器

    机房智能化温湿度解决方式之POE供电以太网温湿度传感器
    机房智能化温湿度解决方式之POE供电以太网温湿度传感器 北京盈创力和电子科技有限公司 智能型TCP网口温湿度记录仪 北京IP网络温湿度记录仪厂家,北京盈创力和 北京智能型TCP网口温湿度记录仪IP网络温湿度记录仪是一种新型的基于TCP/IP协议双绞线以太网标准温湿度采集模块,利用它可以实现现场温度值、相对湿度值的采集,同时利用其自身的RJ45通信接口可以方便地和机房监控主机或交换机集线器进行联网。 工作于-40℃~85℃工业级带...
  • Sequential Monte Carlo Methods (SMC) 序列蒙特卡洛/粒子滤波/Bootstrap Filtering

    Sequential Monte Carlo Methods (SMC) 序列蒙特卡洛/粒子滤波/Bootstrap Filtering
    Problem Statement 我们考虑一个具有马尔可夫性质、非线性、非高斯的状态空间模型(State Space Model):对于一个时间序列上的观测结果{yt,t∈N}\\{ y_t , t \\in N \\}{yt​,t∈N},我们认为每个观测结果yty_tyt​的生成依赖于一个无法直接观察的隐变量xt∈{xt,t∈N}x_t \\in \\{x_t , t \\in N \\}xt​∈{xt​,t∈N},即:p(...
  • HTTP状态保持的原理

    HTTP状态保持的原理
    a)在用户登录之后,浏览器返回响应的时候会在响应中添加上cookieb)浏览器接收到cookie之后会自动保存c)当用户再次请求同一服务器中的其他网页的时候,浏览器会自动带上之前保存的cookied)服务接收到请求之后可以请 request 对象中取到cookie 判断当前用户是否登录  Http是无状态的,就是连接时数据互通,关闭后...
  • Hive 系统函数及示例

    Hive 系统函数及示例
    查看所有系统函数 show functions; 函数分类 内置函数【系统函数】 数学函数: floor、round、ceil、cos、log2等 字符串函数: length、reverse、trim、lower、get_json_object、repeat等 收集函数: size 转换函数: cast 日期函数: year、month、datediff、date、date_add等 条件函数: coalesce、case…w...
  • CSRF的原理和防范措施

    CSRF的原理和防范措施
    a)攻击原理:i.用户C访问正常网站A时进行登录,浏览器保存A的cookieii.用户C再访问攻击网站B,网站B上有某个隐藏的链接或者图片标签会自动请求网站A的URL地址,例如表单提交,传指定的参数iii.而攻击网站B在访问网站A的时候,浏览器会自动带上网站A的cookieiv.所以网站A在接收到请求之后可判断当前用户是登录状态,所以...
标签列表