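Using Flink's HA feature to manage the lifecycle of the components inside the JobManager effectively prevents a job from becoming unrecoverable when the JobManager process fails.
Outline
1. HA implementation based on ZooKeeper + Hadoop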
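ZooKeeper:
Structure of the ZooKeeper nodes:
- /leaderlatch: nodes under this directory are used to compete for leadership (related classes: ZooKeeperLeaderElectionService, LeaderContender)
- /leader: watching the nodes under this directory gives real-time updates of the leader information (related classes: ZooKeeperLeaderRetrievalService, LeaderRetrievalListener)
- /checkpoints: records the checkpoints a job can recover from; from these entries the path of the checkpoint metadata stored in the HA directory on Hadoop can be obtained (related class: ZooKeeperCompletedCheckpointStore)
- /checkpoint-counter: the checkpoint ID counter (related class: ZooKeeperCheckpointIDCounter)
- /running_job_registry: the running jobs and their status (related class: ZooKeeperRunningJobsRegistry)
Flink uses ZooKeeper not only to elect leaders and to notify other components of the latest leader information in real time, but also to store JobManager and job information, so that this information is not lost when a new JobManager comes up.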
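Example:
After the ResourceManager wins the election on the node /leaderlatch/resource_manager_lock, it writes its leader information to the node /leader/resource_manager_lock. Other components watching /leader/resource_manager_lock are notified of the change and immediately connect to the ResourceManager using the new address and session ID.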
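The publish-and-watch flow in this example can be sketched without a ZooKeeper cluster. The class below is a hypothetical in-memory stand-in for the /leader subtree; InMemoryLeaderNodes, its methods, and the address in main are assumptions made for illustration and are not Flink or ZooKeeper APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.BiConsumer;

/** Hypothetical in-memory stand-in for the /leader subtree; not a Flink or ZooKeeper class. */
final class InMemoryLeaderNodes {
    /** Value stored in a leader node: the leader's address and its session ID. */
    static final class LeaderInfo {
        final String address;
        final UUID sessionId;

        LeaderInfo(String address, UUID sessionId) {
            this.address = address;
            this.sessionId = sessionId;
        }
    }

    private final Map<String, LeaderInfo> nodes = new HashMap<>();
    private final Map<String, List<BiConsumer<String, UUID>>> watchers = new HashMap<>();

    /** Registers a watcher; it is called on every later write to the path. */
    void watch(String path, BiConsumer<String, UUID> watcher) {
        watchers.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
    }

    /** Called by the component that won the election on the matching latch path. */
    void publish(String path, String address, UUID sessionId) {
        nodes.put(path, new LeaderInfo(address, sessionId));
        for (BiConsumer<String, UUID> w : watchers.getOrDefault(path, new ArrayList<>())) {
            w.accept(address, sessionId);
        }
    }

    /** Returns the current content of the node, or null if nothing was published yet. */
    LeaderInfo read(String path) {
        return nodes.get(path);
    }

    public static void main(String[] args) {
        InMemoryLeaderNodes zk = new InMemoryLeaderNodes();
        String leaderPath = "/leader/resource_manager_lock";
        // A component that depends on the ResourceManager watches the leader node.
        zk.watch(leaderPath, (addr, id) ->
            System.out.println("reconnect to " + addr + " with session " + id));
        // The election winner publishes its (made-up) address and a fresh session ID.
        zk.publish(leaderPath, "rm-host-a:6123", UUID.randomUUID());
    }
}
```

In the real setup the write and the notification go through ZooKeeper, so watchers in other processes see the change as well; the sketch only shows the order of events.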
Hadoop:
- Flink creates a Hadoop-based blob store for the BlobServer (related class: FileSystemBlobStore)
- The checkpoint metadata files are saved on Hadoop under the HA path (related class: FileSystemStateStorageHelper)
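2. Overview of the HA interfaces
- HighAvailabilityServices: provides the LeaderRetrievalService and LeaderElectionService of every component, and records the JobManager state that must be persisted, such as completed checkpoints, the JobGraph, the BlobStore, and the job scheduling status
- LeaderElectionService: the service interface responsible for leader election. Its methods:
    // Starts the election service; usually called once the RPC endpoint has been initialized
    void start(LeaderContender contender) throws Exception;
    // Stops the election service when the component's lifecycle ends
    void stop() throws Exception;
    // Confirms leadership after the component has been elected and writes the leader information back;
    // in the ZooKeeper-based HA this writes to the node under the leader directory
    void confirmLeaderSessionID(UUID leaderSessionID);
    // Returns whether the given session ID holds leadership
    boolean hasLeadership(@Nonnull UUID leaderSessionId);
- LeaderContender: the interface for election participants. Every Flink component that needs HA implements it, for example the ResourceManager, the Dispatcher, the WebMonitorEndpoint, and the JobManager of each job. A component starts competing for leadership through LeaderElectionService#start(LeaderContender)
- LeaderRetrievalService: the service that receives leader changes in real time. When the leader changes, it calls notifyLeaderAddress on the LeaderRetrievalListener to pass on the new leader's information (address, session ID)
- LeaderRetrievalListener: implement this interface to listen for leader information in real time. Listening starts with LeaderRetrievalService#start(LeaderRetrievalListener listener) on the retrieval service of the corresponding component
    // Notifies the listener that the leader information has changed
    void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);
    // Handles errors raised by the leader retrieval service
    void handleError(Exception exception);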
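To make the call order between a contender and an election service concrete, here is a minimal single-process sketch. It assumes there is only one contender, so that contender always wins. SketchLeaderContender and SingleContenderElectionService are hypothetical names, and their method sets are reduced copies of the interfaces described above rather than Flink's own classes.

```java
import java.util.UUID;

/** Reduced, hypothetical copy of the contender contract described above. */
interface SketchLeaderContender {
    void grantLeadership(UUID leaderSessionId);
    void revokeLeadership();
    String getAddress();
}

/** Hypothetical single-process election service: the only contender always wins. */
final class SingleContenderElectionService {
    private SketchLeaderContender contender;
    private UUID issuedSessionId;     // handed to the contender in grantLeadership
    private UUID confirmedSessionId;  // set once the contender confirms

    /** Starts the election; with a single contender, leadership is granted right away. */
    void start(SketchLeaderContender contender) {
        this.contender = contender;
        this.issuedSessionId = UUID.randomUUID();
        contender.grantLeadership(issuedSessionId);
    }

    /** Accepts the confirmation only for the most recently issued session ID. */
    void confirmLeaderSessionID(UUID leaderSessionId) {
        if (leaderSessionId.equals(issuedSessionId)) {
            confirmedSessionId = leaderSessionId;
        }
    }

    boolean hasLeadership(UUID leaderSessionId) {
        return contender != null && leaderSessionId.equals(issuedSessionId);
    }

    /** Stops the service and revokes leadership from the contender. */
    void stop() {
        if (contender != null) {
            contender.revokeLeadership();
        }
        contender = null;
        issuedSessionId = null;
        confirmedSessionId = null;
    }

    UUID confirmedSessionId() {
        return confirmedSessionId;
    }
}

final class ElectionContractDemo {
    public static void main(String[] args) {
        SingleContenderElectionService service = new SingleContenderElectionService();
        service.start(new SketchLeaderContender() {
            @Override public void grantLeadership(UUID id) {
                // A real component would start its services here before confirming.
                service.confirmLeaderSessionID(id);
            }
            @Override public void revokeLeadership() { }
            @Override public String getAddress() { return "rpc://demo"; }
        });
        System.out.println("confirmed: " + service.confirmedSessionId());
    }
}
```

The point of the two-step handshake is that being granted leadership and announcing it are separate: the contender confirms only after it is ready, and a confirmation for an outdated session ID is ignored.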
3. HA interfaces implemented on ZooKeeper
ZooKeeperHaServices
ZooKeeperHaServices implements the HighAvailabilityServices interface from section 2. It provides the ZooKeeperLeaderElectionService and ZooKeeperLeaderRetrievalService of each component.
// Custom getter for the component added in section 4
@Override
public LeaderRetrievalService getAutoRescaleLeaderRetriever() {
    return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESCALE_SERVICE_LEADER_PATH);
}
@Override
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
    return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
}
@Override
public LeaderRetrievalService getDispatcherLeaderRetriever() {
    return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, DISPATCHER_LEADER_PATH);
}
@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
    return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
}
@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
    return getJobManagerLeaderRetriever(jobID);
}
@Override
public LeaderRetrievalService getWebMonitorLeaderRetriever() {
    return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, REST_SERVER_LEADER_PATH);
}
@Override
public LeaderElectionService getResourceManagerLeaderElectionService() {
    return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
}
@Override
public LeaderElectionService getDispatcherLeaderElectionService() {
    return ZooKeeperUtils.createLeaderElectionService(client, configuration, DISPATCHER_LEADER_PATH);
}
@Override
public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
    return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
}
@Override
public LeaderElectionService getWebMonitorLeaderElectionService() {
    return ZooKeeperUtils.createLeaderElectionService(client, configuration, REST_SERVER_LEADER_PATH);
}
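ZooKeeperLeaderElectionService
ZooKeeperLeaderElectionService implements the LeaderElectionService interface from section 2 and performs the election for Flink components. It implements the following three Curator interfaces:
- LeaderLatchListener: watches the component's node under leaderlatch to learn when the current instance gains or loses leadership
- NodeCacheListener: watches the instance's node under leader; if the node changes while the current instance is the leader, the instance writes its own connection information to the node again
- UnhandledErrorListener: watches for errors in the communication with ZooKeeper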
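The NodeCacheListener behaviour described above boils down to one decision: when the leader node changes, should this instance write its own information back? The following sketch models that decision as a pure function. LeaderNodeRepair, NodeData, and onNodeChanged are hypothetical names, no Curator or Flink class is used, and the exact conditions in Flink's implementation may differ.

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical model of the leader-node repair step; it uses no Curator or Flink classes. */
final class LeaderNodeRepair {
    /** Content of a leader node: connection address plus leader session ID. */
    static final class NodeData {
        final String address;
        final UUID sessionId;

        NodeData(String address, UUID sessionId) {
            this.address = Objects.requireNonNull(address);
            this.sessionId = Objects.requireNonNull(sessionId);
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof NodeData)) {
                return false;
            }
            NodeData other = (NodeData) o;
            return address.equals(other.address) && sessionId.equals(other.sessionId);
        }

        @Override public int hashCode() {
            return Objects.hash(address, sessionId);
        }
    }

    /**
     * Decides what the leader node should contain after a change notification.
     *
     * @param current  what the node holds now (null if it was deleted)
     * @param own      this instance's confirmed connection info (null if not confirmed yet)
     * @param isLeader whether this instance currently holds the latch
     */
    static NodeData onNodeChanged(NodeData current, NodeData own, boolean isLeader) {
        if (isLeader && own != null && !own.equals(current)) {
            return own;   // node was deleted or overwritten: write our info again
        }
        return current;   // not leader, not confirmed, or already correct: leave as is
    }

    public static void main(String[] args) {
        NodeData own = new NodeData("rm-host-a:6123", UUID.randomUUID());
        NodeData stale = new NodeData("rm-host-b:6123", UUID.randomUUID());
        System.out.println(onNodeChanged(stale, own, true).address);   // leader repairs the node
        System.out.println(onNodeChanged(stale, own, false).address);  // non-leader leaves it alone
    }
}
```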
LeaderContender
Each implementation of the LeaderContender interface corresponds to a different contender. Take the ResourceManager as an example. Once it has confirmed that it was elected leader and has cleared its old state, it:
- sets a new fencing token on the existing FencedRpcEndpoint (to prevent split-brain)
private CompletableFuture<Boolean> tryAcceptLeadership(final UUID newLeaderSessionID) {
    if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
        final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);
        log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId);
        // clear the state if we've been the leader before
        if (getFencingToken() != null) {
            clearStateInternal();
        }
        setFencingToken(newResourceManagerId);
        startServicesOnLeadership();
        return prepareLeadershipAsync().thenApply(ignored -> true);
    } else {
        return CompletableFuture.completedFuture(false);
    }
}
- starts the heartbeat services for the TaskManagers and JobManagers, and the SlotManager
protected void startServicesOnLeadership() {
    startHeartbeatServices();
    slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
}
@Override
public void grantLeadership(final UUID newLeaderSessionID) {
    final CompletableFuture<Boolean> acceptLeadershipFuture = clearStateFuture
        .thenComposeAsync((ignored) -> tryAcceptLeadership(newLeaderSessionID), getUnfencedMainThreadExecutor());
    final CompletableFuture<Void> confirmationFuture = acceptLeadershipFuture.thenAcceptAsync(
        (acceptLeadership) -> {
            if (acceptLeadership) {
                // confirming the leader session ID might be blocking,
                leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
            }
        },
        getRpcService().getExecutor());
    confirmationFuture.whenComplete(
        (Void ignored, Throwable throwable) -> {
            if (throwable != null) {
                onFatalError(ExceptionUtils.stripCompletionException(throwable));
            }
        });
}
One thing worth mentioning is that confirmLeaderSessionID mainly writes the connection information to the node under leader.
@Override
public void confirmLeaderSessionID(UUID leaderSessionID) {
    if (LOG.isDebugEnabled()) {
        LOG.debug(
            "Confirm leader session ID {} for leader {}.",
            leaderSessionID,
            leaderContender.getAddress());
    }
    Preconditions.checkNotNull(leaderSessionID);
    if (leaderLatch.hasLeadership()) {
        // check if this is an old confirmation call
        synchronized (lock) {
            if (running) {
                if (leaderSessionID.equals(this.issuedLeaderSessionID)) {
                    confirmedLeaderSessionID = leaderSessionID;
                    writeLeaderInformation(confirmedLeaderSessionID);
                }
            } else {
                LOG.debug("Ignoring the leader session Id {} confirmation, since the " +
                    "ZooKeeperLeaderElectionService has already been stopped.", leaderSessionID);
            }
        }
    } else {
        LOG.warn("The leader session ID {} was confirmed even though the " +
            "corresponding JobManager was not elected as the leader.", leaderSessionID);
    }
}
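The fencing token that the ResourceManager sets in tryAcceptLeadership is what protects it against split-brain: a caller that still holds the token of an earlier leader session is rejected. The sketch below shows only that idea. FencedCounter is a hypothetical class written for this illustration and does not reproduce how Flink's FencedRpcEndpoint checks tokens internally.

```java
import java.util.UUID;

/** Hypothetical endpoint illustrating fencing; not Flink's FencedRpcEndpoint. */
final class FencedCounter {
    private UUID fencingToken;   // null means "not leader, accept nothing"
    private int value;

    /** Called when leadership is granted (new token) or revoked (null). */
    void setFencingToken(UUID token) {
        this.fencingToken = token;
    }

    /** A fenced call: the caller must present the token of the current leader session. */
    int increment(UUID callerToken) {
        if (fencingToken == null || !fencingToken.equals(callerToken)) {
            throw new IllegalStateException(
                "Fencing token mismatch: expected " + fencingToken + ", got " + callerToken);
        }
        return ++value;
    }

    public static void main(String[] args) {
        FencedCounter endpoint = new FencedCounter();
        UUID oldSession = UUID.randomUUID();
        UUID newSession = UUID.randomUUID();

        endpoint.setFencingToken(oldSession);
        endpoint.increment(oldSession);          // accepted

        endpoint.setFencingToken(newSession);    // leadership granted again with a new session
        try {
            endpoint.increment(oldSession);      // caller still uses the old session
        } catch (IllegalStateException e) {
            System.out.println("rejected stale caller");
        }
        System.out.println(endpoint.increment(newSession));
    }
}
```

Setting the token to null on revocation, as the revokeLeadership examples in this post do, makes the endpoint reject every fenced call until leadership is granted again.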
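ZooKeeperLeaderRetrievalService
ZooKeeperLeaderRetrievalService implements the LeaderRetrievalService interface from section 2 and watches for changes of the Flink leader information. It implements two Curator interfaces:
- UnhandledErrorListener: watches for errors in the communication with ZooKeeper
- NodeCacheListener: watches the instance's node under leader; when the node changes, it notifies the instances that hold a LeaderRetrievalListener implementation so that they reconnect to the new leader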
Implementations of LeaderRetrievalListener
For example, the JobMaster's ResourceManagerLeaderListener implements LeaderRetrievalListener; when it is notified of a leader change, it connects to the new ResourceManager.
private class ResourceManagerLeaderListener implements LeaderRetrievalListener {
    @Override
    public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
        runAsync(
            () -> notifyOfNewResourceManagerLeader(
                leaderAddress,
                ResourceManagerId.fromUuidOrNull(leaderSessionID)));
    }
    @Override
    public void handleError(final Exception exception) {
        handleJobMasterError(new Exception("Fatal error in the ResourceManager leader service", exception));
    }
}
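What a listener typically does with a notification can be sketched as a small state machine: ignore duplicates, drop the old connection, and connect only when a leader is actually known (both arguments of notifyLeaderAddress are nullable). ReconnectingLeaderListener and its log list are hypothetical and exist only for this illustration; the JobMaster's real reconnect logic is more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;

/** Hypothetical listener; it mirrors the notifyLeaderAddress/handleError shape shown above. */
final class ReconnectingLeaderListener {
    private String currentAddress;
    private UUID currentSessionId;
    /** Records the actions taken, standing in for real connect/disconnect calls. */
    final List<String> log = new ArrayList<>();

    /** Both arguments may be null, which signals that there is currently no leader. */
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) {
        if (Objects.equals(leaderAddress, currentAddress)
                && Objects.equals(leaderSessionId, currentSessionId)) {
            return; // duplicate notification, nothing to do
        }
        if (currentAddress != null) {
            log.add("disconnect " + currentAddress);
        }
        currentAddress = leaderAddress;
        currentSessionId = leaderSessionId;
        if (leaderAddress != null && leaderSessionId != null) {
            log.add("connect " + leaderAddress);
        }
    }

    void handleError(Exception exception) {
        log.add("fatal " + exception.getMessage());
    }

    public static void main(String[] args) {
        ReconnectingLeaderListener listener = new ReconnectingLeaderListener();
        listener.notifyLeaderAddress("rm-a", UUID.randomUUID());
        listener.notifyLeaderAddress(null, null);                 // leader lost
        listener.notifyLeaderAddress("rm-b", UUID.randomUUID());  // new leader elected
        System.out.println(listener.log);
    }
}
```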
4. Hands-on: adding a new component and managing its lifecycle with HA
With the HA module, adding a new component whose lifecycle is managed by HA is straightforward. As an example, let us add a RescaleCoordinator component.
- Add the leader path and the election and retrieval getters for the new component to the HA services (ZooKeeperHaServices)
private static final String RESCALE_SERVICE_LEADER_PATH = "xxxxx";
@Override
public LeaderElectionService getAutoRescaleLeaderElectionService() {
    return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESCALE_SERVICE_LEADER_PATH);
}
@Override
public LeaderRetrievalService getAutoRescaleLeaderRetriever() {
    return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESCALE_SERVICE_LEADER_PATH);
}
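- Implement the AutoRescaleCoodinator component. It extends FencedRpcEndpoint (the RPC endpoint, to be covered in detail in a later post on Flink's RPC implementation) and implements LeaderContender (described above) and AutoRescaleGateway (the declaration of the RPC interface)
public class AutoRescaleCoodinator extends FencedRpcEndpoint<AutoRescaleCoodinatorId> implements LeaderContender, AutoRescaleGateway {
    // Excerpt from the constructor; fields and other members are omitted
    autoRescaleLeaderElectionService = highAvailabilityServices.getAutoRescaleLeaderElectionService();
}
Once the AutoRescaleCoodinator has started successfully, it joins the election
@Override
protected void onStart() throws Exception {
    autoRescaleLeaderElectionService.start(this);
}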
After being notified that it was elected leader, it initializes its services, sets the fencing token, and finally confirms the leadership to the LeaderElectionService
@Override
public void grantLeadership(UUID leaderSessionID) {
    logger.info("autorescale coodinator {} grant leadership", leaderSessionID);
    if (autoRescaleLeaderElectionService.hasLeadership(leaderSessionID)) {
        try {
            if (configuration.getBoolean(RescaleOptions.RESCALE_ENABLE)) {
                initAutoRescaleCoordinatorService();
            } else {
                // only manual rescaling can be triggered
                logger.info("Auto-rescaling is not enabled for this job");
            }
            setFencingToken(new AutoRescaleCoodinatorId(leaderSessionID));
            autoRescaleLeaderElectionService.confirmLeaderSessionID(leaderSessionID);
        } catch (Exception exception) {
            if (schedulerUtil.isRunning()) {
                schedulerUtil.close();
            }
            this.handleError(new RuntimeException("AutoRescaleCoodinator leader election failed", exception));
        }
    }
}
In the revokeLeadership method, stop the services embedded in the AutoRescaleCoodinator
@Override
public void revokeLeadership() {
    schedulerUtil.close();
    runAsyncWithoutFencing(
        () -> {
            log.info("AutoRescaleCoordinator {} was revoked leadership.", getAddress());
            setFencingToken(null);
        });
}
- Create an RpcGatewayRetriever object (it implements the LeaderRetrievalListener interface)
// Get the AutoRescaleLeaderRetriever from ZooKeeperHaServices
autoRescaleLeaderRetrieverService = highAvailabilityServices.getAutoRescaleLeaderRetriever();
// Create the RpcGatewayRetriever for the RescaleCoodinator
RpcGatewayRetriever<AutoRescaleCoodinatorId, AutoRescaleGateway> rescaleCoGtwRetriever = new RpcGatewayRetriever<>(rpcService, AutoRescaleGateway.class, AutoRescaleCoodinatorId::fromUuid, 10, Time.milliseconds(50L));
// Watch ZooKeeper for changes of the RescaleCoodinator's leader information in real time
autoRescaleLeaderRetrieverService.start(rescaleCoGtwRetriever);
- Finally, the Gateway interface obtained through rescaleCoodinatorRetriever can be used to communicate with the RescaleCoodinator
@Override
public CompletableFuture<String> callOnlineRescale(RescaleState rescaleState) {
    return rescaleCoodinatorRetriever.getFuture().thenCompose(
        autoRescaleGateway -> autoRescaleGateway.doRescale(rescaleState)
    );
}
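The getFuture().thenCompose(...) pattern works because the retriever hands out a future that completes only once a leader is known, so calls issued before the election finishes simply wait. Below is a minimal sketch of that behaviour. SketchGateway and SketchGatewayRetriever are hypothetical stand-ins, the gateway is a local lambda instead of an RPC proxy, and the connection retries of the real RpcGatewayRetriever are left out.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/** Hypothetical gateway interface standing in for an RPC gateway such as AutoRescaleGateway. */
interface SketchGateway {
    CompletableFuture<String> doRescale(String request);
}

/** Hypothetical retriever: keeps a future that completes once a leader is known. */
final class SketchGatewayRetriever {
    private CompletableFuture<SketchGateway> gatewayFuture = new CompletableFuture<>();

    /** Callers compose on this future; calls made before an election simply wait. */
    synchronized CompletableFuture<SketchGateway> getFuture() {
        return gatewayFuture;
    }

    /** Invoked on every leader change; a real retriever would open an RPC connection here. */
    synchronized void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) {
        if (gatewayFuture.isDone()) {
            gatewayFuture = new CompletableFuture<>(); // do not reuse the old leader's gateway
        }
        if (leaderAddress != null && leaderSessionId != null) {
            gatewayFuture.complete(request ->
                CompletableFuture.completedFuture(leaderAddress + " handled " + request));
        }
    }
}

final class GatewayRetrieverDemo {
    public static void main(String[] args) {
        SketchGatewayRetriever retriever = new SketchGatewayRetriever();
        // The call is issued before any leader is known and stays pending.
        CompletableFuture<String> result =
            retriever.getFuture().thenCompose(gateway -> gateway.doRescale("scale-up"));
        // Once a leader is announced, the pending call is served by that leader.
        retriever.notifyLeaderAddress("coordinator-a", UUID.randomUUID());
        System.out.println(result.join());
    }
}
```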