简介

流式计算中,我们经常有一些场景是消费Kafka数据,进行处理,然后存储到其他的数据库或者缓存或者重新发送回其他的消息队列中。
本文讲述一个简单的Redis作为Sink的案例。
后续,我们会补充完善,比如落入H ,Kafka,Mysql等。

关于Redis Sink

F 提供了封装好的写入Redis的包给我们用,首先我们要新增一个依赖:

<dependency>    <groupId>org.apache.f </groupId>    <artifactId>f -connector-redis_2.10</artifactId>    <version>1.1.5</version></dependency>

然后我们实现一个自己的RedisSinkExample:

//指定Redis setpublic static final class RedisSinkExample implements RedisMapper<Tuple2<String,Integer>> {public RedisCommandDe ion getCommandDe ion() {    return new RedisCommandDe ion(RedisCommand.SET, null);}public String getKeyFromData(Tuple2<String, Integer> data) {    return data.f0;}public String getValueFromData(Tuple2<String, Integer> data) {    return data.f1.toString();}}

我们用最简单的单机Redis的SET命令进行演示。

完整的代码如下,实现一个读取Kafka的消息,然后进行WordCount,并把结果更新到redis中:

public class RedisSinkTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.enableCheckpointing(2000);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//连接kafkaProperties properties = new Properties();properties.setProperty("bootstrap.servers", "127.0.0.1:9092");F KafkaConsumer<String> consumer = new F KafkaConsumer<>("test", new SimpleStringSchema(), properties);consumer.setStartFromEarliest();DataStream<String> stream = env.addSource(consumer);DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);//实例化F JedisPoolConfig 配置redisF JedisPoolConfig conf = new F JedisPoolConfig.Builder().setHost("127.0.0.1").setHost("6379").build();//实例化RedisSink,并通过f 的addSink的方式将f 计算的结果插入到rediscounts.addSink(new RedisSink<>(conf,new RedisSinkExample()));env.execute("WordCount From Kafka To Redis");}//public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) {    String[] tokens = value.toLowerCase().split("W+");    for (String token : tokens) {        if (token.length() > 0) {            out.collect(new Tuple2<String, Integer>(token, 1));        }    }}}//指定Redis setpublic static final class RedisSinkExample implements RedisMapper<Tuple2<String,Integer>> {public RedisCommandDe ion getCommandDe ion() {    return new RedisCommandDe ion(RedisCommand.SET, null);}public String getKeyFromData(Tuple2<String, Integer> data) {    return data.f0;}public String getValueFromData(Tuple2<String, Integer> data) {    return data.f1.toString();}}}//
收藏 打印