1、Flink主节点TaskManager启动分析:
TaskManager是Flink的worker节点,它负责Flink中本机slot资源的管理以及具体task的执行。
TaskManager上的基本资源单位是slot,一个作业的task最终会部署在一个TM的slot上运行,TM会负责维护本地的slot资源列表,并来与FlinkMaster和JobManager通信。
根据前面的启动分析:TaskManager的启动主类:TaskManagerRunner。
2、源码分析
代码执行的大致流程如下:
TaskManagerRunner.main()
runTaskManagerSecurely(args, ResourceID.generate());
# 加载配置
Configuration configuration = loadConfiguration(args);
# 启动 TaskManager
runTaskManagerSecurely(configuration, resourceID);
# 启动 TaskManager
runTaskManager(configuration, resourceID, pluginManager);
# 构建 TaskManagerRunner 实例
taskManagerRunner = new TaskManagerRunner(...);
# 初始化一个线程池
this.executor = Executors.newScheduledThreadPool(....)
# 获取高可用模式
highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(...)
# 创建 RPC 服务
rpcService = createRpcService(configuration,
highAvailabilityServices);
# 创建心跳服务
heartbeatServices = HeartbeatServices.fromConfiguration(conf);
# 创建 BlobcacheService
blobcacheService = new BlobcacheService(....)
# 创建 TaskManager
taskManager = startTaskManager(.....)
# 初始化 TaskManagerServices
taskManagerServices = TaskManagerServices.fromConfiguration(...)
# 初始化 TaskEventdispatcher
taskEventdispatcher = new TaskEventdispatcher();
# 初始化 IOManagerASync
ioManager = new IOManagerAsync(...)
# 初始化 NettyShuffleEnvironment
shuffleEnvironment = createShuffleEnvironment(...)
# 初始化 KvstageService
kvstateService =
KvstateService.fromConfiguration(...)
# 初始化 broadCastvariableManager
broadcastvariableManager = new broadcastvariableManager();
# 初始化 TaskSlottable
taskSlottable = createTaskSlottable(...)
# 初始化 DefaultJobTable
jobTable = DefaultJobTable.create();
# 初始化 JobleaderService
jobleaderService = new DefaultJobleaderService(....)
# 初始化 TaskStateManager
taskStateManager = new TaskExecutorLocalStateStoresManager()
# 初始化 LibraryCacheManager
libraryCacheManager = new BlobLibraryCacheManager()
# 返回
return new TaskManagerServices(....)
# 初始化一个 TaskExecutor
return new TaskExecutor(.....)
# 初始化心跳管理器:jobManagerHeartbeatManager
this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices,resourceId);
# 初始化心跳管理器:resourceManagerHeartbeatManager
this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices,resourceId);
# 转到 TaskExecutor 的 onStart() 方法
TaskExecutor.onStart();
startTaskExecutorServices();
# 启动 TaskManagerRunner
taskManagerRunner.start();
TaskManagerRunner的启动大致分为三类比较重要的:
- 一些基础服务
- TaskManagerService
- TaskExecutor
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。