微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

ZooKeeper5 分布式锁

概念

  • 在我们进行单机应用开发,级并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题。

  • 但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间E经无法通过多线程的锁解决同步问题。

  • 那么就需要一种更加高级的锁机制, 来处理种跨机器的进程之间的数据同步问题——这就是分布式锁。

  • 资源在哪里,分布式锁就加在那里

分布式锁是干什么的:跨机器的进程之间的数据同步问题

image-20211024163854548

Zookeeper分布式锁原理

核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。

image-20211024164727170

步骤:

  1. 客户端获取锁时,在lock节点(这个节点是随便创建的)下创建临时顺序节点。
    • 临时:无论如何都要释放,所以是临时的,避免宕机删除不了这个节点
    • 顺序:确定创建节点的时间顺序
  2. 然后客户端获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁使用完锁后,将该节点删除
  3. 如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到此自己小的那个节点,同时对其注册事件监听器,监听删除事件。(只需要找到一个比自己小的就可以了)
  4. 如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听

Curator实现分布式锁API

在Curator中有五种锁方案:

  • InterProcessSemaphoreMutex: 分布式排它锁(非可重入锁)
    • 前面进入过一次
    • 非可重入:如果客户端获取到共享资源过后,如果是非可重入的,如果再想要再次进入这个资源的时候,必须把锁释放,才能够进去,再跟其他竞争这个锁资源
    • 可重入:进去一次过后想要再进去,再进去就可以了
  • InterProcessMutex:分布式可重入排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultilock: 移个锁作为单个实体管理的容器
  • InterProcessSemaphoreV2: 共享信号量

模拟12306售票案例

image-20211024165741079

  • 通过线程去实现客户端

    public class LockTest {
    
    
        public static void main(String[] args) {
            Ticket12306 ticket12306 = new Ticket12306();
    
            //创建客户端
            Thread t1 = new Thread(ticket12306,"携程");
            Thread t2 = new Thread(ticket12306,"飞猪");
    
            t1.start();
            t2.start();
        }
    
    }
    
  • 分布式锁的实现

    public class Ticket12306 implements Runnable{
    
        private int tickets = 10;//数据库的票数
        //分布式锁实现的一个类
        private InterProcessMutex lock ;
    
    
        public Ticket12306(){
            //重试策略
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
            //2.第二种方式
            //CuratorFrameworkFactory.builder();
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("192.168.149.135:2181")
                    .sessionTimeoutMs(60 * 1000)
                    .connectionTimeoutMs(15 * 1000)
                    .retryPolicy(retryPolicy)
                    .build();
    
            //开启连接
            client.start();
            //创建锁,初始化锁,lock这个节点会自动创建
            lock = new InterProcessMutex(client,"/lock");
        }
    
        @Override
        public void run() {
    
            while(true){
                //获取锁
                try {
                    lock.acquire(3, TimeUnit.SECONDS);//加锁
                    if(tickets > 0){
    
                        System.out.println(Thread.currentThread()+":"+tickets);
                        Thread.sleep(100);
                        tickets--;
                    }
                } catch (Exception e) {
                    e.printstacktrace();
                }finally {
                    //释放锁
                    try {
                        lock.release();
                    } catch (Exception e) {
                        e.printstacktrace();
                    }
                }
    
    
    
            }
    
        }
    }
    

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐