1.在pom. 文件上添加依赖
<parent>
<groupId>org.spring work.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.spring work.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.spring work.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.spring work.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
</dependencies>
2.配置文件application.properties
# kafka spring.kafka.bootstrap-servers=192.168.3.146:8092 spring.kafka.consumer.group-id=springboot-group1 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer logging.level.root=info
3.添加消费类和发送消息类
@Component
public class KafkaReceiver {
@KafkaListener(topics = {\"wftest\"})
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
message = kafkaMessage.get();
System.out.println(\"record =\" + record);
System.out.println(\"message =\" + message);
}
}
}
@Component
public class KafkaSender {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private Gson gson = new GsonBuilder().create();
public void send() {
Message message = new Message();
message.setId(System.currentTimeMillis());
message.setMsg(UUID.randomUUID().toString());
message.setSendTime(new Date());
kafkaTemplate.send(\"wftest\", gson.toJson(message));
}
}
public class Message {
private Long id;
private String msg;
private Date sendTime;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public Date getSendTime() {
return sendTime;
}
public void setSendTime(Date sendTime) {
this.sendTime = sendTime;
}
4.测试
@SpringBootApplication
public class Application {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);
KafkaSender sender = context.getBean(KafkaSender.class);
for (int i = 0; i < 3; i++) {
sender.send();
try {
Thread.sleep(3_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
继续阅读与本文标签相同的文章
上一篇 :
【设计模式】责任链模式
-
Hitachi Vantara推出AI驱动的数据中心运营解决方案
2026-05-18栏目: 教程
-
湖北发力“5G”:天时地利人和
2026-05-18栏目: 教程
-
云原生技术之Docker入门
2026-05-18栏目: 教程
-
点我达与满电未来达成合作协议
2026-05-18栏目: 教程
-
光明日报:聚集高端产业 加速双向开放
2026-05-18栏目: 教程
