需求
某个图书网站,希望看到双十一秒杀期间实时的热销排行榜单。我们可以将“实时热门商品”翻译成程序员更好理解的需求:每隔5秒钟输出最近一小时内点击量最多的前 N 个商品/图书.
需求分解
将这个需求进行分解我们大概要做这么几件事情:
- 告诉 F 框架基于时间做窗口,我们这里用processingTime,不用自带时间戳
- 过滤出图书点击行为数据
- 按一小时的窗口大小,每5秒钟统计一次,做滑动窗口聚合(Sliding Window)
- 聚合,输出窗口中点击量前N名的商品
代码实现
向Kafka发消息模拟购买事件
public class KafkaProducer { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> text = env.addSource(new MyNoParalleSource()).setParallelism(1); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "127.0.0.1:9092"); //new F KafkaProducer("topn",new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),properties,F KafkaProducer.Semantic.EXACTLY_ONCE); F KafkaProducer<String> producer = new F KafkaProducer("topn",new SimpleStringSchema(),properties);/* //event-timestamp事件的发生时间 producer.setWriteTimestampToKafka(true);*/ text.addSink(producer); env.execute(); }}//其中的:MyNoParalleSource 是作者自己实现的一个并行度为1的发送器,用来向kafka发送数据:
public class MyNoParalleSource implements SourceFunction<String> {//1 //private long count = 1L; private boolean isRunning = true; /** * 主要的方法 * 启动一个source * 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了 * * @param ctx * @throws Exception */ @Override public void run(SourceContext<String> ctx) throws Exception { while(isRunning){ //图书的排行榜 List<String> books = new ArrayList<>(); books.add("Pyhton从入门到放弃");//10 books.add("Java从入门到放弃");//8 books.add("Php从入门到放弃");//5 books.add("C++从入门到放弃");//3 books.add("Scala从入门到放弃");//0-4 int i = new Random().nextInt(5); ctx.collect(books.get(i)); //每1秒产生一条数据 Thread.sleep(1000); } } //取消一个cancel的时候会调用的方法 @Override public void cancel() { isRunning = false; }}可见,我们每过1秒向Kafka的topn这个topic随机发送一本书的名字用来模拟购买行为。
整体实现代码如下:
public class TopN { public static void main(String[] args) throws Exception{ /** * * 书1 书2 书3 * (书1,1) (书2,1) (书3,1) * * */ //每隔5秒钟 计算过去1小时 的 Top 3 商品 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //以processtime作为时间语义 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "127.0.0.1:9092"); F KafkaConsumer<String> input = new F KafkaConsumer<>("topn", new SimpleStringSchema(), properties); //从最早开始消费 位点 input.setStartFromEarliest(); DataStream<String> stream = env .addSource(input); DataStream<Tuple2<String, Integer>> ds = stream .flatMap(new LineSplitter()); //将输入语句split成一个一个单词并初始化count值为1的Tuple2<String, Integer>类型 DataStream<Tuple2<String, Integer>> wcount = ds .keyBy(0) .window(SlidingProcessingTimeWindows.of(Time.seconds(600),Time.seconds(5))) //key之后的元素进入一个总时间长度为600s,每5s向后滑动一次的滑动窗口 .sum(1);// 将相同的key的元素第二个count值相加 wcount .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//(shu1, xx) (shu2,xx).... //所有key元素进入一个5s长的窗口(选5秒是因为上游窗口每5s计算一轮数据,topN窗口一次计算只统计一个窗口时间内的变化) .process(new TopNAllFunction(3)) .print();//redis sink redis -> 接口 env.execute(); }// private static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { // normalize and split the line //String[] tokens = value.toLowerCase().split("W+"); // emit the pairs /*for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } }*/ //(书1,1) (书2,1) (书3,1) out.collect(new Tuple2<String, Integer>(value, 1)); } } private static class TopNAllFunction extends ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow> { private int topSize = 3; public TopNAllFunction(int topSize) { this.topSize = topSize; } public void process( ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow>.Context arg0, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception { TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<Integer, Tuple2<String, Integer>>( new Comparator<Integer>() { @Override public int compare(Integer y, Integer x) { return (x < y) ? -1 : 1; } }); //treemap按照key降序排列,相同count值不覆盖 for (Tuple2<String, Integer> element : input) { treemap.put(element.f1, element); if (treemap.size() > topSize) { //只保留前面TopN个元素 treemap.pollLastEntry(); } } for (Map.Entry<Integer, Tuple2<String, Integer>> entry : treemap .entrySet()) { out.collect("=================
热销图书列表:
"+ new Timestamp(System.currentTimeMillis()) + treemap.toString() + "
===============
"); } } }}//查看输出:
=================热销图书列表:2019-03-05 22:32:40.004{8=(Java从入门到放弃,8), 7=(C++从入门到放弃,7), 5=(Php从入门到放弃,5)}================================热销图书列表:2019-03-05 22:32:45.004{8=(Java从入门到放弃,8), 7=(C++从入门到放弃,7), 5=(Php从入门到放弃,5)}=============== 继续阅读与本文标签相同的文章
上一篇 :
Flink-Redis-Sink
下一篇 :
Flink-Table-&-SQL
-
怎样更安全的部署SSL证书?
2026-05-23栏目: 教程
-
Git本地项目上传到远程仓库
2026-05-23栏目: 教程
-
codeigniter-3.1 PHP开发框架
2026-05-23栏目: 教程
-
URLOS安装、升级、卸载
2026-05-23栏目: 教程
-
【直播回顾】阿里技术专家巧工:GDB 接入方式以及使用建议
2026-05-23栏目: 教程
