- 添加maven依赖
<dependency>
<groupId>org.spring work.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.spring work.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
- 生成者
(1)properties 配置
spring:
cloud:
stream:
bindings:
outboundOrg: #默认为output,可自定义
destination: orgChangeTopic
content-type: application/json
kafka:
binder:
zk-nodes: localhost
brokers: localhost
(2) Application增加注解@EnableBinding(Source.class)
(3)定义输出通道
import org.spring work.cloud.stream.annotation.Output;
import org.spring work.messaging.MessageChannel;
public interface SourceChannels {
@Output(\"outboundOrg\")
MessageChannel outboundOrg();
}
(4) 生产者代码
import com.thoughtmechanix.organization.events.models.Organizati Model;
import com.thoughtmechanix.organization.utils.UserContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spring work.beans.factory.annotation.Autowired;
import org.spring work.cloud.stream.messaging.Source;
import org.spring work.messaging.support.MessageBuilder;
import org.spring work.stereotype.Component;
@Component
public class SimpleSourceBean {
@Autowired
private Source source;
private static final Logger logger = LoggerFactory.getLogger(SimpleSourceBean.class);
public void publishOrgChannel(String action,String orgId){
logger.debug(\"Sending Kafka message {} for Organization Id: {}\", action, orgId);
Organizati Model change = new Organizati Model(Organizati Model.class.getTypeName(),action,orgId, UserContext.getCorrelationId());
source.output().send(MessageBuilder.withPayload(change).build());
}
}
import com.thoughtmechanix.organization.events.source.SimpleSourceBean;
import com.thoughtmechanix.organization.model.Organization;
import com.thoughtmechanix.organization.repository.OrganizationRepository;
import org.spring work.beans.factory.annotation.Autowired;
import org.spring work.stereotype.Service;
import java.util.UUID;
@Service
public class OrganizationService {
@Autowired
private OrganizationRepository orgRepository;
@Autowired
SimpleSourceBean simpleSourceBean;
public Organization getOrg(String organizationId) {
return orgRepository.findById(organizationId);
}
public void saveOrg(Organization org){
org.setId( UUID.randomUUID().toString());
orgRepository.save(org);
simpleSourceBean.publishOrgChannel(\"SAVE\",org.getId());
}
public void updateOrg(Organization org){
orgRepository.save(org);
simpleSourceBean.publishOrgChannel(\"UPDATE\",org.getId());
}
public void deleteOrg(Organization org){
orgRepository.delete( org.getId());
simpleSourceBean.publishOrgChannel(\"DELETE\",org.getId());
}
}
- 消费者
(1)properties 配置
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost
zk-nodes: localhost
bindings:
inboundOrgChanges: #默认为input
destination: orgChangeTopic
content-type: application/json
group: licensingGroup #消费者组保证消息只被一组服务实例处理一次
(2) Application增加注解@EnableBinding(Sink.class)
(3)定义输入通道
import org.spring work.cloud.stream.annotation.Input;
import org.spring work.messaging.SubscribableChannel;
public interface CustomChannels {
@Input(\"inboundOrgChanges\")
SubscribableChannel orgs();
}
(4)消费者代码
import com.thoughtmechanix.licenses.events.CustomChannels;
import com.thoughtmechanix.licenses.events.models.Organizati Model;
import com.thoughtmechanix.licenses.repository.OrganizationRedisRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spring work.beans.factory.annotation.Autowired;
import org.spring work.cloud.stream.annotation.EnableBinding;
import org.spring work.cloud.stream.annotation.StreamListener;
@EnableBinding(CustomChannels.class)
public class Organizati Handler {
private static final Logger logger = LoggerFactory.getLogger(Organizati Handler.class);
@Autowired
private OrganizationRedisRepository organizationRedisRepository;
@StreamListener(\"inboundOrgChanges\")
public void loggerSink(Organizati Model orgChange){
logger.debug(\"Received a message of type \" + orgChange.getType());
switch (orgChange.getAction()){
case \"GET\":
logger.debug(\"Received a GET event from the organization service for organization id {}\", orgChange.getOrganizationId());
break;
case \"SAVE\":
logger.debug(\"Received a SAVE event from the organization service for organization id {}\", orgChange.getOrganizationId());
break;
case \"UPDATE\":
logger.debug(\"Received a UPDATE event from the organization service for organization id {}\", orgChange.getOrganizationId());
organizationRedisRepository.deleteOrganization(orgChange.getOrganizationId());
break;
case \"DELETE\":
logger.debug(\"Received a DELETE event from the organization service for organization id {}\", orgChange.getOrganizationId());
organizationRedisRepository.deleteOrganization(orgChange.getOrganizationId());
break;
default:
logger.error(\"Received an UNKNOWN event from the organization service of action {}\", orgChange.getAction());
break;
}
}
}
继续阅读与本文标签相同的文章
下一篇 :
115、下一个更大元素
-
Docker 用ansible给主机安装docker
2026-05-18栏目: 教程
-
Docker centos7 with sshd (sshd centos7 常用镜像)
2026-05-18栏目: 教程
-
Docker 做资源限制
2026-05-18栏目: 教程
-
JavaScript 基础类型,数据类型
2026-05-18栏目: 教程
-
[MySQL] docker下安装使用mysql配置主从复制
2026-05-18栏目: 教程
