写一个wordcount:
linux命令: nc -lk 8888
package day01
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
SteamingWordCount {
def main(args: Array[String]): Unit = {
//SparkStreaming必须要有两个进程,一个拉取,一个计算
val conf = new SparkConf().setAppName(\"SteamingWordCount\").setMaster(\"local[*]\")
val sc = new SparkContext(conf)
//创建一个SparkSteamingContext 批次的间隔为5秒切分一次
val ssc = new StreamingContext(sc,Seconds(5))
//创建一个DStream() 代表一系列的RDD
//通过一个TCP端口拉取数据创建DStream
val lines = ssc.socketTextStream(\"192.168.111.101\",8888)
//对Dstream进行操作
val words: DStream[String] = lines.flatMap(_.split(\" \"))
val wordAndOne: DStream[(String, Int)] = words.map((_,1))
val reduced: DStream[(String, Int)] =wordAndOne.reduceByKey(_+_)
//展示结果
reduced.print()
//需要手动开启,然后会一直执行
ssc.start()
//等待命令停止
ssc.awaitTermination()
//缺点在于只能单词记录,不能累加
}
}
定义一个动态版的wordcount:
package day01
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
StateFulWordCount {
/**String,Seq[Int],Option[Int]
*第一个:key即单词
* 第二个:当前批次该单词出现的次数
* 第三个:初始化或者是以前累加过的值
*/
val updateFunc =(it:Iterator[(String,Seq[Int],Option[Int])])=>{
// it.map(t=>(t._1,t._2.sum + t._3.getOrElse(0)))
it.map{ case (w,s,o) =>(w,s.sum+o.getOrElse(0))}
}
def main(args: Array[String]): Unit = {
//SparkStreaming必须要有两个进程,一个拉取,一个计算
val conf = new SparkConf().setAppName(\"SteamingWordCount\").setMaster(\"local[*]\")
//创建一个SparkSteamingContext 批次的间隔为5秒切分一次
val ssc = new StreamingContext(conf,Milliseconds(5000))
//如果想要更新历史状态(累加),要设置checkpoint
ssc.checkpoint(\"E:\\\\大数据\")
//创建一个DStream() 代表一系列的RDD
//通过一个TCP端口拉取数据创建DStream
val lines = ssc.socketTextStream(\"192.168.111.101\",8888)
//对Dstream进行操作
val words: DStream[String] = lines.flatMap(_.split(\" \"))
val wordAndOne: DStream[(String, Int)] = words.map((_,1))
val reduced =wordAndOne.updateStateByKey(updateFunc,new HashPartitioner(ssc.sparkContext.defaultParallelism),true)
//展示结果
reduced.print()
//需要手动开启,然后会一直执行
ssc.start()
//等待命令停止
ssc.awaitTermination()
}
}
//不同的是必须要定义一个checkpoint(\"/\")==> 把前面的数据保存起来
//需要自定义一个函数
继续阅读与本文标签相同的文章
上一篇 :
旧金山政府限制快递机器人,相关创业公司发展或受阻
下一篇 :
我进步最大的时候,并不是加班最多的时候
-
CAD如何批量导出PDF文件?别说PDF了!GIF我都能给你导出来
2026-05-19栏目: 教程
-
活动回顾丨阿里云业务中台最佳实践沙龙圆满落幕
2026-05-19栏目: 教程
-
在CentOS里查看ssh的登录记录
2026-05-19栏目: 教程
-
好程序员web前端学习路线分享纯css绘制各种图形
2026-05-19栏目: 教程
-
菜鸟裹裹快递员收入有多高?小哥晒出月流水,程序员表示羡慕了
2026-05-19栏目: 教程
