微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink源码解析三TaskManager启动分析

1、Flink主节点TaskManager启动分析:

        TaskManager是Flink的worker节点,它负责Flink中本机slot资源的管理以及具体task的执行。
        TaskManager上的基本资源单位是slot,一个作业的task最终会部署在一个TM的slot上运行,TM会负责维护本地的slot资源列表,并来与FlinkMaster和JobManager通信。
        根据前面的启动分析:TaskManager的启动主类:TaskManagerRunner。

2、源码分析

代码执行的大致流程如下:

TaskManagerRunner.main()
    runTaskManagerSecurely(args, ResourceID.generate());
        # 加载配置
        Configuration configuration = loadConfiguration(args);
        # 启动 TaskManager
        runTaskManagerSecurely(configuration, resourceID);
            # 启动 TaskManager
            runTaskManager(configuration, resourceID, pluginManager);
                # 构建 TaskManagerRunner 实例
                taskManagerRunner = new TaskManagerRunner(...);
                    # 初始化一个线程池
                    this.executor = Executors.newScheduledThreadPool(....)
                    # 获取高可用模式
                    highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(...)
                    # 创建 RPC 服务
                    rpcService = createRpcService(configuration,
                    highAvailabilityServices);
                    # 创建心跳服务
                    heartbeatServices = HeartbeatServices.fromConfiguration(conf);
                    # 创建 BlobcacheService
                    blobcacheService = new BlobcacheService(....)
                    # 创建 TaskManager
                    taskManager = startTaskManager(.....)
                    # 初始化 TaskManagerServices
                    taskManagerServices = TaskManagerServices.fromConfiguration(...)
                    # 初始化 TaskEventdispatcher
                    taskEventdispatcher = new TaskEventdispatcher();
                    # 初始化 IOManagerASync
                    ioManager = new IOManagerAsync(...)
                    # 初始化 NettyShuffleEnvironment
                    shuffleEnvironment = createShuffleEnvironment(...)
                    # 初始化 KvstageService
                    kvstateService =
                    KvstateService.fromConfiguration(...)
                    # 初始化 broadCastvariableManager
                    broadcastvariableManager = new broadcastvariableManager();
                    # 初始化 TaskSlottable
                    taskSlottable = createTaskSlottable(...)
                    # 初始化 DefaultJobTable
                    jobTable = DefaultJobTable.create();
                    # 初始化 JobleaderService
                    jobleaderService = new DefaultJobleaderService(....)
                    # 初始化 TaskStateManager
                    taskStateManager = new TaskExecutorLocalStateStoresManager()
                    # 初始化 LibraryCacheManager
                    libraryCacheManager = new BlobLibraryCacheManager()
                    # 返回
                    return new TaskManagerServices(....)
                # 初始化一个 TaskExecutor
                return new TaskExecutor(.....)
                    # 初始化心跳管理器:jobManagerHeartbeatManager
                    this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices,resourceId);
                    # 初始化心跳管理器:resourceManagerHeartbeatManager
                    this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices,resourceId);
                    # 转到 TaskExecutor 的 onStart() 方法
                    TaskExecutor.onStart();
                        startTaskExecutorServices();
        # 启动 TaskManagerRunner
        taskManagerRunner.start();

 TaskManagerRunner的启动大致分为三类比较重要的:

  • 一些基础服务
  • TaskManagerService
  • TaskExecutor

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐