Today let's walk through the BlockManagerMaster source code in Spark 2.x. Many readers may not be familiar with this component, so the following summary is intended to give you a clearer picture of how it works.
1. Creating the BlockManagerMaster
BlockManagerMaster is responsible for managing and maintaining block metadata for the entire application while it runs, and for sending commands to the slave nodes. It is created during the construction of SparkEnv; on the driver side, SparkEnv is created when the SparkContext is created. The corresponding initialization code in SparkEnv is:
val blockManagerMaster = new BlockManagerMaster(
  registerOrLookupEndpoint(
    BlockManagerMaster.DRIVER_ENDPOINT_NAME,
    new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
  conf,
  isDriver)
As you can see, when the BlockManagerMaster is constructed, a BlockManagerMasterEndpoint instance is created and registered with the rpcEnv. The BlockManager on each executor then communicates with the BlockManagerMaster through a reference to this driver-side BlockManagerMasterEndpoint.
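The register-or-lookup pattern above can be sketched as a plain message dispatcher: the driver registers a named endpoint, and a client obtains it by name and sends it a message. This is a toy model for illustration only, not Spark's real RpcEnv API; the names `SimpleRpcEnv`, `setupEndpoint`, and `ask` here are invented stand-ins.

```scala
import scala.collection.mutable

// Toy message type, mimicking the case-class messages BlockManagerMaster sends.
sealed trait Message
case class RemoveExecutor(execId: String) extends Message

// An "endpoint" is just a message handler that returns a reply.
type Endpoint = Message => Any

class SimpleRpcEnv {
  private val endpoints = mutable.Map.empty[String, Endpoint]

  // Driver side: register the endpoint under a well-known name.
  def setupEndpoint(name: String, endpoint: Endpoint): String = {
    endpoints(name) = endpoint
    name
  }

  // Client side: look the endpoint up by name and "ask" it synchronously.
  def ask(name: String, msg: Message): Any = endpoints(name)(msg)
}

val rpcEnv = new SimpleRpcEnv
val removed = mutable.Set.empty[String]

// Register a master endpoint whose handler records removed executors.
val masterName = rpcEnv.setupEndpoint("BlockManagerMaster", {
  case RemoveExecutor(id) => removed += id; true
})
```

In real Spark the registration happens in the driver's rpcEnv, and executors hold an RpcEndpointRef resolved against the driver's address; the lookup-by-name idea is the same.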
2. BlockManagerMaster member functions:
1). removeExecutor() function, code as follows:

// Sends a RemoveExecutor message to the BlockManagerMasterEndpoint to remove a dead executor.
// This function is only called on the driver side.
def removeExecutor(execId: String) {
  tell(RemoveExecutor(execId))
  logInfo("Removed " + execId + " successfully in removeExecutor")
}
2). removeExecutorAsync() function, code as follows:
// Same purpose as 1): removes a dead executor, but as a non-blocking, asynchronous call.
def removeExecutorAsync(execId: String) {
  driverEndpoint.ask[Boolean](RemoveExecutor(execId))
  logInfo("Removal of executor " + execId + " requested")
}
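The difference between the two removal variants is the calling style: the blocking version waits for the endpoint's Boolean reply, while the asynchronous version fires the request and returns a Future immediately. A minimal sketch, with a stub `askSync`/`ask` standing in for the driver endpoint (both names and the always-true reply are assumptions for the demo):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Stand-ins for the driver endpoint: pretend every request succeeds.
def askSync(msg: Any): Boolean = true
def ask(msg: Any): Future[Boolean] = Future { true }

// Blocking style (like removeExecutor via tell): wait for the reply,
// and fail loudly if the endpoint reports an error.
def removeExecutor(execId: String): Unit =
  require(askSync(("RemoveExecutor", execId)), s"Failed to remove $execId")

// Non-blocking style (like removeExecutorAsync): fire the request;
// the Future completes later and the caller does not wait.
def removeExecutorAsync(execId: String): Future[Boolean] =
  ask(("RemoveExecutor", execId))
```

The async style is preferable on hot paths where the caller does not need the result before proceeding.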
3). registerBlockManager() function, code as follows:
// When the BlockManager on an executor starts up, it registers itself with the
// BlockManagerMaster, which records it in the master's blockManagerInfo map.
def registerBlockManager(
    blockManagerId: BlockManagerId,
    maxOnHeapMemSize: Long,
    maxOffHeapMemSize: Long,
    slaveEndpoint: RpcEndpointRef): BlockManagerId = {
  logInfo(s"Registering BlockManager $blockManagerId")
  val updatedId = driverEndpoint.askSync[BlockManagerId](
    RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
  logInfo(s"Registered BlockManager $updatedId")
  updatedId
}
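On the master side, registration boils down to maintaining two maps: executor id to BlockManagerId, and BlockManagerId to the manager's info. A simplified sketch of that bookkeeping (the case classes here are cut down to a few fields and are not Spark's real definitions):

```scala
import scala.collection.mutable

// Simplified stand-ins for Spark's BlockManagerId / BlockManagerInfo.
case class BlockManagerId(executorId: String, host: String, port: Int)
case class BlockManagerInfo(maxOnHeapMem: Long, maxOffHeapMem: Long)

// Master-side bookkeeping, mirroring blockManagerInfo and blockManagerIdByExecutor.
val blockManagerInfo = mutable.Map.empty[BlockManagerId, BlockManagerInfo]
val blockManagerIdByExecutor = mutable.Map.empty[String, BlockManagerId]

def registerBlockManager(id: BlockManagerId,
                         maxOnHeapMem: Long,
                         maxOffHeapMem: Long): BlockManagerId = {
  blockManagerIdByExecutor(id.executorId) = id
  blockManagerInfo(id) = BlockManagerInfo(maxOnHeapMem, maxOffHeapMem)
  id // the (possibly updated) id is echoed back to the registering executor
}
```

Keeping both maps lets the master resolve an executor id to its BlockManager in O(1), which later functions such as getExecutorEndpointRef() rely on.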
4). updateBlockInfo() function, code as follows:

// Updates the metadata of a block.
def updateBlockInfo(
    blockManagerId: BlockManagerId,
    blockId: BlockId,
    storageLevel: StorageLevel,
    memSize: Long,
    diskSize: Long): Boolean = {
  // Sends an UpdateBlockInfo message to the BlockManagerMasterEndpoint and returns the result.
  val res = driverEndpoint.askSync[Boolean](
    UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
  logDebug(s"Updated info of block $blockId")
  res
}
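Conceptually, the endpoint handling UpdateBlockInfo keeps a per-manager map of block statuses and either upserts or removes an entry. A minimal sketch under the assumption that a block with zero memory and disk size is treated as dropped (the types here are simplified stand-ins, not Spark's real ones):

```scala
import scala.collection.mutable

// Simplified stand-ins for Spark's BlockId / BlockStatus.
case class BlockId(name: String)
case class BlockStatus(storageLevel: String, memSize: Long, diskSize: Long)

// (executorId, blockId) -> current status of that replica.
val blocks = mutable.Map.empty[(String, BlockId), BlockStatus]

def updateBlockInfo(execId: String, blockId: BlockId,
                    level: String, memSize: Long, diskSize: Long): Boolean = {
  if (memSize == 0 && diskSize == 0) {
    // The block no longer occupies any storage on this manager: drop the entry.
    blocks.remove((execId, blockId))
  } else {
    blocks((execId, blockId)) = BlockStatus(level, memSize, diskSize)
  }
  true // acknowledge the update
}
```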
5). getLocations() function, code as follows:

// Gets the BlockManager nodes on which a block resides. A Seq is returned because,
// if the block's replication factor is greater than 1, the block may exist on
// multiple BlockManager nodes.
def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  // Sends a GetLocations message to the BlockManagerMasterEndpoint.
  driverEndpoint.askSync[Seq[BlockManagerId]](GetLocations(blockId))
}
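The reason for the Seq return type is easy to see with a small example: with a replicated storage level, one block id maps to several managers. The block and manager names below are made up for illustration:

```scala
// A block stored with a replicated level (e.g. MEMORY_ONLY_2) lives on two managers.
val blockLocations: Map[String, Seq[String]] = Map(
  "rdd_0_0" -> Seq("BlockManagerId(exec-1)", "BlockManagerId(exec-2)"),
  "rdd_0_1" -> Seq("BlockManagerId(exec-3)")
)

// Unknown blocks simply have no locations, rather than raising an error.
def getLocations(blockId: String): Seq[String] =
  blockLocations.getOrElse(blockId, Seq.empty)
```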
// Gets all BlockManagerIds other than the given blockManagerId. As noted above,
// a block may exist on multiple BlockManager nodes.
def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
  // Sends a GetPeers message to the BlockManagerMasterEndpoint.
  driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId))
}
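The "peers" of a BlockManager are the candidates for holding extra replicas: every registered manager except the caller itself and the driver. A one-line sketch of that filter (the id strings are placeholders):

```scala
// All registered block managers known to the master.
val allManagers = Seq("exec-1", "exec-2", "driver")

// Peers exclude the caller itself and the driver-side manager,
// so replicas are only placed on other executors.
def getPeers(self: String): Seq[String] =
  allManagers.filterNot(id => id == self || id == "driver")
```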
6). getExecutorEndpointRef() function, code as follows:
// Gets the RpcEndpointRef of the BlockManager running on the given executor, so the
// master can talk to that slave directly. The lookup goes through the master's
// blockManagerIdByExecutor and blockManagerInfo maps.
private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
  for (
    blockManagerId <- blockManagerIdByExecutor.get(executorId);
    info <- blockManagerInfo.get(blockManagerId)
  ) yield {
    info.slaveEndpoint
  }
}
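The for-comprehension here chains two Option lookups: it yields Some only if both map lookups succeed, and None as soon as either one is missing. A self-contained sketch of the same pattern (the map contents are invented for the demo):

```scala
// Two-step lookup tables, mirroring the two maps used above.
val blockManagerIdByExecutor = Map("exec-1" -> "bm-1")
val slaveEndpointByManager = Map("bm-1" -> "endpointRef-1")

// Yields Some(ref) only if BOTH lookups succeed; otherwise None.
// This is exactly the short-circuiting behavior of getExecutorEndpointRef.
def getExecutorEndpointRef(execId: String): Option[String] =
  for {
    bmId <- blockManagerIdByExecutor.get(execId)
    ref  <- slaveEndpointByManager.get(bmId)
  } yield ref
```

This avoids nested null/emptiness checks: a missing executor id and a stale BlockManagerId both fall through to None the same way.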
7). getBlockStatus() function, code as follows:

// Gets the status of a block: its locations and the memory and disk space it occupies.
def getBlockStatus(
    blockId: BlockId,
    askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = {
  val msg = GetBlockStatus(blockId, askSlaves)
  val response = driverEndpoint.
    askSync[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
  val (blockManagerIds, futures) = response.unzip
  implicit val sameThread = ThreadUtils.sameThread
  val cbf = implicitly[
    CanBuildFrom[Iterable[Future[Option[BlockStatus]]],
      Option[BlockStatus],
      Iterable[Option[BlockStatus]]]]
  val blockStatus = timeout.awaitResult(
    Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread))
  if (blockStatus == null) {
    throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
  }
  blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
    status.map { s => (blockManagerId, s) }
  }.toMap
}
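The core trick in getBlockStatus() is Future.sequence: each block manager answers asynchronously with an Option of its status, and the collection of Futures is flipped into a single Future of a collection before awaiting it once. A minimal sketch (using Long as a stand-in for BlockStatus, and without the CanBuildFrom boilerplate that Scala 2.11/2.12 needed):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Each block manager answers with an optional status, asynchronously.
// None means that manager does not hold the block.
val futures: Seq[Future[Option[Long]]] =
  Seq(Future(Some(100L)), Future(None), Future(Some(50L)))

// Future.sequence turns Seq[Future[...]] into one Future[Seq[...]],
// which getBlockStatus then awaits with a single timeout.
val gathered: Seq[Option[Long]] =
  Await.result(Future.sequence(futures), 5.seconds)

// Managers that answered None are dropped, like the flatMap over statuses above.
val present: Seq[Long] = gathered.flatten
```

Awaiting one combined Future with one timeout is what keeps the call bounded even when many managers are queried.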
The actual processing behind all of these BlockManagerMaster functions happens in the BlockManagerMasterEndpoint instance; later we will examine in detail how BlockManagerMasterEndpoint handles each of these messages.