如今,越来越多的业务场景要求 OLTP 系统能及时得到业务数据计算、分析后的结果,这就需要实时的流式计算如F 等来保障。例如,在 TB 级别数据量的数据库中,通过 SQL 语句或相关 API直接对原始数据进行大规模关联、聚合操作,是无法做到在极短的时间内通过接口反馈到前端进行展示的。若想实现大规模数据的“即席查询”,就须用实时计算框架构建实时数仓来实现。
本文通过一个教育行业的应用案例,剖析业务系统对实时计算的需求场景,并分析了 F 和Spark 两种实现方式的异同,最后通过运用UCloud UF 产品中封装的SQL模块,来加速开发效率,更快地完成需求。
1.1 业务场景简述
在这个 K12 教育的业务系统中,学生不仅局限于纸质的练习册进行练习,还可以通过各类移动终端进行练习。基于移动终端,可以更方便地收集学生的学习数据,然后通过大数据分析,量化学习状态,快速定位薄弱知识点,进行查缺补漏。
在这套业务系统中,学生在手机 App 中对老师布置的作业进行答题训练,每次答题训练提交的数据格式如下表所示:
字段 | 含义 | 举例 |
|---|---|---|
student_id | 学生唯一ID | 学生ID_16 |
textbook_id | 教材唯一ID | 教材ID_1 |
grade_id | 年级唯一ID | 年级ID_1 |
subject_id | 科目唯一ID | 科目ID_2_语文 |
chapter_id | 章节唯一ID | 章节ID_chapter_2 |
question_id | 题目唯一ID | 题目ID_100 |
score | 当前题目扣分(0 ~ 10) | 2 |
answer_time | 当前题目作答完毕的日期与时间 | 2019-09-11 12:44:01 |
ts | 当前题目作答完毕的时间戳(java.sql.Timestamp) | Sep 11, 2019 12:44:01 PM |
例如,传入到后台的单条答题记录数据格式如下:
{
"student_id": "学生ID_16",
"textbook_id": "教材ID_1",
"grade_id": "年级ID_1",
"subject_id": "科目ID_2_语文",
"chapter_id": "章节ID_chapter_2",
"question_id": "题目ID_100",
"score": 2,
"answer_time": "2019-09-11 12:44:01",
"ts": "Sep 11, 2019 12:44:01 PM"
}然后,基于上述实时流入的数据,需要实现如下的分析任务:
- 实时统计每个题目被作答频次
- 按照年级实时统计题目被作答频次
- 按照科目实时统计每个科目下题目的作答频次
1.2 技术方案选型
针对上述几个需求点,设计了如下的方案。首先会将数据实时发送到 Kafka 中,然后再通过实时计算框架从 Kafka 中读取数据,并进行分析计算,最后将计算结果重新输出到 Kafka 另外的主题中,以方便下游框架使用聚合好的结果。
下游框架从 Kafka 中拿到聚合好的数据,并实时录入到 OLTP 的业务库中(例如:MySQL、UDW、H 、ES等),以便于接口将想要的结果实时反馈给前端。
中间的实时计算框架,则在F 和Spark中选择。2018 年 08 月 08 日,F 1.6.0 推出,支持状态过期管理(F -9510, F -9938)、支持RocksDB、在 SQL 客户端中支持 UDXF 函数,大大加强了 SQL 处理功能,同时还支持 DML 语句、支持基于多种时间类型的事件处理、Kafka Table Sink等功能。随后推出的 F 1.6.x 系列版本中,进行了大量优化。这些使得 F 成为一个很好的选择。
早先 Spark 要解决此类需求,是通过 Spark Streaming 组件实现。为此需要先生成 RDD,然后通过 RDD 算子进行分析,或者将 RDD 转换为 DataSetData 、创建临时视图,并通过 SQL 语法或者 DSL 语法进行分析。相比之下显得不够便捷和高效。后来 Spark 2.0.0 新增了 Structured Streaming 组件,具有了更快的流式处理能力,可达到和 F 接近的效果。
架构如下图所示:
本篇将省略下游框架的操作,重点介绍F 框架进行任务计算的过程(虚线框中的内容),并简述Spark的实现方法,便于读者理解其异同。
1.3 实时计算在学情分析系统中的具体实现
1.3.1 F 实践方案
1. 发送数据到 Kafka
后台服务通过 Flume 或后台接口触发的方式调用 Kafka 生产者 API,实时将数据发送到 Kafka 指定主题中。
例如发送数据如下所示:
{"student_id":"学生ID_16","textbook_id":"教材ID_1","grade_id":"年级ID_1","subject_id":"科目ID_2_语文","chapter_id":"章节ID_chapter_2","question_id":"题目ID_100","score":2,"answer_time":"2019-09-11 12:44:01","ts":"Sep 11, 2019 12:44:01 PM"}
………提示:此处暂且忽略在 Kafka 集群中创建 Topic 的操作。
2. 编写 F 任务分析代码
使用 F 处理上述需求,需要将实时数据转换为 DataStream 实例,并通过 DataStream 算子进行任务分析,另外,如果想使用 SQL 语法或者 DSL 语法进行任务分析,则需要将 DataStream 转换为 Table 实例,并注册临时视图。
(1)构建 F env
env(StreamExecutionEnvironment) 是 F 当前上下文对象,用于后续生成DataStream。代码如下所示:
val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(3)
(2)从 Kafka 读取答题数据
在 F 中读取 Kafka 数据需要指定 KafkaSource,代码如下所示:
val props = new Properties()
props.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")
props.setProperty("group.id", "group_consumer_learning_test01")
val f KafkaSource = new F KafkaConsumer011String, props)
val eventStream = env.addSourceString(3)进行 JSON 解析
这里通过 map 算子实现 JSON 解析,代码示例如下:
val answerDS = eventStream.map(s => {
val gson = new Gson()
val answer = gson.fromJson(s, classOf[Answer])
answer
})(4)注册临时视图
创建临时视图的目的,是为了在稍后可以基于 SQL 语法来进行数据分析,降低开发工作量。需要先获取TableEnv 实例,再将 DataStream 实例转换为 Table 实例,最后将其注册为临时视图。代码如下所示:
val tableEnv = StreamTableEnvironment.create(env)
val table = tableEnv.fromDataStream(answerDS)
tableEnv.registerTable("t_answer", table)(5)进行任务分析
接下来,便可以通过 SQL 语句来进行数据分析任务了,3 个需求对应的分析代码如下所示:
//实时:统计题目被作答频次
val result1 = tableEnv.sqlQuery(
"""SELECT
| question_id, COUNT(1) AS frequency
|FROM
| t_answer
|GROUP BY
| question_id
""".stripMargin)
//实时:按照年级统计每个题目被作答的频次
val result2 = tableEnv.sqlQuery(
"""SELECT
| grade_id, COUNT(1) AS frequency
|FROM
| t_answer
|GROUP BY
| grade_id
""".stripMargin)
//实时:统计不同科目下,每个题目被作答的频次
val result3 = tableEnv.sqlQuery(
"""SELECT
| subject_id, question_id, COUNT(1) AS frequency
|FROM
| t_answer
|GROUP BY
| subject_id, question_id
""".stripMargin)此时得到的 result1、result2、result3 均为 Table 实例。
(6)实时输出分析结果
接下来,将不同需求的统计结果分别输出到不同的 Kafka 主题中即可。
在 F 中,输出数据之前,需要先将 Table 实例转换为 DataStream 实例,然后通过 addSink 算子添加 KafkaSink即可。
因为涉及到聚合操作,Table 实例需要通过 RetractStream 来转换为 DataStream 实例。
该部分代码如下所示:
tableEnv.toRetractStreamResult1
.filter(_._1)
.map(_._2)
.map(new Gson().toJson(_))
.addSink(new F KafkaProducer011[String]("linux01:9092,linux02:9092,linux03:9092",
"test_topic_learning_2",
new SimpleStringSchema()))
tableEnv.toRetractStreamResult2
.filter(_._1)
.map(_._2)
.map(new Gson().toJson(_))
.addSink(new F KafkaProducer011[String]("linux01:9092,linux02:9092,linux03:9092",
"test_topic_learning_3",
new SimpleStringSchema()))
tableEnv.toRetractStreamResult3
.filter(_._1)
.map(_._2)
.map(new Gson().toJson(_))
.addSink(new F KafkaProducer011[String]("linux01:9092,linux02:9092,linux03:9092",
"test_topic_learning_4",
new SimpleStringSchema()))(7)执行分析计划
F 支持多流任务同时运行,执行分析计划代码如下所示:
env.execute("F StreamingAnalysis")至此,编译并运行项目后,即可看到实时的统计结果,如下图所示,从左至右的 3 个窗体中,分别代表对应需求的输出结果。
1.3.2 Spark 基于Structured Streaming的实现
Spark发送数据到Kafka,及最后的执行分析计划,与F 无区别,不再展开。下面简述差异点。
1. 编写 Spark 任务分析代码
(1)构建 SparkSession
如果需要使用 Spark 的Structured Streaming组件,首先需要创建 SparkSession 实例,代码如下所示:
val sparkConf = new SparkConf()
.setAppName("StreamingAnalysis")
.set("spark.local.dir", "F: emp")
.set("spark.default.parallelism", "3")
.set("spark.sql.shuffle.partitions", "3")
.set("spark.executor.instances", "3")
val spark = SparkSession
.builder
.config(sparkConf)
.getOrCreate()(2)从 Kafka 读取答题数据
接下来,从 Kafka 中实时读取答题数,并生成 streaming-DataSet 实例,代码如下所示:
val inputData 1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")
.option("subscribe", "test_topic_learning_1")
.load()(3)进行 JSON 解析
从 Kafka 读取到数据后,进行 JSON 解析,并封装到 Answer 实例中,代码如下所示:
val keyValueDataset1 = inputData 1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
val answerDS = keyValueDataset1.map(t => {
val gson = new Gson()
val answer = gson.fromJson(t._2, classOf[Answer])
answer
})其中 Answer 为 Scala 样例类,代码结构如下所示:
case class Answer(student_id: String,
textbook_id: String,
grade_id: String,
subject_id: String,
chapter_id: String,
question_id: String,
score: Int,
answer_time: String,
ts: Timestamp) extends Serializable(4)创建临时视图
创建临时视图代码如下所示:
answerDS.createTempView("t_answer")(5)进行任务分析
仅以需求1(统计题目被作答频次)为例,编写代码如下所示:
- 实时:统计题目被作答频次
//实时:统计题目被作答频次
val result1 = spark.sql(
"""SELECT
| question_id, COUNT(1) AS frequency
|FROM
| t_answer
|GROUP BY
| question_id
""".stripMargin).toJSON(6)实时输出分析结果
仅以需求1为例,输出到Kafka 的代码如下所示:
result1.writeStream
.outputMode("update")
.trigger(Trigger.ProcessingTime(0))
.format("kafka")
.option("kafka.bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")
.option("topic", "test_topic_learning_2")
.option("checkpointLocation", "./checkpoint_chapter11_1")
.start()1.3.3 使用 UF SQL 加速开发
通过上文可以发现,无论基于F 还是Spark通过编写代码实现数据分析任务时,都需要编写大量的代码,并且在生产集群上运行时,需要打包程序,然后提交打包后生成的 Jar 文件到集群上运行。
为了简化开发者的工作量,不少开发者开始致力于 SQL 模块的封装,希望能够实现只写 SQL 语句,就完成类似上述的需求。UF SQL 即是 UCloud 为简化计算模型、降低用户使用实时计算UF 产品门槛而推出的一套符合 SQL 语义的开发套件。通过 UF SQL 模块可以快速完成这一工作,实践如下。
1. 创建 UKafka 集群
在UCloud控制台UKafka创建页,选择配置并设置相关阈值,创建UKafka集群。
更多细节可以参考UKafka产品文档 https://docs.ucloud.cn/analysis/ukafka/index
提示:此处暂且忽略在 Kafka 集群中创建 Topic 的操作。
2. 创建 UF 集群
在UCloud控制台UF 创建页,选择配置和运行模式,创建一个 F 集群。
- 完成创建
更多细节可以参考UF 产品文档 https://docs.ucloud.cn/analysis/uf /index
3. 编写 SQL 语句
完成之后,只需要在工作空间中创建如下形式的 SQL 语句,即可完成上述3个需求分析任务。
(1)创建数据源表
创建数据源表,本质上就是为 F 当前上下文环境执行 addSource 操作,SQL 语句如下:
CREATE TABLE t_answer(
student_id VARCHAR,
textbook_id VARCHAR,
grade_id VARCHAR,
subject_id VARCHAR,
chapter_id VARCHAR,
question_id VARCHAR,
score INT,
answer_time VARCHAR,
ts TIMESTAMP
)WITH(
type ='kafka11',
bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092',
zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka',
topic ='test_topic_learning_1',
groupId = 'group_consumer_learning_test01',
parallelism ='3'
);(2)创建结果表
创建结果表,本质上就是为 F 当前上下文环境执行 addSink 操作,SQL 语句如下:
CREATE TABLE t_result1(
question_id VARCHAR,
frequency INT
)WITH(
type ='kafka11',
bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092',
zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka',
topic ='test_topic_learning_2',
parallelism ='3'
);
CREATE TABLE t_result2(
grade_id VARCHAR,
frequency INT
)WITH(
type ='kafka11',
bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092',
zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka',
topic ='test_topic_learning_3',
parallelism ='3'
);
CREATE TABLE t_result3(
subject_id VARCHAR,
question_id VARCHAR,
frequency INT
)WITH(
type ='kafka11',
bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092',
zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka',
topic ='test_topic_learning_4',
parallelism ='3'
);(3)执行查询计划
最后,执行查询计划,并向结果表中插入查询结果,SQL 语句形式如下:
INSERT INTO
t_result1
SELECT
question_id, COUNT(1) AS frequency
FROM
t_answer
GROUP BY
question_id;
INSERT INTO
t_result2
SELECT
grade_id, COUNT(1) AS frequency
FROM
t_answer
GROUP BY
grade_id;
INSERT INTO
t_result3
SELECT
subject_id, question_id, COUNT(1) AS frequency
FROM
t_answer
GROUP BY
subject_id, question_id;SQL 语句编写完毕后,将其直接粘贴到 UF 前端页面对话框中,并提交任务,即可快速完成上述 3 个需求。如下图所示:
1.3.4. UF SQL 支持多流 JOIN
F 、Spark 目前都支持多流 JOIN,即stream-stream join,并且也都支持Watermark处理延迟数据,以上特性均可以在 SQL 中体现,得益于此,UF SQL 也同样支持纯 SQL 环境下进行 JOIN 操作、维表JOIN操作、自定义函数操作、JSON数组解析、嵌套JSON解析等。更多细节欢迎大家参考 UF SQL 相关案例展示https://docs.ucloud.cn/analysis/uf /dev/sql
1.4 总结
UF 基于 Apache F 构建,除100%兼容开源外,也在不断推出 UF SQL 等模块,从而提高开发效率,降低使用门槛,在性能、可靠性、易用性上为用户创造价值。 今年8月新推出的 F 1.9.0,大规模变更了 F 架构,能够更好地处理批、流任务,同时引入全新的 SQL 类型系统和更强大的 SQL 式任务编程。UF 预计将于10月底支持 F 1.9.0,敬请期待。
本文转载自公众号UCloud技术(ID:ucloud_tech)。
继续阅读与本文标签相同的文章
-
城市数字化后,新一代内生安全系统可全方位保护
2026-05-14栏目: 教程
-
谷歌也来“唱衰”5G,5G手机只会徒增功耗?为何这么说?
2026-05-14栏目: 教程
-
量子信息和量子技术白皮书合肥宣言在中科大发布
2026-05-14栏目: 教程
-
微信悄悄更新一新功能,来看看!
2026-05-14栏目: 教程
-
打破三大运营商垄断,第四大运营商终于来了!
2026-05-14栏目: 教程
