1. 添加maven依赖
<dependency>
          <groupId>org.spring work.cloud</groupId>
          <artifactId>spring-cloud-stream</artifactId>
      </dependency>
      <dependency>
          <groupId>org.spring work.cloud</groupId>
          <artifactId>spring-cloud-starter-stream-kafka</artifactId>
      </dependency>
  1. 生成者
    (1)properties 配置
spring:
  cloud:
    stream:
      bindings:
        outboundOrg: #默认为output,可自定义
            destination: orgChangeTopic
            content-type: application/json
      kafka:
        binder:
          zk-nodes: localhost
          brokers: localhost

(2) Application增加注解@EnableBinding(Source.class)

(3)定义输出通道

import org.spring work.cloud.stream.annotation.Output;
import org.spring work.messaging.MessageChannel;

public interface SourceChannels {
	@Output(\"outboundOrg\")
	MessageChannel outboundOrg();
}

(4) 生产者代码

import com.thoughtmechanix.organization.events.models.Organizati Model;
import com.thoughtmechanix.organization.utils.UserContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spring work.beans.factory.annotation.Autowired;
import org.spring work.cloud.stream.messaging.Source;
import org.spring work.messaging.support.MessageBuilder;
import org.spring work.stereotype.Component;

@Component
public class SimpleSourceBean {
	@Autowired
	private Source source;

	private static final Logger logger = LoggerFactory.getLogger(SimpleSourceBean.class);

	public void publishOrgChannel(String action,String orgId){
		logger.debug(\"Sending Kafka message {} for Organization Id: {}\", action, orgId);
		Organizati Model change = new Organizati Model(Organizati Model.class.getTypeName(),action,orgId, UserContext.getCorrelationId());
		source.output().send(MessageBuilder.withPayload(change).build());
	}
}
import com.thoughtmechanix.organization.events.source.SimpleSourceBean;
import com.thoughtmechanix.organization.model.Organization;
import com.thoughtmechanix.organization.repository.OrganizationRepository;
import org.spring work.beans.factory.annotation.Autowired;
import org.spring work.stereotype.Service;

import java.util.UUID;

@Service
public class OrganizationService {
    @Autowired
    private OrganizationRepository orgRepository;
    @Autowired
    SimpleSourceBean simpleSourceBean;

    public Organization getOrg(String organizationId) {
        return orgRepository.findById(organizationId);
    }

    public void saveOrg(Organization org){
        org.setId( UUID.randomUUID().toString());

        orgRepository.save(org);
        simpleSourceBean.publishOrgChannel(\"SAVE\",org.getId());
    }

    public void updateOrg(Organization org){
        orgRepository.save(org);
        simpleSourceBean.publishOrgChannel(\"UPDATE\",org.getId());
    }

    public void deleteOrg(Organization org){
        orgRepository.delete( org.getId());
        simpleSourceBean.publishOrgChannel(\"DELETE\",org.getId());
    }
}
  1. 消费者
    (1)properties 配置
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost
          zk-nodes: localhost
      bindings:
        inboundOrgChanges:   #默认为input
          destination: orgChangeTopic
          content-type: application/json
          group: licensingGroup #消费者组保证消息只被一组服务实例处理一次

(2) Application增加注解@EnableBinding(Sink.class)

(3)定义输入通道

import org.spring work.cloud.stream.annotation.Input;
import org.spring work.messaging.SubscribableChannel;

public interface CustomChannels {
	@Input(\"inboundOrgChanges\")
	SubscribableChannel orgs();
}

(4)消费者代码

import com.thoughtmechanix.licenses.events.CustomChannels;
import com.thoughtmechanix.licenses.events.models.Organizati Model;
import com.thoughtmechanix.licenses.repository.OrganizationRedisRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spring work.beans.factory.annotation.Autowired;
import org.spring work.cloud.stream.annotation.EnableBinding;
import org.spring work.cloud.stream.annotation.StreamListener;

@EnableBinding(CustomChannels.class)
public class Organizati Handler {
	private static final Logger logger = LoggerFactory.getLogger(Organizati Handler.class);
	@Autowired
	private OrganizationRedisRepository organizationRedisRepository;

	@StreamListener(\"inboundOrgChanges\")
	public void loggerSink(Organizati Model orgChange){
		logger.debug(\"Received a message of type \" + orgChange.getType());
		switch (orgChange.getAction()){
			case \"GET\":
				logger.debug(\"Received a GET event from the organization service for organization id {}\", orgChange.getOrganizationId());
				break;
			case \"SAVE\":
				logger.debug(\"Received a SAVE event from the organization service for organization id {}\", orgChange.getOrganizationId());
				break;
			case \"UPDATE\":
				logger.debug(\"Received a UPDATE event from the organization service for organization id {}\", orgChange.getOrganizationId());
				organizationRedisRepository.deleteOrganization(orgChange.getOrganizationId());
				break;
			case \"DELETE\":
				logger.debug(\"Received a DELETE event from the organization service for organization id {}\", orgChange.getOrganizationId());
				organizationRedisRepository.deleteOrganization(orgChange.getOrganizationId());
				break;
			default:
				logger.error(\"Received an UNKNOWN event from the organization service of action {}\", orgChange.getAction());
				break;
		}
	}
}
收藏 打印