之前我想要把数据结构写完,再来写这些乱七八糟的东西。不过最近一段时间工作比较忙,加上未来涉及到中间件的使用,回头还是先把中间件的理解先放在前面吧,数据结构这块不会烂尾的。

kafka 是什么

官网上指出”A Distributed Streaming Platform”,一个分布式的流处理平台

  • PUBLISH & SUBSCRIBE 发布和订阅
  • PROCESS 处理
  • STORE 贮存

流处理具体是什么含义相信大家都理解,毕竟一个java工程师对Linux和C一定是很熟悉的。
那么kafka三大功能都是什么含义呢?

PUBLISH & SUBSCRIBE

e.g.
我们开车通常会听广播,那广播是从广播电台统一发布
那我们接受广播,只需要打开收音机,订阅广播即可听到这个声音
那么这个关系,就是发布和订阅的关系,一端发出,一端订阅

对应到kafka中的概念,就是

  • Producer 生产发布
  • Consumer 消费订阅

PROCESS

编写可扩展的流处理应用程序,用于实时事件响应的场景。一开始我们看到kafka定位为“一个分布式的流处理平台”
既然是流处理,那处理必然是一个很关键的一点
e.g. 一个秒杀活动,并发量特别大,并发量大的时候,可能会有大量用户同时提交付款,但我们的服务器可能无法承载这么多并发,那我们采用延时订阅等方式,以达到可以处理历史数据的应用程序

STORE

安全的将流式的数据存储在一个分布式,有副本备份,容错的集群。
这个概念比较简单了,意思是这个数据是有备份的,你有几个集群,这个数据就会被备份几份,比如说你有总容量600G的3台主机建立了3个kafka集群,那么实际有效存储只有200G,这是因为每个数据都会被同时备份到另外两台主机中,以保证数据不会丢失,利用空间换取稳定。同时这个集群是一个容错的集群,后期我会做这个容错测试,让自己也能更放心大胆的用kafka。
在这里延伸两个名词

  • Topic Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic).
  • Broker 已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

那么以上就是kafka的官网概念回顾。总体来讲只要接触过集群,分布式,微服务的概念,理解这些应该是非常非常简单的
总之kafka有四个小概念,Producer,Consumer,Topic,Broker
那么根据这四个概念,我们简单建立一个消息系统模型

工厂A 每分钟生产馒头5个 —为工厂A建立 Topic FactoryA
工厂B 每分钟生产窝头1个 —为工厂B建立 Topic FactoryB
那么两个工厂每分钟都会发布一个消息到各自的主题(Topic)中
为了保证工厂发布渠道的稳定性,那两个工厂合资创建了3个销售渠道Broker0,Broker1,Broker2
因为工厂每分钟总共就生产出固定的馒头,不会因为某个销售渠道卖的多而增加产量,所有这三个渠道之间的库存数据(待消费数据)是同步的
此时客户来了,客户可能只想吃馒头,那他只订阅FactoryA,那么FactoryB的数据就不会给这个客户,而客户也并不关心是哪个销售渠道提供的,他只关心最后送来的东西是工厂A生产的馒头,就OK了。
如果客户比较能吃(服务器承载量大),那他可以订阅两个Topic,一起处理

这种就是我们的消息系统的实际应用场景
ps: 官网上提供了很完整的下载,启动,测试的方法,这个笔记中不做记录,想练回头看下官网的Quick Start即可,同时安利一个中文版的国内网站,翻译的有些偏差,不过也非常棒,减少了我的一些阅读障碍

kafka重要配置项

############################# Server Basics #############################
# 核心配置
broker.id=0

############################# Socket Server Settings #############################
# 监听端口
listeners=PLAINTEXT://:9092
# 配置服务提供远端访问能力
advertised.listeners=PLAINTEXT://47.96.29.8:9092
# 配置Https的连接 没用到 比较复杂
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# server用来处理网络请求的网络线程数目;一般你不需要更改这个属性。
num.network.threads=3
# server用来处理请求的I/O线程的数目;这个线程数目至少要等于硬盘的个数。
num.io.threads=8
# SO_SNDBUFF 缓存大小,server进行socket 连接所用
socket.send.buffer.bytes=102400
# SO_RCVBUFF缓存大小,server进行socket连接时所用
socket.receive.buffer.bytes=102400
# server允许的最大请求尺寸;  这将避免server溢出,它应该小于Java heap size
socket.request.max.bytes=10485760

############################# Log Basics #############################
# 此目录每次重启会被清理,测试用就不改了
log.dirs=/tmp/kafka-logs
# 如果创建topic时没有给出划分partitions个数,这个数字将是topic下partitions数目的默认数值。
num.partitions=1
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0

Kafka启动

  1. 首先下载kafka
    wget http://mirror.bit.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
  2. 解压
    tar -xzf kafka_2.11-2.1.0.tgz
  3. 进入kafka目录,开始配置(最好复制一份比如server-1.properties)
    vim ~/{kafka}/config/server.properties
  4. 启动zookeeper,不建议使用kafka自带的zookeeper,我在另一台机器起了一台
    需要先拷贝一份默认配置并修改配置,然后重命名为zoo.cfg
    nohup ./zkServer.sh > /root/log/zookeeper-1.log 2>&1 &
  5. 启动kafka
    bin/kafka-server-start.sh config/server.properties
  6. 启动监听
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xxxx --from-beginning

至此,kafka启动完毕,第六部可以不使用,但调试非常有帮助,可以看到你的信息是否成功发送到服务器上

使用kafka java API

说了一大堆,没实战就是耍流氓,现在起开始正式进入编码阶段。
首先,Jar包是肯定的了,此时我会选用最新版本的jar包,毕竟是测试联系,不会太计较稳定性,也想通过新版本的一些坑对比老版本。

1
2
3
4
5
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.0.0</version>
</dependency>

 

仔细看了官网的文档,发现生产和消费,貌似用的同一个Jar包?先试试看吧,回头有问题再改。

剩下的我们来看下kafka的代码,首先是一个生产者的类 –请忽略我非常不要脸的把自己名放到类名里了 实在没啥好名字 也为了防止冲突✌️

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
 * @author eddie
 * @createTime 2018-11-08
 * @de ion 生产者
 */
public class EddieProducer extends ShutdownableThread {

    private Queue<Message> queue;

    private KafkaProducer<String, String> producer;

    private String topic;

    private boolean shutdown = false;

    public EddieProducer(final String topic, final KafkaProducer<String, String> producer){
        super(\"\", false);
        queue = new Concurrent edQueue<>();
        this.topic = topic;
        this.producer = producer;
    }

    public boolean add(Message e){
        return queue.add(e);
    }

    public boolean add(Collection<? extends Message> collection){
        return queue.addAll(collection);
    }

    @Override
    public void doWork() {
        while (!shutdown && !Thread.currentThread().isInterrupted()) {
            if (!queue.isEmpty()) {
                long startTime = System.currentTimeMillis();
                Message message =  s.requireNonNull(queue.poll());
                String key = message.getKey();
                String value = message.getValue();
                Future<Record data> send = producer.send(
                        new ProducerRecord<>(topic, key, value),
                        new ProducerCallBack(startTime, key, value)
                );
            }
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void cancel() {
        shutdown = true;
        Thread.currentThread().interrupt();
    }
}

 

可以看到上面我们用了一个回调ProducerCallBack,这个东西其实是自己定义的,根据配置,目前是当kafka服务器成功写入一个主机(broker)中就返回,返回后回调这个类的onCompletion方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
 * @author eddie
 * @createTime 2018-11-08
 * @de ion 回调函数
 */
public class ProducerCallBack implements Callback {
    private final long startTime;
    private final String key;
    private final String message;

    public ProducerCallBack(long startTime, String key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }

    @Override
    public void onCompletion(Record data  data, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if ( data != null) {
            System.out.printf(\"message(%s, %s) sent to partition(%d), offset(%d) in %d ms%n\",
                    key, message,  data.partition(),  data.offset(), elapsedTime);
        } else {
            exception.printStackTrace();
        }
    }
}

 

再来看下消费者类,消费者稍微有点复杂,这里面有个偏移量的概念,这个概念我理解是可以随意访问历史量的重要参数,我这一些配置放到了另外一个配置工厂里了,不过因为有些问题我也不是很清楚,所以慢慢成长,这个小工具完整的实现在github上,文末有链接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
 * @author eddie
 * @createTime 2018-11-08
 * @de ion
 */
public class EddieConsumer extends ShutdownableThread {

    private String topic;

    private KafkaConsumer consumer;

    public EddieConsumer(final String topic, final KafkaConsumer<String, String> producer){
        super(\"KafkaConsumerExample\", false);
        this.topic = topic;
        this.consumer = producer;
    }

    @Override
    public void doWork() {
        consumer.subscribe(Collections.singletonList(this.topic));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
        int count = records.count();
        System.out.println( \"测试结果\" + count);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(\"Received message: (\" + record.key() + \", \" + record.value() + \") at offset \" + record.offset());
        }
    }

    @Override
    public String name() {
        return \"KafkaConsumerExample\";
    }

    @Override
    public boolean isInterruptible() {
        return Thread.currentThread().isInterrupted();
    }
}

 

通过以上的代码,我们实现了发布数据到远端,实时拿到数据。
最后看下调用的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
 * @author eddie
 * @createTime 2018-11-08
 * @de ion 测试类
 */
public class MainTest {

    public static void main(String[] args){
        //生成地址池
        List<KafkaUrlNode> list = new ArrayList<>();
        list.add(new KafkaUrlNode(\"47.96.29.8\", \"9092\"));
        //构造一个配置对象
        KafkaProducerConfig config = new KafkaProducerConfig();
        //里面配置很多 但我写了很多默认配置 除了这个地址池,其他选默认没有关系 可以满足大部分需求了
        config.setUrlNodeList(list);

        //我比较迷恋的工厂模式
        KafkaProducer producer = KafkaFactory.create()
                .init(config)
                .setSerializerKeyAndValueType(TypeDefine.STRING, TypeDefine.STRING)
                .buildKafkaProducer();

        //通过工厂模式生产出来的,就是非常标准的KafkaProducer 但这个使用起来并不那么顺手
        //所以我又封装了一个EddieProducer 开线程来处理生产和消费
        EddieProducer eddieProducer = new EddieProducer(\"test\", producer);

        //开启线程
        eddieProducer.start();
        //添加数据
        eddieProducer.add(new Message(\"a\", \"b\"));
        eddieProducer.add(new Message(\"a\", \"b\"));
        eddieProducer.add(new Message(\"a\", \"b\"));
        eddieProducer.add(new Message(\"a\", \"b\"));
        eddieProducer.add(new Message(\"a\", \"b\"));
        eddieProducer.add(new Message(\"a\", \"b\"));

        //一个关掉线程的小工具 写的非常简易 不过胜在安全
        ThreadShutdown.finishThread(eddieProducer);
        System.out.println(\"测试\");


        KafkaConsumerConfig conmuserConfig = new KafkaConsumerConfig();
        conmuserConfig.setUrlNodeList(list);
        conmuserConfig.setGroupId(\"test\");
        KafkaConsumer consumer = KafkaFactory.create()
                .init(conmuserConfig)
                .setDeserializerKeyAndValueType(TypeDefine.DE_STRING, TypeDefine.DE_STRING)
                .buildKafkaConmuser();
        EddieConsumer eddieConsumer = new EddieConsumer(\"test\", consumer);
        eddieConsumer.start();

    }
}

 

总的来说,我个人觉得kafka学习的主要难度在于只有官网可查,网上很多资料要不太老,要么根本就是个超简单东西摆在那,一点实际价值都没得。所以搞这么一个博客,记录自己的学习笔记,省着忘了。
GitHUb

然后最后感谢 Java高端交流群一 中的 Κёлτ && 那一天,通过与他们讨论,解决了我很多问题,包括线程底层原理的部分。

收藏 打印