之前我想要把数据结构写完,再来写这些乱七八糟的东西。不过最近一段时间工作比较忙,加上未来涉及到中间件的使用,回头还是先把中间件的理解先放在前面吧,数据结构这块不会烂尾的。
kafka 是什么
官网上指出”A Distributed Streaming Platform”,一个分布式的流处理平台
- PUBLISH & SUBSCRIBE 发布和订阅
- PROCESS 处理
- STORE 贮存
流处理具体是什么含义相信大家都理解,毕竟一个java工程师对Linux和C一定是很熟悉的。
那么kafka三大功能都是什么含义呢?
PUBLISH & SUBSCRIBE
e.g.
我们开车通常会听广播,那广播是从广播电台统一发布
那我们接受广播,只需要打开收音机,订阅广播即可听到这个声音
那么这个关系,就是发布和订阅的关系,一端发出,一端订阅
对应到kafka中的概念,就是
- Producer 生产发布
- Consumer 消费订阅
PROCESS
编写可扩展的流处理应用程序,用于实时事件响应的场景。一开始我们看到kafka定位为“一个分布式的流处理平台”
既然是流处理,那处理必然是一个很关键的一点
e.g. 一个秒杀活动,并发量特别大,并发量大的时候,可能会有大量用户同时提交付款,但我们的服务器可能无法承载这么多并发,那我们采用延时订阅等方式,以达到可以处理历史数据的应用程序
STORE
安全的将流式的数据存储在一个分布式,有副本备份,容错的集群。
这个概念比较简单了,意思是这个数据是有备份的,你有几个集群,这个数据就会被备份几份,比如说你有总容量600G的3台主机建立了3个kafka集群,那么实际有效存储只有200G,这是因为每个数据都会被同时备份到另外两台主机中,以保证数据不会丢失,利用空间换取稳定。同时这个集群是一个容错的集群,后期我会做这个容错测试,让自己也能更放心大胆的用kafka。
在这里延伸两个名词
- Topic Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic).
- Broker 已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
那么以上就是kafka的官网概念回顾。总体来讲只要接触过集群,分布式,微服务的概念,理解这些应该是非常非常简单的
总之kafka有四个小概念,Producer,Consumer,Topic,Broker
那么根据这四个概念,我们简单建立一个消息系统模型
工厂A 每分钟生产馒头5个 —为工厂A建立 Topic FactoryA
工厂B 每分钟生产窝头1个 —为工厂B建立 Topic FactoryB
那么两个工厂每分钟都会发布一个消息到各自的主题(Topic)中
为了保证工厂发布渠道的稳定性,那两个工厂合资创建了3个销售渠道Broker0,Broker1,Broker2
因为工厂每分钟总共就生产出固定的馒头,不会因为某个销售渠道卖的多而增加产量,所有这三个渠道之间的库存数据(待消费数据)是同步的
此时客户来了,客户可能只想吃馒头,那他只订阅FactoryA,那么FactoryB的数据就不会给这个客户,而客户也并不关心是哪个销售渠道提供的,他只关心最后送来的东西是工厂A生产的馒头,就OK了。
如果客户比较能吃(服务器承载量大),那他可以订阅两个Topic,一起处理
这种就是我们的消息系统的实际应用场景
ps: 官网上提供了很完整的下载,启动,测试的方法,这个笔记中不做记录,想练回头看下官网的Quick Start即可,同时安利一个中文版的国内网站,翻译的有些偏差,不过也非常棒,减少了我的一些阅读障碍
kafka重要配置项
############################# Server Basics ############################# # 核心配置 broker.id=0 ############################# Socket Server Settings ############################# # 监听端口 listeners=PLAINTEXT://:9092 # 配置服务提供远端访问能力 advertised.listeners=PLAINTEXT://47.96.29.8:9092 # 配置Https的连接 没用到 比较复杂 #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL # server用来处理网络请求的网络线程数目;一般你不需要更改这个属性。 num.network.threads=3 # server用来处理请求的I/O线程的数目;这个线程数目至少要等于硬盘的个数。 num.io.threads=8 # SO_SNDBUFF 缓存大小,server进行socket 连接所用 socket.send.buffer.bytes=102400 # SO_RCVBUFF缓存大小,server进行socket连接时所用 socket.receive.buffer.bytes=102400 # server允许的最大请求尺寸; 这将避免server溢出,它应该小于Java heap size socket.request.max.bytes=10485760 ############################# Log Basics ############################# # 此目录每次重启会被清理,测试用就不改了 log.dirs=/tmp/kafka-logs # 如果创建topic时没有给出划分partitions个数,这个数字将是topic下partitions数目的默认数值。 num.partitions=1 num.recovery.threads.per.data.dir=1 ############################# Internal Topic Settings ############################# offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 ############################# Log Flush Policy ############################# # The number of messages to accept before forcing a flush of data to disk #log.flush.interval.messages=10000 # The maximum amount of time a message can sit in a log before we force a flush #log.flush.interval.ms=1000 ############################# Log Retention Policy ############################# log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# zookeeper.connect=localhost:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000 ############################# Group Coordinator Settings ############################# group.initial.rebalance.delay.ms=0 |
Kafka启动
- 首先下载kafka
wget http://mirror.bit.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz - 解压
tar -xzf kafka_2.11-2.1.0.tgz - 进入kafka目录,开始配置(最好复制一份比如server-1.properties)
vim ~/{kafka}/config/server.properties - 启动zookeeper,不建议使用kafka自带的zookeeper,我在另一台机器起了一台
需要先拷贝一份默认配置并修改配置,然后重命名为zoo.cfgnohup ./zkServer.sh > /root/log/zookeeper-1.log 2>&1 & - 启动kafka
bin/kafka-server-start.sh config/server.properties - 启动监听
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xxxx --from-beginning
至此,kafka启动完毕,第六部可以不使用,但调试非常有帮助,可以看到你的信息是否成功发送到服务器上
使用kafka java API
说了一大堆,没实战就是耍流氓,现在起开始正式进入编码阶段。
首先,Jar包是肯定的了,此时我会选用最新版本的jar包,毕竟是测试联系,不会太计较稳定性,也想通过新版本的一些坑对比老版本。
1 2 3 4 5 |
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.0.0</version>
</dependency>
|
仔细看了官网的文档,发现生产和消费,貌似用的同一个Jar包?先试试看吧,回头有问题再改。
剩下的我们来看下kafka的代码,首先是一个生产者的类 –请忽略我非常不要脸的把自己名放到类名里了 实在没啥好名字 也为了防止冲突✌️
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
/**
* @author eddie
* @createTime 2018-11-08
* @de ion 生产者
*/
public class EddieProducer extends ShutdownableThread {
private Queue<Message> queue;
private KafkaProducer<String, String> producer;
private String topic;
private boolean shutdown = false;
public EddieProducer(final String topic, final KafkaProducer<String, String> producer){
super(\"\", false);
queue = new Concurrent edQueue<>();
this.topic = topic;
this.producer = producer;
}
public boolean add(Message e){
return queue.add(e);
}
public boolean add(Collection<? extends Message> collection){
return queue.addAll(collection);
}
@Override
public void doWork() {
while (!shutdown && !Thread.currentThread().isInterrupted()) {
if (!queue.isEmpty()) {
long startTime = System.currentTimeMillis();
Message message = s.requireNonNull(queue.poll());
String key = message.getKey();
String value = message.getValue();
Future<Record data> send = producer.send(
new ProducerRecord<>(topic, key, value),
new ProducerCallBack(startTime, key, value)
);
}
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void cancel() {
shutdown = true;
Thread.currentThread().interrupt();
}
}
|
可以看到上面我们用了一个回调ProducerCallBack,这个东西其实是自己定义的,根据配置,目前是当kafka服务器成功写入一个主机(broker)中就返回,返回后回调这个类的onCompletion方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
/**
* @author eddie
* @createTime 2018-11-08
* @de ion 回调函数
*/
public class ProducerCallBack implements Callback {
private final long startTime;
private final String key;
private final String message;
public ProducerCallBack(long startTime, String key, String message) {
this.startTime = startTime;
this.key = key;
this.message = message;
}
@Override
public void onCompletion(Record data data, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
if ( data != null) {
System.out.printf(\"message(%s, %s) sent to partition(%d), offset(%d) in %d ms%n\",
key, message, data.partition(), data.offset(), elapsedTime);
} else {
exception.printStackTrace();
}
}
}
|
再来看下消费者类,消费者稍微有点复杂,这里面有个偏移量的概念,这个概念我理解是可以随意访问历史量的重要参数,我这一些配置放到了另外一个配置工厂里了,不过因为有些问题我也不是很清楚,所以慢慢成长,这个小工具完整的实现在github上,文末有链接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
/**
* @author eddie
* @createTime 2018-11-08
* @de ion
*/
public class EddieConsumer extends ShutdownableThread {
private String topic;
private KafkaConsumer consumer;
public EddieConsumer(final String topic, final KafkaConsumer<String, String> producer){
super(\"KafkaConsumerExample\", false);
this.topic = topic;
this.consumer = producer;
}
@Override
public void doWork() {
consumer.subscribe(Collections.singletonList(this.topic));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
int count = records.count();
System.out.println( \"测试结果\" + count);
for (ConsumerRecord<String, String> record : records) {
System.out.println(\"Received message: (\" + record.key() + \", \" + record.value() + \") at offset \" + record.offset());
}
}
@Override
public String name() {
return \"KafkaConsumerExample\";
}
@Override
public boolean isInterruptible() {
return Thread.currentThread().isInterrupted();
}
}
|
通过以上的代码,我们实现了发布数据到远端,实时拿到数据。
最后看下调用的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
/**
* @author eddie
* @createTime 2018-11-08
* @de ion 测试类
*/
public class MainTest {
public static void main(String[] args){
//生成地址池
List<KafkaUrlNode> list = new ArrayList<>();
list.add(new KafkaUrlNode(\"47.96.29.8\", \"9092\"));
//构造一个配置对象
KafkaProducerConfig config = new KafkaProducerConfig();
//里面配置很多 但我写了很多默认配置 除了这个地址池,其他选默认没有关系 可以满足大部分需求了
config.setUrlNodeList(list);
//我比较迷恋的工厂模式
KafkaProducer producer = KafkaFactory.create()
.init(config)
.setSerializerKeyAndValueType(TypeDefine.STRING, TypeDefine.STRING)
.buildKafkaProducer();
//通过工厂模式生产出来的,就是非常标准的KafkaProducer 但这个使用起来并不那么顺手
//所以我又封装了一个EddieProducer 开线程来处理生产和消费
EddieProducer eddieProducer = new EddieProducer(\"test\", producer);
//开启线程
eddieProducer.start();
//添加数据
eddieProducer.add(new Message(\"a\", \"b\"));
eddieProducer.add(new Message(\"a\", \"b\"));
eddieProducer.add(new Message(\"a\", \"b\"));
eddieProducer.add(new Message(\"a\", \"b\"));
eddieProducer.add(new Message(\"a\", \"b\"));
eddieProducer.add(new Message(\"a\", \"b\"));
//一个关掉线程的小工具 写的非常简易 不过胜在安全
ThreadShutdown.finishThread(eddieProducer);
System.out.println(\"测试\");
KafkaConsumerConfig conmuserConfig = new KafkaConsumerConfig();
conmuserConfig.setUrlNodeList(list);
conmuserConfig.setGroupId(\"test\");
KafkaConsumer consumer = KafkaFactory.create()
.init(conmuserConfig)
.setDeserializerKeyAndValueType(TypeDefine.DE_STRING, TypeDefine.DE_STRING)
.buildKafkaConmuser();
EddieConsumer eddieConsumer = new EddieConsumer(\"test\", consumer);
eddieConsumer.start();
}
}
|
总的来说,我个人觉得kafka学习的主要难度在于只有官网可查,网上很多资料要不太老,要么根本就是个超简单东西摆在那,一点实际价值都没得。所以搞这么一个博客,记录自己的学习笔记,省着忘了。
GitHUb
然后最后感谢 Java高端交流群一 中的 Κёлτ && 那一天,通过与他们讨论,解决了我很多问题,包括线程底层原理的部分。
继续阅读与本文标签相同的文章
钱颖一:教育要训练大脑会思考
语义分割 | 常见语义分割方法资料汇总
-
猎户星空CEO傅盛:现在是AI发展最好时期,家庭服务机器人前景可期
2026-05-14栏目: 教程
-
5G远程驾驶和微公交首秀互联网大会
2026-05-14栏目: 教程
-
学宏程序编程,这些知识必不可少!
2026-05-14栏目: 教程
-
华为准备卖出“落后”的5G,多家美企极力竞争!任正非格局太大!
2026-05-14栏目: 教程
-
百度:飞桨深度学习平台已累计服务150多万开发者
2026-05-14栏目: 教程
