flume->kafka
# vi flume.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = exec
a1.sources.r1.command = /usr/bin/vmstat 1
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = first_test
a1.sinks.k1.brokerList = CentOS-01:9092,CentOS-02:9092,CentOS-03:9092
a1.sinks.k1.channel = c1
a1.sinks.k1.batchSize = 20
执行flume
#flume-ng agent --conf conf /etc/flume-ng/conf
--conf-file /etc/flume-ng/conf/flume.conf
--name a1
-Dflume.root.logger = INFO,conso
kafka接收
#kafka-console-consumer
--zookeeper CentOS-01:2181,CentOS-02:2181,CentOS-03:2181/kafka
--topic first_test
kafka->flume->hdfs
# vi flume.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = CentOS-01:9092
a1.sources.r1.kafka.topics = first_test
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.kafka.consumer.timeout.ms = 100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /data/kafka/
a1.sinks.k1.hdfs.rollInterval = 5
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.channel = c1
执行flume
#flume-ng agent --conf conf /etc/flume-ng/conf
--conf-file /etc/flume-ng/conf/flume.conf
--name a1
-Dflume.root.logger = INFO,conso
在本地和HDFS上创建目录
# mdir /root/flume_data
# hadoop fs -mkdir -p /data/flume
将数据移动到flume_data中
# mv /root/data.dat /root/flume_data/
继续阅读与本文标签相同的文章
上一篇 :
如何利用搜狗微信导航站为微信公众号吸粉?
下一篇 :
地震来了怎么办?日本推出地震预警APP
-
微信小程序兼容性问题。
2026-05-18栏目: 教程
-
flex的兼容性处理问题
2026-05-18栏目: 教程
-
Mac高效开发之iTerm2、Prezto和Solarized主题
2026-05-18栏目: 教程
-
将阿里云产品整合成为高校课程实训的训练营产品的实践(四)
2026-05-18栏目: 教程
-
中间人攻击,HTTPS也可以被碾压
2026-05-18栏目: 教程
