ContextCleaner是SparkContext中的组件之一。ContextCleaner用于清理那些超出应用范围的RDD、Shuffle对应的map任务状态、Shuffle元数据、Broadcast对象以及RDD的Checkpoint数据。
创建ContextCleaner
创建ContextCleaner的代码如下。
_cleaner =
if (_conf.getBoolean(\"spark.cleaner.referenceTracking\", true)) {
Some(new ContextCleaner(this))
} else {
None
}
_cleaner.foreach(_.start())
根据上述代码,我们知道可以通过配置属性spark.cleaner.referenceTracking(默认是true)来决定是否启用ContextCleaner。
ContextCleaner的组成如下:
- referenceQueue:缓存顶级的AnyRef引用;
- referenceBuffer:缓存AnyRef的虚引用;
- listeners:缓存清理工作的监听器数组;
- cleaningThread:用于具体清理工作的线程。此线程为守护线程,名称为Spark Context Cleaner。
- periodicGCService:类型为ScheduledExecutorService,用于执行GC(garbage collection,即垃圾收集)的调度线程池,此线程池只包含一个线程,启动的线程名称以context-cleaner-periodic-gc开头。
- periodicGCInterval:执行GC的时间间隔。可通过spark.cleaner.periodicGC.interval属性进行配置,默认是30分钟。
- blockOnCleanupTasks:清理非Shuffle的其它数据是否是阻塞式的。可通过spark.cleaner.referenceTracking.blocking属性进行配置,默认是true。
- blockOnShuffleCleanupTasks:清理Shuffle数据是否是阻塞式的。可通过spark.cleaner.referenceTracking.blocking.shuffle属性进行配置,默认是false。清理Shuffle数据包括:清理MapOutputTracker中指定ShuffleId对应的map任务状态和ShuffleManager中注册的ShuffleId对应的Shuffle元数据。
- stopped:ContextCleaner是否停止的状态标记。
启动ContextCleaner
SparkContext在初始化的过程中会启动ContextCleaner,只有这样ContextCleaner才能够清理那些超出应用范围的RDD、Shuffle对应的map任务状态、Shuffle元数据、Broadcast对象以及RDD的Checkpoint数据。启动ContextCleaner的代码如下:
def start(): Unit = {
cleaningThread.setDaemon(true)
cleaningThread.setName(\"Spark Context Cleaner\")
cleaningThread.start()
periodicGCService.scheduleAtFixedRate(new Runnable {
override def run(): Unit = System.gc()
}, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
}
根据上述代码,启动ContextCleaner的步骤如下。
- 将cleaningThread设置为守护线程,并指定名称为Spark Context Cleaner。
- 启动cleaningThread。
- 给periodicGCService设置以periodicGCInterval作为时间间隔定时进行GC操作的任务。
除了GC的定时器,ContextCleaner其余部分的工作原理和listenerBus一样(也采用监听器模式,由异步线程来处理),因此不再赘述。异步线程实际只是调用keepCleaning方法,其实现见代码清单1。
代码清单1 keepCleaning的实现
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
.map(_.asInstanceOf[CleanupTaskWeakReference])
synchronized {
reference.foreach { ref =>
logDebug(\"Got cleaning task \" + ref.task)
referenceBuffer.remove(ref)
ref.task match {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>
doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
case CleanAccum(accId) =>
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
case CleanCheckpoint(rddId) =>
doCleanCheckpoint(rddId)
}
}
}
} catch {
case ie: InterruptedException if stopped => // ignore
case e: Exception => logError(\"Error in cleaning thread\", e)
}
}
}
从代码清单1可以看出,异步线程将匹配各种引用,并执行相应的方法进行清理。以doCleanupRDD为例,其实现见代码清单2。
代码清单2 清理RDD数据
def doCleanupRDD(rddId: Int, blocking: Boolean): Unit = {
try {
logDebug(\"Cleaning RDD \" + rddId)
sc.unpersistRDD(rddId, blocking)
listeners.asScala.foreach(_.rddCleaned(rddId))
logInfo(\"Cleaned RDD \" + rddId)
} catch {
case e: Exception => logError(\"Error cleaning RDD \" + rddId, e)
}
}
根据代码清单2,doCleanupRDD的执行步骤如下。
- 调用SparkContext的unpersistRDD方法从内存或磁盘中移除RDD。
- 从persistentRdds中移除对RDD的跟踪。
- 调用所有监听器的rddCleaned方法。
继续阅读与本文标签相同的文章
上一篇 :
ERP软件与财务管理软件有什么不同?
下一篇 :
福特的Sync 4信息娱乐系统亮相
-
广东ETC货源充足,线上申办最快三个工作日到货
2026-05-19栏目: 教程
-
APP界面布局小技巧,快上车!
2026-05-19栏目: 教程
-
扫地机器人市场快速增长 扫、擦功能分离是趋势
2026-05-19栏目: 教程
-
召唤师终于等到了,《英雄联盟》手游开放预约,预约人数挤爆服务器
2026-05-19栏目: 教程
-
人工智能走进西藏特殊教育学校 以“声”为“眼”助力盲童阅读
2026-05-19栏目: 教程
