博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark技术内幕: Task向Executor提交的源码解析
阅读量:7059 次
发布时间:2019-06-28

本文共 4997 字,大约阅读时间需要 16 分钟。

在上文《》中,我们分析了Stage的生成和提交。但是Stage的提交,只是DAGScheduler完成了对DAG的划分,生成了一个计算拓扑,即需要按照顺序计算的Stage,Stage中包含了可以以partition为单位并行计算的Task。我们并没有分析Stage中得Task是如何生成并且最终提交到Executor中去的。

这就是本文的主题。

从org.apache.spark.scheduler.DAGScheduler#submitMissingTasks开始,分析Stage是如何生成TaskSet的。

如果一个Stage的所有的parent stage都已经计算完成或者存在于cache中,那么他会调用submitMissingTasks来提交该Stage所包含的Tasks。

org.apache.spark.scheduler.DAGScheduler#submitMissingTasks的计算流程如下:

  1. 首先得到RDD中需要计算的partition,对于Shuffle类型的stage,需要判断stage中是否缓存了该结果;对于Result类型的Final Stage,则判断计算Job中该partition是否已经计算完成。
  2. 序列化task的binary。Executor可以通过广播变量得到它。每个task运行的时候首先会反序列化。这样在不同的executor上运行的task是隔离的,不会相互影响。
  3. 为每个需要计算的partition生成一个task:对于Shuffle类型依赖的Stage,生成ShuffleMapTask类型的task;对于Result类型的Stage,生成一个ResultTask类型的task
  4. 确保Task是可以被序列化的。因为不同的cluster有不同的taskScheduler,在这里判断可以简化逻辑;保证TaskSet的task都是可以序列化的
  5. 通过TaskScheduler提交TaskSet。
TaskSet就是可以做pipeline的一组完全相同的task,每个task的处理逻辑完全相同,不同的是处理数据,每个task负责处理一个partition。pipeline,可以称为大数据处理的基石,只有数据进行pipeline处理,才能将其放到集群中去运行。对于一个task来说,它从数据源获得逻辑,然后按照拓扑顺序,顺序执行(实际上是调用rdd的compute)。
TaskSet是一个数据结构,存储了这一组task:
private[spark] class TaskSet(    val tasks: Array[Task[_]],    val stageId: Int,    val attempt: Int,    val priority: Int,    val properties: Properties) {    val id: String = stageId + "." + attempt  override def toString: String = "TaskSet " + id}
管理调度这个TaskSet的时org.apache.spark.scheduler.TaskSetManager,TaskSetManager会负责task的失败重试;跟踪每个task的执行状态;处理locality-aware的调用。
详细的调用堆栈如下:
  1. org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks
  2. org.apache.spark.scheduler.SchedulableBuilder#addTaskSetManager
  3. org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#reviveOffers
  4. org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverActor#makeOffers
  5. org.apache.spark.scheduler.TaskSchedulerImpl#resourceOffers
  6. org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverActor#launchTasks
  7. org.apache.spark.executor.CoarseGrainedExecutorBackend.receiveWithLogging#launchTask
  8. org.apache.spark.executor.Executor#launchTask
首先看一下org.apache.spark.executor.Executor#launchTask:
def launchTask(      context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {    val tr = new TaskRunner(context, taskId, taskName, serializedTask)    runningTasks.put(taskId, tr)    threadPool.execute(tr) // 开始在executor中运行  }
TaskRunner会从序列化的task中反序列化得到task,这个需要看 
org.apache.spark.executor.Executor.TaskRunner#run 的实现:task.run(taskId.toInt)。而task.run的实现是:
final def run(attemptId: Long): T = {    context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)    context.taskMetrics.hostname = Utils.localHostName()    taskThread = Thread.currentThread()    if (_killed) {      kill(interruptThread = false)    }    runTask(context)  }
对于原来提到的两种Task,即
  1.  org.apache.spark.scheduler.ShuffleMapTask
  2.  org.apache.spark.scheduler.ResultTask
分别实现了不同的runTask:
org.apache.spark.scheduler.ResultTask#runTask即顺序调用rdd的compute,通过rdd的拓扑顺序依次对partition进行计算:
override def runTask(context: TaskContext): U = {    // Deserialize the RDD and the func using the broadcast variables.    val ser = SparkEnv.get.closureSerializer.newInstance()    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)    metrics = Some(context.taskMetrics)    try {      func(context, rdd.iterator(partition, context))    } finally {      context.markTaskCompleted()    }  }
而org.apache.spark.scheduler.ShuffleMapTask#runTask则是写shuffle的结果,
override def runTask(context: TaskContext): MapStatus = {    // Deserialize the RDD using the broadcast variable.    val ser = SparkEnv.get.closureSerializer.newInstance()    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)      //此处的taskBinary即为在org.apache.spark.scheduler.DAGScheduler#submitMissingTasks序列化的task的广播变量取得的    metrics = Some(context.taskMetrics)    var writer: ShuffleWriter[Any, Any] = null    try {      val manager = SparkEnv.get.shuffleManager      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) // 将rdd计算的结果写入memory或者disk      return writer.stop(success = true).get    } catch {      case e: Exception =>        if (writer != null) {          writer.stop(success = false)        }        throw e    } finally {      context.markTaskCompleted()    }  }
这两个task都不要按照拓扑顺序调用rdd的compute来完成对partition的计算,不同的是ShuffleMapTask需要shuffle write,以供child stage读取shuffle的结果。 对于这两个task都用到的taskBinary,即为在org.apache.spark.scheduler.DAGScheduler#submitMissingTasks序列化的task的广播变量取得的。
通过上述几篇博文,实际上我们已经粗略的分析了从用户定义SparkContext开始,集群是如果为每个Application分配Executor的,回顾一下这个序列图:
还有就是用户触发某个action,集群是如何生成DAG,如果将DAG划分为可以成Stage,已经Stage是如何将这些可以pipeline执行的task提交到Executor去执行的。当然了,具体细节还是非常值得推敲的。以后的每个周末,都会奉上某个细节的实现。
休息了。明天又会开始忙碌的一周。

转载于:https://www.cnblogs.com/wuwa/p/6190779.html

你可能感兴趣的文章
Ehcache学习笔记(二) 根据条件筛选缓存中的数据
查看>>
逻辑数据库设计 - 乱穿马路(多对多关系)
查看>>
Analysis Service Tabular Model #002 Analysis services 的结构:一种产品 两个模型
查看>>
PostgreSQL 的 pl/pgsql 的 cannot begin/end transactions in PL/pgSQL错误
查看>>
多线程编程之三——线程间通讯
查看>>
vs快捷键
查看>>
Oracle DBA常用查询
查看>>
修复Telerik reporting 在网页中使用时的样式
查看>>
Hackers’ Crackdown-----UVA11825-----DP+状态压缩
查看>>
Waiting Processed Cancelable ShowDialog
查看>>
[leetcode]Spiral Matrix
查看>>
hdu 1232 畅通工程(并查集)
查看>>
在github上写个人简历——先弄个主页
查看>>
用jquery实现遮罩层
查看>>
POJ 2229 Sumsets(技巧题, 背包变形)
查看>>
啥时候js单元测试变的重要起来?
查看>>
使用strtotime和mktime时参数为0时返回1999-11-30的时间戳问题
查看>>
php mysql 扩展安装
查看>>
Thrift架构~目录
查看>>
c++ 调用matlab程序
查看>>