Spark Source Code Analysis – Deploy
Published: 2019-06-26



 

Client

Client is created when SparkDeploySchedulerBackend is started; it represents an application and talks to the Spark cluster on its behalf.

The logic in Client is simple: it wraps a ClientActor and is responsible for starting and stopping that actor.
The key part of ClientActor is its preStart, where it registers the application with the master; while the application runs, it receives the events the master sends back.

/**
 * The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
 * and a listener for cluster events, and calls back the listener when various events occur.
 */
private[spark] class Client(
    actorSystem: ActorSystem,
    masterUrl: String,
    appDescription: ApplicationDescription,
    listener: ClientListener)
  extends Logging {

  var actor: ActorRef = null
  var appId: String = null

  class ClientActor extends Actor with Logging {
    var master: ActorRef = null
    var masterAddress: Address = null
    var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times

    override def preStart() {
      try {
        master = context.actorFor(Master.toAkkaUrl(masterUrl)) // create the master ActorRef, used for talking to the master
        masterAddress = master.path.address
        master ! RegisterApplication(appDescription) // register this application with the master
        context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
        context.watch(master)  // Doesn't work with remote actors, but useful for testing
      } catch {
        case e: Exception =>
          markDisconnected()
          context.stop(self)
      }
    }

    override def receive = { // handle the various events sent by the master
      case RegisteredApplication(appId_) =>
      case ApplicationRemoved(message) =>
      case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
      case ExecutorUpdated(id, state, message, exitStatus) =>
      case Terminated(actor_) if actor_ == master =>
      case RemoteClientDisconnected(transport, address) if address == masterAddress =>
      case RemoteClientShutdown(transport, address) if address == masterAddress =>
      case StopClient =>
    }
  }

  def start() {
    // Just launch an actor; it will call back into the listener.
    actor = actorSystem.actorOf(Props(new ClientActor))
  }

  def stop() {
    if (actor != null) {
      try {
        val future = actor.ask(StopClient)(timeout)
        Await.result(future, timeout)
      } catch {
        case e: TimeoutException =>
          logInfo("Stop request to Master timed out; it may already be shut down.")
      }
      actor = null
    }
  }
}
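For reference, the ClientListener passed in above is just a small trait of callbacks that Client invokes on these events (SparkDeploySchedulerBackend implements it). In this era of the code base it looks roughly like the following sketch:

private[spark] trait ClientListener {
  def connected(appId: String): Unit

  def disconnected(): Unit

  def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit

  def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
}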

Master

The client submits the application to the master, and workers also register themselves with the master.

So Master is the interface to the Spark cluster: it accepts application requests from clients and allocates worker resources to each app.
The key messages it handles are RegisterWorker, RegisterApplication and ExecutorStateChanged, all of which eventually call schedule.
schedule is its core function. For now it schedules on worker CPU cores alone, ignoring other resources, and it can be configured to spread an app across as many workers as possible or to pack it onto as few as possible.
Finally it sends LaunchExecutor to the worker actor, which actually starts the ExecutorBackend.

private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
  var nextAppNumber = 0

  val workers = new HashSet[WorkerInfo] // track workers
  val idToWorker = new HashMap[String, WorkerInfo]
  val actorToWorker = new HashMap[ActorRef, WorkerInfo]
  val addressToWorker = new HashMap[Address, WorkerInfo]

  val apps = new HashSet[ApplicationInfo] // track apps
  val idToApp = new HashMap[String, ApplicationInfo]
  val actorToApp = new HashMap[ActorRef, ApplicationInfo]
  val addressToApp = new HashMap[Address, ApplicationInfo]
  val waitingApps = new ArrayBuffer[ApplicationInfo] // unfinished apps; these are what schedule works on
  val completedApps = new ArrayBuffer[ApplicationInfo]
  override def receive = {
    case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        host, workerPort, cores, Utils.megabytesToString(memory)))
      if (idToWorker.contains(id)) {
        sender ! RegisterWorkerFailed("Duplicate worker ID")
      } else {
        addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
        context.watch(sender)  // This doesn't work with remote actors but helps for testing
        sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get)
        schedule() // re-schedule
      }
    }

    case RegisterApplication(description) => {
      logInfo("Registering app " + description.name)
      val app = addApplication(description, sender)
      logInfo("Registered app " + description.name + " with ID " + app.id)
      waitingApps += app
      context.watch(sender)  // This doesn't work with remote actors but helps for testing
      sender ! RegisteredApplication(app.id)
      schedule() // re-schedule
    }

    // Fired when an executor's state changes; only the failure cases get real handling here
    case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
      val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
      execOption match {
        case Some(exec) => { // the executor is on record, i.e. legitimate
          exec.state = state
          exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus) // send ExecutorUpdated to the driver actor
          if (ExecutorState.isFinished(state)) { // isFinished means KILLED, FAILED or LOST, i.e. the failure cases; the name is misleading
            val appInfo = idToApp(appId)
            // First remove the executor to free up coresLeft, then re-schedule
            // Remove this executor from the worker and app
            logInfo("Removing executor " + exec.fullId + " because it is " + state)
            appInfo.removeExecutor(exec)
            exec.worker.removeExecutor(exec)
            // Only retry certain number of times so we don't go into an infinite loop.
            if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) { // still within the retry limit, so re-schedule
              schedule()
            } else { // retry limit exceeded: the whole application fails
              logError("Application %s with ID %s failed %d times, removing it".format(
                appInfo.desc.name, appInfo.id, appInfo.retryCount))
              removeApplication(appInfo, ApplicationState.FAILED)
            }
          }
        }
        case None =>
          logWarning("Got status update for unknown executor " + appId + "/" + execId)
      }
    }

    case Heartbeat(workerId) => { // refresh the worker's heartbeat
      idToWorker.get(workerId) match {
        case Some(workerInfo) =>
          workerInfo.lastHeartbeat = System.currentTimeMillis()
        case None =>
          logWarning("Got heartbeat from unregistered worker " + workerId)
      }
    }

    case Terminated(actor) => ...
    case RemoteClientDisconnected(transport, address) => ...
    case RemoteClientShutdown(transport, address) => ...
    case RequestMasterState => ...
    case CheckForWorkerTimeOut => ...
    case RequestWebUIPort => ...
  }
 
  /**
   * Schedule the currently available resources among waiting apps. This method will be called
   * every time a new app joins or resource availability changes.
   */
  def schedule() {
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    if (spreadOutApps) { // spread the app across as many workers as possible
      // Try to spread out each app among all the nodes, until it has all its cores
      for (app <- waitingApps if app.coresLeft > 0) { // coresLeft > 0 means the app still wants more cores, i.e. more parallelism
        // Usable workers: ALIVE, able to run this app, sorted by coresFree descending.
        // canUse is defined as: worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
                                   .filter(canUse(app, _)).sortBy(_.coresFree).reverse
        val numUsable = usableWorkers.length
        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) // total cores to hand out: min of what is needed and what is free
        // The loop below takes cores evenly from each usable worker, round-robin
        var pos = 0
        while (toAssign > 0) {
          if (usableWorkers(pos).coresFree - assigned(pos) > 0) { // never assign more than a worker has free
            toAssign -= 1
            assigned(pos) += 1
          }
          pos = (pos + 1) % numUsable // if one pass is not enough, keep cycling
        }
        // Now that we've decided how many cores to give on each node, let's actually give them
        for (pos <- 0 until numUsable) {
          if (assigned(pos) > 0) {
            val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
            launchExecutor(usableWorkers(pos), exec, app.desc.sparkHome) // launch the executor backend
            app.state = ApplicationState.RUNNING
          }
        }
      }
    } else { // pack the app onto as few workers as possible; the logic is simpler
      // Pack each app into as few nodes as possible until we've assigned all its cores
      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
        for (app <- waitingApps if app.coresLeft > 0) {
          if (canUse(app, worker)) {
            val coresToUse = math.min(worker.coresFree, app.coresLeft)
            if (coresToUse > 0) {
              val exec = app.addExecutor(worker, coresToUse)
              launchExecutor(worker, exec, app.desc.sparkHome)
              app.state = ApplicationState.RUNNING
            }
          }
        }
      }
    }
  }
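To see how the round-robin loop in the spreadOutApps branch behaves, here is a minimal self-contained sketch (not Spark code; the numbers are made up): three usable workers with 4, 2 and 1 free cores, and an app that still needs 5 cores.

object SpreadOutDemo extends App {
  // Hypothetical inputs: free cores per usable worker (already sorted descending,
  // as schedule does), and an app with coresLeft = 5.
  val coresFree = Array(4, 2, 1)
  val assigned  = new Array[Int](coresFree.length)
  var toAssign  = math.min(5, coresFree.sum) // = 5

  var pos = 0
  while (toAssign > 0) {
    if (coresFree(pos) - assigned(pos) > 0) { // never give a worker more than it has free
      toAssign -= 1
      assigned(pos) += 1
    }
    pos = (pos + 1) % coresFree.length // keep cycling if one pass is not enough
  }
  println(assigned.mkString(", ")) // prints: 2, 2, 1
}

Each pass hands out at most one core per worker, so the result is 2, 2, 1 rather than 4, 1, 0: the cores are spread as evenly as the free capacity allows.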
 
  def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    worker.actor ! LaunchExecutor( // send LaunchExecutor to the worker actor
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
    exec.application.driver ! ExecutorAdded( // send ExecutorAdded to the driver actor
      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
  }

 

Worker

Worker runs as an actor process. On startup it first creates its work directory and then registers itself with the master.

Its main job is to handle the LaunchExecutor event, using an ExecutorRunner to run the executor backend.

private[spark] class Worker(
    host: String,
    port: Int,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterUrl: String,
    workDirPath: String = null)
  extends Actor with Logging {
 
  override def preStart() {
    sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
    logInfo("Spark home: " + sparkHome)
    createWorkDir() // create the work directory under the configured sparkHome
    connectToMaster() // register this worker with the master
  }
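connectToMaster mirrors what ClientActor.preStart does on the application side; a sketch of what it does in this version of the code:

  def connectToMaster() {
    logInfo("Connecting to master " + masterUrl)
    master = context.actorFor(Master.toAkkaUrl(masterUrl)) // get an ActorRef for the master
    master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress)
    context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
    context.watch(master) // Doesn't work with remote actors but useful for testing
  }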
  override def receive = {
    case RegisteredWorker(url) => // the master's reply: registration succeeded
      masterWebUiUrl = url
      logInfo("Successfully registered with master")
      context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
        master ! Heartbeat(workerId) // schedule a periodic heartbeat to the master
      }

    case RegisterWorkerFailed(message) =>
      logError("Worker registration failed: " + message)
      System.exit(1)

    case LaunchExecutor(appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
      val manager = new ExecutorRunner( // create an ExecutorRunner and start it
        appId, execId, appDesc, cores_, memory_, self, workerId, host, new File(execSparkHome_), workDir)
      executors(appId + "/" + execId) = manager
      manager.start()
      coresUsed += cores_
      memoryUsed += memory_
      master ! ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None) // report ExecutorState.RUNNING to the master

    case ExecutorStateChanged(appId, execId, state, message, exitStatus) => // sent by the ExecutorRunner
      master ! ExecutorStateChanged(appId, execId, state, message, exitStatus) // forward it to the master
      val fullId = appId + "/" + execId
      if (ExecutorState.isFinished(state)) {
        val executor = executors(fullId)
        logInfo("Executor " + fullId + " finished with state " + state +
          message.map(" message " + _).getOrElse("") +
          exitStatus.map(" exitStatus " + _).getOrElse(""))
        finishedExecutors(fullId) = executor
        executors -= fullId
        coresUsed -= executor.cores
        memoryUsed -= executor.memory
      }

    case KillExecutor(appId, execId) =>
      val fullId = appId + "/" + execId
      executors.get(fullId) match {
        case Some(executor) =>
          logInfo("Asked to kill executor " + fullId)
          executor.kill()
        case None =>
          logInfo("Asked to kill unknown executor " + fullId)
      }

    case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
      masterDisconnected()

    case RequestWorkerState => {
      sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
        finishedExecutors.values.toList, masterUrl, cores, memory,
        coresUsed, memoryUsed, masterWebUiUrl)
    }
  }

 

ExecutorRunner

ExecutorRunner creates a thread that executes fetchAndRunExecutor,
and on that thread it uses ProcessBuilder to launch StandaloneExecutorBackend as a child process. The command to run comes from the ApplicationDescription; it is built on the driver side like this:

val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
val command = Command("org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)

Why not just create the child process directly?

 

/**
 * Manages the execution of one executor process.
 */
private[spark] class ExecutorRunner(
    val appId: String,
    val execId: Int,
    val appDesc: ApplicationDescription,
    val cores: Int,
    val memory: Int,
    val worker: ActorRef,
    val workerId: String,
    val host: String,
    val sparkHome: File,
    val workDir: File)
  extends Logging {
 
  val fullId = appId + "/" + execId
  var workerThread: Thread = null
  var process: Process = null
  var shutdownHook: Thread = null
 
  def start() {
    workerThread = new Thread("ExecutorRunner for " + fullId) {
      override def run() { fetchAndRunExecutor() } // run fetchAndRunExecutor on a dedicated thread
    }
    workerThread.start()
  }
 
  def buildCommandSeq(): Seq[String] = {
    val command = appDesc.command
    val runner = getAppEnv("JAVA_HOME").map(_ + "/bin/java").getOrElse("java")
    // SPARK-698: do not call the run.cmd script, as process.destroy()
    // fails to kill a process tree on Windows
    Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++ // run command.mainClass with java
      command.arguments.map(substituteVariables)
  }
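substituteVariables is what fills in the {{EXECUTOR_ID}}-style placeholders from the Command shown earlier with this executor's actual values; in the same class it is roughly:

  /** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument */
  def substituteVariables(argument: String): String = argument match {
    case "{{EXECUTOR_ID}}" => execId.toString
    case "{{HOSTNAME}}" => host
    case "{{CORES}}" => cores.toString
    case other => other // everything else passes through untouched
  }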
 
  /**
   * Download and run the executor described in our ApplicationDescription
   */
  def fetchAndRunExecutor() {
    try {
      // Use ProcessBuilder to run the command in a child process
      // Launch the process
      val command = buildCommandSeq()
      val builder = new ProcessBuilder(command: _*).directory(executorDir)
      val env = builder.environment()
      for ((key, value) <- appDesc.command.environment) {
        env.put(key, value)
      }
      // In case we are running this from within the Spark Shell, avoid creating a "scala"
      // parent process for the executor command
      env.put("SPARK_LAUNCH_WITH_SCALA", "0")
      process = builder.start()
      // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
      // long-lived processes only. However, in the future, we might restart the executor a few
      // times on the same machine.
      val exitCode = process.waitFor()
      val message = "Command exited with code " + exitCode
      worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message), Some(exitCode))
    } catch {
      case e: Exception =>
        val message = e.getClass + ": " + e.getMessage
        worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message), None)
    }
  }
}

