微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

15.Spark源码分析

Spark源码分析

各个组件介绍

后面补充。。。。

StandAlone模式

在StandAlone模式的start-all的shell启动脚本下,在当前机器执行了JAVA_HOME/bin/java -cp ....Master和在配置的slave的机器中执行 JAVA_HOME/bin/java -cp ....Worker.这两种进程在启动后通过netty进行rpc通信。

Master的启动

  1. 首先创建一个RpcEnv对象,负责管理所有通信逻辑,核心代码val rpcEnv: RpcEnv = RpcEnv.create(SYstem_NAME, host, port, conf, securityMgr),创建后启动NettyRpcEnv Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
  2. 接着创建一个Master的EndPoint对象val masterEndpoint: RpcEndpointRef = rpcEnv.setupEndpoint(ENDPOINT_NAME,new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  3. 该EndPoint对象有constructor -> onStart -> receive* -> onStop这样一个生命周期

Worker的启动

  1. Worker以类似的情形创建RpcEnv和Worker的EndPoint
  2. Worker作为一个EndPoint,其同样有constructor -> onStart -> receive* -> onStop
  3. 创建一个masterEndpointRef对象val masterEndpoint: RpcEndpointRef = rpcEnv.setupEndpoint(ENDPOINT_NAME,new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))

Master和Worker的通信

  1. 在Worker的Onstart阶段(方法)向Master注册 registerWithMaster() -->通过Master的引用通信masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(workerId, host, port, self, cores, memory, workerWebUiUrl)).onComplete-->ask请求的Master接收再回复调用receiveAndReply-->成功回复context.reply(RegisteredWorker(self, masterWebUiUrl))-->worker在收到注册成功后,开始发送心跳forwordMessageScheduler.scheduleAtFixedrate(new Runnable
  2. 之后进入receive阶段worker不断发送心跳 sendToMaster(Heartbeat(workerId, self)),Master不断记录心跳workerInfo.lastHeartbeat = System.currentTimeMillis()

Yarn On Spark模式

Yarn Cluster

总体流程图

  1. 执行Submit中的main方法, 反射调用client的main方法
  2. 向rm提交bin/java applicationMaster
  3. rm寻找一台NN启动applicationMaster
  4. NN节点启动Driver进程
  5. Driver向RM申请资源
  6. Driver获得资源后向一个NN发送指令,bin/java CoarseGrainedExecutorBackend
  7. 启动ExecutorBackend, 这个进程启动后向Driver进行注册
  8. Driver返回注册成功的信息
  9. ExectureBack创建一个Execture对象
  10. Driver向Execture分配任务

创建Sparksubmit创建

  1. 从SparkSubmit类的main方法开始,调用submit(appArgs)
  2. 准备运行环境prepareSubmitEnvironment
  3. 调用doRunMain,之后再调用runMain()方法
  4. 使用反射的方式加载 childMainClass = "org.apache.spark.deploy.yarn.Client"
  5. 执行Client的main方法mainMethod.invoke(null, childArgs.toArray)
  6. 调用new Client(args, sparkConf).run(),创建一个Client对象并运行它的run方法

创建Driver

  1. Client 对象中的run方法会启动一个yarn的客户端,与yarn集群通信
  2. 通过客户端设置正确的上下文对象来启动 ApplicationMaster val containerContext = createContainerLaunchContext(newAppResponse)
  3. 源码进入ApplicationMaster了,在其main方法中创建了一个master对象,并调用这个对象的run()方法,在run方法调用runDriver(securityMgr),runDriver是一个子线程。
  4. 这个线程像RM注册并且申请资源 allocator.allocateResources()
  5. 申请到资源之后 prepareCommand() 依旧是准备一个command执行。/bin/java org.apache.spark.executor.

创建ExecutorBacken

  1. 启动ExecutorBackend, 并向driver注册. Driver.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))}(ThreadUtils.sameThread).onComplete (on start)
  2. 注册成功后, ExecutorBackend会创建一个Executor对象.executor = new Executor(executorId, hostname, env, userClasspath, isLocal = false) //创建计算对象 (on recieve)
  3. Driver会给ExecutorBackend分配任务, 并监控任务的执行.

Yarn Client模式

Shuffle分析

碰到reduceByKey不一定shuffle。

gdgylpc 发布了118 篇原创文章 · 获赞 5 · 访问量 7143 私信 关注

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐