distributedBarrier
distributedBarrier
类的源码注释:
distributed systems use barriers to block processing of a set of nodes until a condition is met at which time all the nodes are allowed to proceed.
分布式系统使用屏障来阻止一组节点的处理,直到满足允许所有节点继续的条件为止。
类比单体应用的屏障CyclicBarrier
:
distributedBarrier
源码:
public class distributedBarrier
{
// CuratorFramework实例,用于与Zookeeper进行交互
private final CuratorFramework client;
// 分布式屏障的路径
private final String barrierPath;
// 监听器
private final Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
client.postSafeNotify(distributedBarrier.this);
}
};
/**
* 构造方法
*/
public distributedBarrier(CuratorFramework client, String barrierPath)
{
this.client = client;
this.barrierPath = PathUtils.validatePath(barrierPath);
}
/**
* 设置屏障(创建屏障节点)
*/
public synchronized void setBarrier() throws Exception
{
try
{
client.create().creatingParentContainersIfNeeded().forPath(barrierPath);
}
catch ( KeeperException.NodeExistsException ignore )
{
// ignore
}
}
/**
* 移除屏障(删除屏障节点)
*/
public synchronized void removeBarrier() throws Exception
{
try
{
client.delete().forPath(barrierPath);
}
catch ( KeeperException.NoNodeException ignore )
{
// ignore
}
}
/**
* 阻塞直到屏障不再存在
*/
public synchronized void waitOnBarrier() throws Exception
{
waitOnBarrier(-1, null);
}
/**
* 阻塞直到屏障不再存在或超时
*/
public synchronized boolean waitOnBarrier(long maxWait, TimeUnit unit) throws Exception
{
long startMs = System.currentTimeMillis();
boolean hasMaxWait = (unit != null);
long maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE;
boolean result;
for(;;)
{
// 屏障节点是否不存在,true为不存在,false为存在
result = (client.checkExists().usingWatcher(watcher).forPath(barrierPath) == null);
// 屏障节点不存在,直接退出
if ( result )
{
break;
}
// 屏障节点存在,进行等待
if ( hasMaxWait )
{
long elapsed = System.currentTimeMillis() - startMs;
long thisWaitMs = maxWaitMs - elapsed;
// 等待超时,直接退出
if ( thisWaitMs <= 0 )
{
break;
}
// 继续等待
wait(thisWaitMs);
}
else
{
wait();
}
}
return result;
}
}
distributedBarrier
的源码还是比较简单的,就是通过一个Zookeeper
节点的创建与删除来实现分布式屏障。
测试
pom.xml
:
@H_502_64@<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.kaven</groupId> <artifactId>zookeeper</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.2.0</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.22</version> </dependency> </dependencies> </project>
CuratorFrameworkProperties
类(提供CuratorFramework
需要的一些配置信息,以及创建CuratorFramework
实例的方法):
package com.kaven.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorFrameworkProperties {
// 连接地址
public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
// 连接超时时间
public static final int CONNECTION_TIMEOUT_MS = 40000;
// Session超时时间
public static final int SESSION_TIMEOUT_MS = 10000;
// 命名空间
public static final String NAMESPACE = "MyNamespace";
// 重试策略
public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);
public static CuratorFramework getCuratorFramework() {
// 创建CuratorFramework实例
CuratorFramework curator = CuratorFrameworkFactory.builder()
.connectString(CuratorFrameworkProperties.CONNECT_ADDRESS)
.retryPolicy(CuratorFrameworkProperties.RETRY_POLICY)
.connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS)
.sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS)
.namespace(CuratorFrameworkProperties.NAMESPACE)
.build();
curator.start();
assert curator.getState().equals(CuratorFrameworkState.STARTED);
return curator;
}
}
distributedBarrierRunnable
类(实现了Runnable
接口,模拟分布式节点等待分布式屏障):
package com.kaven.zookeeper;
import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.barriers.distributedBarrier;
import java.util.Random;
public class distributedBarrierRunnable implements Runnable{
@SneakyThrows
@Override
public void run() {
// 使用不同的CuratorFramework实例,表示不同的分布式节点
CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();
// 模拟随机加入的分布式节点
int randomSleep = new Random().nextInt(1000);
Thread.sleep(randomSleep);
// 分布式屏障的路径
String barrierPath = "/kaven";
// 创建distributedBarrier实例,用于提供分布式屏障功能
distributedBarrier barrier = new distributedBarrier(curator, barrierPath);
System.out.println(Thread.currentThread().getName() + " 等待屏障被移除");
long start = System.currentTimeMillis();
// 等待屏障被移除
barrier.waitOnBarrier();
System.out.println(Thread.currentThread().getName() + " 等待了 "
+ (System.currentTimeMillis() - start) / 1000 + " s");
System.out.println(Thread.currentThread().getName() + " 继续执行");
}
}
启动类:
package com.kaven.zookeeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.barriers.distributedBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Application {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
// 创建CuratorFramework实例
CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();
// 分布式屏障的路径
String barrierPath = "/kaven";
// 创建distributedBarrier实例,用于设置和在适当时机删除屏障
distributedBarrier barrier = new distributedBarrier(curator, barrierPath);
// 设置屏障
barrier.setBarrier();
// 分布式节点处理业务
for (int i = 0; i < 5; i++) {
EXECUTOR_SERVICE.execute(new distributedBarrierRunnable());
}
// 模拟移除屏障需要处理的业务
Thread.sleep(20000);
// 移除屏障
barrier.removeBarrier();
}
}
模拟5
个分布式节点等待分布式屏障,输出如下所示:
pool-1-thread-5 等待屏障被移除
pool-1-thread-3 等待屏障被移除
pool-1-thread-1 等待屏障被移除
pool-1-thread-4 等待屏障被移除
pool-1-thread-2 等待屏障被移除
pool-1-thread-2 等待了 19 s
pool-1-thread-2 继续执行
pool-1-thread-5 等待了 19 s
pool-1-thread-5 继续执行
pool-1-thread-3 等待了 19 s
pool-1-thread-3 继续执行
pool-1-thread-4 等待了 19 s
pool-1-thread-4 继续执行
pool-1-thread-1 等待了 19 s
pool-1-thread-1 继续执行
使用提供超时机制的等待屏障方法:
package com.kaven.zookeeper;
import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.barriers.distributedBarrier;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class distributedBarrierRunnable implements Runnable{
@SneakyThrows
@Override
public void run() {
// 使用不同的CuratorFramework实例,表示不同的分布式节点
CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();
// 模拟随机加入的分布式节点
int randomSleep = new Random().nextInt(1000);
Thread.sleep(randomSleep);
// 分布式屏障的路径
String barrierPath = "/kaven";
// 创建distributedBarrier实例,用于提供分布式屏障功能
distributedBarrier barrier = new distributedBarrier(curator, barrierPath);
System.out.println(Thread.currentThread().getName() + " 等待屏障被移除");
long start = System.currentTimeMillis();
// 等待屏障被移除
boolean result = barrier.waitOnBarrier(10, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName() + " 等待了 "
+ (System.currentTimeMillis() - start) / 1000 + " s");
if(result) {
System.out.println(Thread.currentThread().getName() + " 继续执行");
}
else {
// 等待屏障超时
System.out.println(Thread.currentThread().getName() + " 等待屏障超时");
}
}
}
输出如下所示:
pool-1-thread-1 等待屏障被移除
pool-1-thread-2 等待屏障被移除
pool-1-thread-3 等待屏障被移除
pool-1-thread-4 等待屏障被移除
pool-1-thread-5 等待屏障被移除
pool-1-thread-1 等待了 10 s
pool-1-thread-1 等待屏障超时
pool-1-thread-2 等待了 10 s
pool-1-thread-2 等待屏障超时
pool-1-thread-3 等待了 10 s
pool-1-thread-3 等待屏障超时
pool-1-thread-4 等待了 10 s
pool-1-thread-4 等待屏障超时
pool-1-thread-5 等待了 10 s
pool-1-thread-5 等待屏障超时
符合预期。
Curator
框架的分布式屏障distributedBarrier
就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。