累加器

累加器提供了将工作节点中的值聚合到驱动器程序中的简单语法。累加器的一个常见用法是在调测时对作业执行过程中的时间进行计数。

例:累加空行

val sc = new SparkContext()
val file = sc.textFile(\"file.txt\")
val blankLines = sc.accumulator(0)//创建Accumulator[Int]并初始化为0
val callSigns = file.flatMap(line =>{
	if(line == \"\"){
	 blankLines += 1    //累加器+1
	}
	line.split(\" \")
})
callSigns.saveAsSequenceFile(\"output.txt\")
println(\"blank Lines:\" + blankLines.value)

广播变量

广播变量可以让程序高效的向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作应用

广播变量就是类型为spark.broadcast.Broadcast[T]的一个对象,其中存放着类型为T的值。可以在任务中通过对Broadcast对象调用value来获取该对象的值。这个值只会被发送到各个节点一次,使用的是一种高效的类似BitTorrent的机制。

例:在scala中使用广播变量查询国家

val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map{case (sign,count) =>
  val country = lookupInArray(sign,signPrefixes.value)
  (country,count)
  }.reduceByKey((x,y) => x + y)
  countryContactCounts.saveAsSequenceFile(outputDir + \"/countries.txt\")

基于分区进行操作

基于分区对数据进行操作可以让我们避免为每个数据元素进行重复的配置工作,Spark提供基于分区的map和foreach,让你的部分代码只对RDD的每个分区运行一次,以用来降低操作代价。

例:使用共享连接池与JSON解释器

val contactsContactLists = validSigns.distinct().mapPartitions{
  signs => 
  val mapper = create Mapper()
  val client = new HttpClient()
  client.start()
  //创建http请求
  sign.map {sign =>
    createExchangeForSign(sign)
  //获取响应
  }.map{case (sign,exchange) =>
      (sign, readExchangeCallLog(mapper,exchange))
  }.filter(x => x._2 !=null) //删除空的呼叫日志
}

数值RDD的操作

spark的数值操作是通过流式算法实现的,允许以每次一个元素的方式构建出模型,这些统计数据都会在调用stats时通过一次遍历数据计算出来,并以StatsCounter对象返回,下面列出StatsCounter上可用的方法。

方法 含义
count()

RDD中元素的个数

means()

元素的平均值

sum() 总和
max() 最大值
min() 最小值
variance() 元素的方差
sampleVariance() 从采样中计算出的方差
stdev() 标准差
sampleStdev() 采样的标准差

 

 

 

 

 

 

 

 

 

 

如果只想计算这些统计数据中的一个,可以直接对RDD调用对应方法:rdd.max() /rdd.min()

 

收藏 打印