JUC并发—4.wait和notify以及Atomic原理二

JUC并发—4.wait和notify以及Atomic原理二

编码文章call10242025-02-21 12:18:198A+A-

大纲

1.wait()与notify()实现一个简易的内存队列

2.wait()与notify()的底层原理

3.分布式存储系统NameNode机制介绍

4.分布式存储系统的edits log机制介绍

5.分布式存储系统的NameNode实现

6.分布式存储系统的创建目录功能的实现

7.edits log的全局txid机制和双缓冲机制实现

8.synchronized实现edits log分段加锁机制

9.wait()与notify()实现edits log批量刷磁盘

10.i++和AtomicInteger分别实现并发安全

11.AtomicInteger中的CAS无锁化原理

12.Atomic源码之仅限JDK使用的Unsafe类

13.Atomic源码之无限重复循环以及CAS操作

14.Atomic原子类基于CAS操作的三大问题

15.AtomicLong优化服务注册中心心跳计数器

16.LongAdder的分段CAS优化多线程自旋

17.LongAdder的分段CAS优化心跳计数器

18.服务注册中心的增量拉取机制

19.AtomicReference优化客户端缓存注册表

20.AtomicStampedReference解决ABA问题

21.AtomicLong多线程拉取注册表版本不错乱


14.Atomic原子类基于CAS操作的三大问题

(1)ABA问题

(2)无限循环问题

(3)多个变量的原子性问题


(1)ABA问题

ABA问题就是:如果某个值一开始是A,后来变成了B,然后又变成了A。AtomicStampedReference原子更新带有版本号的引用类型,解决ABA问题。此外一般用AtomicInteger进行的是不断累加计数,所以ABA问题比较少。


(2)无限循环问题

Atomic原子类设置值的时候会进入一个无限循环,只要不成功就不停循环再次尝试,在高并发修改值时是挺常见的。


比如用AtomicInteger定义一个原子变量,高并发下修改时,可能会导致compareAndSet()要循环很多次才设置成功。所以引入了LongAdder来解决,通过分段CAS的思路来解决无限循环问题


(3)多个变量的原子性问题

一般的AtomicInteger,只能保证一个变量的原子性,但是如果多个变量呢?


要保证多个变量的原子性,可以使用AtomicReference来封装自定义对象。将多个变量放在一个对象里,通过对象的引用来实现多个变量的原子性


15.AtomicLong优化服务注册中心心跳计数器

可以使用AtomicLong来优化服务注册中心内部的心跳计数器。

//心跳请求计数器
public class HeartbeatCounter {
    //单例实例
    private static HeartbeatCounter instance = new HeartbeatCounter();

    //最近一分钟的心跳次数
    private AtomicLong latestMinuteHeartbeatRate = new AtomicLong(0L); 

    //最近一分钟的时间戳
    private long latestMinuteTimestamp = System.currentTimeMillis();

    private HeartbeatCounter() {
        Daemon daemon = new Daemon();
        daemon.setDaemon(true);  
        daemon.start();
    }

    //获取单例实例
    public static HeartbeatCounter getInstance() {
        return instance;
    }

    //增加最近一分钟的心跳次数
    public /**synchronized*/ void increment() {
        //通过synchronized上锁,在很多线程的情况下,性能其实是很差的
        //如果服务实例很多,比如1万个服务实例,那么每秒需要很多线程来处理大量的心跳请求
        //这样就会出现很多线程卡在这里,一个一个排队获取锁,这样就会非常影响并发性能
        //但换成AtomicLong之后,就不用加锁了,通过CAS操作实现无锁化编程,而且还保证了原子性
        latestMinuteHeartbeatRate.incrementAndGet();  
    }

    //获取最近一分钟的心跳次数
    public /**synchronized*/ long get() {
        return latestMinuteHeartbeatRate.get();
    }

    private class Daemon extends Thread {
        @Override
        public void run() {
            while(true) {
                try {
                    long currentTime = System.currentTimeMillis();
                    if (currentTime - latestMinuteTimestamp > 60 * 1000) {
                        latestMinuteHeartbeatRate = new AtomicLong(0L); 
                        latestMinuteTimestamp = System.currentTimeMillis();
                    }
                    Thread.sleep(1000); 
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}


16.LongAdder的分段CAS优化多线程自旋

(1)采用分段CAS降低重试频率

(2)通过惰性求值提升自增性能

(3)LongAdder和AtomicLong对比

(4)LongAdder的源码分析

(5)LongAdder的设计总结

(6)伪共享问题说明


(1)采用分段CAS降低重试频率

这种分段的做法类似于JDK7中的ConcurrentHashMap的分段锁。


高并发场景下,value变量其实就是一个热点数据,大量线程竞争一个热点。LongAdder基本思路就是分散热点,将value分散到一个Cell数组中。不同线程会命中数组的不同槽位,各线程只对自己槽位的value进行CAS操作。这样热点就被分散了,冲突概率就变小了。


LongAdder内部有一个base变量和一个Cell[ ]数组。当并发不高的时候都是通过CAS来直接操作base变量的值。如果对base变量的CAS失败,则再针对Cell[ ]数组中的Cell进行CAS操作。如果对Cell[ ]数组中的Cell进行CAS失败,则换一个Cell进行CAS操作。


LongAdder在无竞争情况下,跟AtomicLong是一样的, 对同一个base进行操作。当出现竞争的时候,则采用化整为零分散热点的做法,用空间换时间。通过使用一个Cell[ ]数组,将一个value拆分进这个Cell[ ]数组中。


(2)通过惰性求值提升自增性能

只有在使用longValue()方法获取当前累加值时才会真正去结算计数的数据。LongAdder.longValue()方法其实就是调用LongAdder.sum()方法,LongAdder.sum()方法会将Cell数组中的各元素value和base累加作为返回值。



AtomicLong.incrementAndGet()方法每次都会返回long类型的计数值,每次递增后还会伴随着数据返回,增加了额外的开销。


(3)LongAdder和AtomicLong对比

一.AtomicLong总结

AtomicLong的实现原理是:

基于CAS + 自旋操作,CAS是基于硬件来实现原子性的,可以保障线程安全。


AtomicLong的使用场景:

低并发下的全局计数器、序列号生成器


AtomicLong的优势是:

占用空间小


AtomicLong的缺点是:

高并发下性能急剧下降,N个线程同时进行自旋,N-1个线程会自旋失败、不断重试。


二.LongAdder总结

LongAdder设计思想是:

空间换时间分散热点数据value的值


LongAdder的实现原理是:

高并发时通过Cell[ ]数组进行分段CAS


LongAdder的使用场景是:

高并发下的全局计数器


LongAdder的优势是:

减少CAS重试次数防止伪共享惰性求值


LongAdder的缺点是:

如果使用它的sum()方法时有并发更新,可能数据结果存在误差


(4)LongAdder的源码分析

//并发不高的时候,直接更新base,类似AtomicLong;
//高并发的时候,将每个线程的操作hash到不同的cells数组中;
//从而将AtomicLong中更新一个value的行为优化之后,分散到多个value中,从而降低更新热点
//而需要得到当前值时,直接将所有cell中的value与base相加即可;
//但是和AtomicLong的CAS不同,incrementAndGet操作及其变种可以返回更新后的值,
//而LongAdder的更新操作返回的是void
public class LongAdder extends Striped64 implements Serializable {
    //入参x是累加值
    public void add(long x) {
        //as是累加单元数组cells的引用
        Cell[] as;
        //b是指获取的base值,v是指期望值(当前Cell存储的值)
        long b, v;
        //m是cells数组的长度
        int m;
        //当前线程命中的cells数组元素Cell对象
        Cell a;
        //如果是第一次执行,则直接case操作base
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            //as数组为空(null或者size为0) 或者 当前线程取模as数组大小为空 或者 cas更新Cell失败
            if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) {
                longAccumulate(x, null, uncontended);
            }
        }
    }

    public long sum() {
        //通过累加base与cells数组中的value从而获得sum
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null) {
                    sum += a.value;
                }
            }
        }
        return sum;
    }
}

abstract class Striped64 extends Number {
    //@Contended是防止缓存行伪共享的注解
    //CPU缓存是以缓存行为单位的,每个缓存行对应着一块内存,一般是64字节(8个long)
    //Cell即为累加单元
    @sun.misc.Contended static final class Cell {
        //保存累加结果
        volatile long value;
      
        //构造方法中会初始化value值
        Cell(long x) {
            value = x;
        }
      
        //使用CAS方式进行累加,cmp表示旧值,val表示新值
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

    private static final sun.misc.Unsafe UNSAFE;
    private static final long BASE;
    private static final long CELLSBUSY;
    private static final long PROBE;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class sk = Striped64.class;
            BASE = UNSAFE.objectFieldOffset(sk.getDeclaredField("base"));
            CELLSBUSY = UNSAFE.objectFieldOffset(sk.getDeclaredField("cellsBusy"));
            Class tk = Thread.class;
            //返回Field在内存中相对于对象内存地址的偏移量
            PROBE = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomProbe"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    //CPU数量,即cells数组的最大长度
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    //cells数组,大小为2的幂,这里的Cell是Striped64的静态内部类
    transient volatile Cell[] cells;

    //在没有竞争的情况下,将操作值累到base中
    //在cells数组初始化过程中,cells数组还不可用,这时候也会通过CAS将操作值累到base中
    transient volatile long base;

    //cellsBusy有两个值0和1,它的作用是当要修改cells数组时加锁,防止多线程同时修改cells数组
    //加锁的情况有三种:一.cells数组初始化的时候;二.cells数组扩容的时候;
    //三.如果cells数组中某个元素为null,给这个位置创建新的Cell对象的时候;
    transient volatile int cellsBusy;

    final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
        //存储线程的hash值,有了hash值旧可以知道当前线程进入哪个槽位
        int h;
        //如果getProbe()为0,说明随机数未初始化,需要初始化后,线程才能进入对应槽位
        if ((h = getProbe()) == 0) {
            //使用ThreadLocalRandom为当前线程重新计算一个hash值,强制初始化
            ThreadLocalRandom.current(); // force initialization
            //重新获取hash值
            h = getProbe();
            //重新获取hash值后,认为此次不算一次竞争,所以wasUncontended表示的是否竞争状态为true
            wasUncontended = true;
        }
        boolean collide = false;

        //失败重试
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                //若as数组已经初始化,(n-1) & h 即为取模操作,相对 % 效率要更高
                if ((a = as[(n - 1) & h]) == null) {
                    //其他线程没有使用,自然就没有加锁
                    if (cellsBusy == 0) {
                        //创建累加单元,还没赋值到cells数组中
                        Cell r = new Cell(x);

                        //可能会有多个线程执行了"new Cell(x)",因此需要进行CAS操作,避免线程安全的问题
                        //同时需要再判断一次,避免正在初始化的时其他线程再进行额外的CAS操作
                        //这里的if条件是将创建的累加单元,设置到cells数组的空位置(cells[0]或cells[1])
                        //双重检查cellsBusy == 0,避免并发场景下重复赋值
                        //进入该if条件之前通过casCellsBusy()尝试加锁,保证赋值时是线程安全的
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {
                                Cell[] rs; int m, j;
                                //重新检查一下是否已经创建成功了
                                if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;//赋值到空槽位
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;//解锁
                            }
                            if (created) {
                                break;//退出
                            }
                            //槽位现在是非空了,continue到下次循环重试
                            continue;
                        }
                    }
                    collide = false;
                } else if (!wasUncontended) {
                    //wasUncontended为false说明在同一个槽位竞争失败
                    wasUncontended = true;
                } else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) {
                    //尝试对已经有值的单元Cell进行累加,成功则退出
                    break;//若CAS更新成功则跳出循环,否则继续重试
                } else if (n >= NCPU || cells != as) {
                    //累加失败,判断是否超过CPU上限
                    //超过CPU上限后,设置collide为false,为了让下次循环进入下一个条件,防止进行扩容
                    collide = false;
                } else if (!collide) {
                    collide = true;
                } else if (cellsBusy == 0 && casCellsBusy()) {
                    //其他线程没加锁,当前线程进入时再自己加锁
                    try {
                        //对cells进行扩容
                        if (cells == as) {
                            //每次扩容2倍
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i) {
                                //将旧数组拷贝到新数组中
                                rs[i] = as[i];
                            }
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;//解锁
                    }
                    collide = false;
                    //重新找槽位
                    continue;
                }

                //执行到这一步说明,前面的步骤都没成功,需要尝试换一个累加单元Cell进行累加
                h = advanceProbe(h);
            } else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {
                    //再次检查cells引用是否改变,双重检查是为了避免并发场景下重复创建cells
                    if (cells == as) {
                        Cell[] rs = new Cell[2];//创建长度为2的cells数组
                        rs[h & 1] = new Cell(x);//将累加值x随机存放到cell数组对应的索引下标位置
                        cells = rs;//再将创建的cell数组引用赋值到cells
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;//创建完cells数组后,解锁
                }
                if (init) {
                    break;
                }
            } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) {//尝试CAS累加
                //若已经有另一个线程在初始化,那么尝试直接更新base
                break;
            }
        }
    }

    final boolean casCellsBusy() {
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }

    static final int getProbe() {
        //通过Unsafe获取Thread中threadLocalRandomProbe的值
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }
    ...
}

longAccumulate()方法的流程图:

(5)LongAdder的设计总结

一.分段CAS机制

把一个变量拆成多份变成多个变量,类似JDK 1.7的ConcurrentHashMap的分段锁。具体来说就是把一个Long型变量拆成一个base变量外加多个Cell变量,每个Cell变量包装了一个Long型变量。当多个线程并发累加时,如果并发度低就直接加到base变量上,如果并发度高分散到Cell变量上。在最后取值时,再把base变量和这些Cell变量进行累加求和运算。


LongAddr只能进行累加操作,并且初始值默认为0。LongAccumulator可以自定义一个二元操作符,而且可以传入一个初始值。


二.最终一致性

LongAddr的sum()方法并没有对cell[ ]数组加锁,所以存在一边有线程对cell[ ]数组求和、一边有线程修改数组的情况。类似于ConcurrentHashMap的clear()方法,一边清空数据一边放入数据。


三.伪共享与缓存行填充

LongAddr在定义Cell时,使用了注解@Contended。这个注解可以用来进行缓存行填充,从而解决伪共享问题


四.数组扩容

Cell[ ]数组的大小始终是2的整数次方,每次扩容都变为原来的2倍。


(6)伪共享问题说明

每个CPU都有自己的缓存,也就是高速缓存。CPU缓存与主内存进行数据交换的基本单位叫缓存行。CPU缓存由若干个缓存行组成的,缓存行是CPU缓存的最小存储单位。在64位的x86架构中,每个缓存行是64字节,也就是8个Long型的大小。当CPU的缓存失效了需要从主内存刷新数据时,至少需要刷新64字节。


假设主内存的Long型变量X、Y已被CPU1和CPU2分别读入自己的缓存,且Long型变量X、Y在CPU缓存和主内存中都是放在同一行缓存行中的。这样当CPU1修改了变量X,需要失效整个缓存行时,就会往总线发送消息,通知CPU2对应的缓存行失效。所以虽然CPU2并没有修改变量Y,但也需要刷新变量Y所在的缓存行。这就是伪共享问题,缓存行上的不同变量,读CPU受到写CPU的影响


17.LongAdder的分段CAS优化心跳计数器

使用LongAdder替代AtomicLong:

//private AtomicLong latestMinuteHeartbeatRate = new AtomicLong(0L); 
private LongAdder latestMinuteHeartbeatRate = new LongAdder(); 
...
//latestMinuteHeartbeatRate.incrementAndGet();  
latestMinuteHeartbeatRate.increment();
...
//return latestMinuteHeartbeatRate.get();
return latestMinuteHeartbeatRate.longValue();
...
//latestMinuteHeartbeatRate = new AtomicLong(0L); 
latestMinuteHeartbeatRate = new LongAdder();


18.服务注册中心的增量拉取机制

(1)服务注册中心的增量拉取机制

(2)增量拉取服务注册表的实现

(3)提供全量和增量拉取注册表的接口

(4)客户端启动时拉取全量注册表

(5)客户端定时拉取增量注册表到本地

(6)客户端增量合并注册表后的校验与全量纠正


(1)服务注册中心的增量拉取机制

由于服务注册表的每一条数据并不是都会变化的,每隔30秒可能只有少数几个服务实例的数据会出现变化,所以并不需要每隔30秒就全量拉取服务注册表的所有数据。


否则,如果服务实例有几万个,那么服务注册表里对应有几万条数据。每30秒拉取几万条数据,将对网络开销、注册中心的性能产生巨大压力。


注册中心启动时,会先全量拉取一次服务注册表,然后每隔30秒增量拉取一次服务注册表。所以每隔30秒拉取最近30秒变化的少量服务实例信息即可。


(2)增量拉取服务注册表的实现

可以使用一个队列,队列里存放的就是最近3分钟有变化的服务实例

//服务注册表
public class ServiceRegistry {
    public static final Long RECENTLY_CHANGED_ITEM_CHECK_INTERVAL = 3000L;
    public static final Long RECENTLY_CHANGED_ITEM_EXPIRED = 3 * 60 * 1000L;

    //注册表是一个单例
    private static ServiceRegistry instance = new ServiceRegistry();

    //核心的内存数据结构:注册表,Map:key是服务名称,value是这个服务的所有的服务实例
    private Map> registry = new HashMap>();
    
		//最近变更的服务实例的队列  
    private LinkedList recentlyChangedQueue = new LinkedList();
    
    //获取服务注册表的单例实例
    public static ServiceRegistry getInstance() {
        return instance;
    }
    
    //构造函数
    private ServiceRegistry() {
        //启动后台线程监控最近变更的队列
        RecentlyChangedQueueMonitor recentlyChangedQueueMonitor = new RecentlyChangedQueueMonitor();
        recentlyChangedQueueMonitor.setDaemon(true); 
        recentlyChangedQueueMonitor.start();
    }
    
    //服务注册
    public synchronized void register(ServiceInstance serviceInstance) {
        //将服务实例放入最近变更的队列中
        RecentlyChangedServiceInstance recentlyChangedItem = new RecentlyChangedServiceInstance(serviceInstance, System.currentTimeMillis(), ServiceInstanceOperation.REGISTER);
        recentlyChangedQueue.offer(recentlyChangedItem);
        //将服务实例放入注册表中
        Map serviceInstanceMap = registry.get(serviceInstance.getServiceName());
        if (serviceInstanceMap == null) {
            serviceInstanceMap = new HashMap();
            registry.put(serviceInstance.getServiceName(), serviceInstanceMap);
        }
        serviceInstanceMap.put(serviceInstance.getServiceInstanceId(), serviceInstance);
        System.out.println("服务实例,完成注册......【" + serviceInstance + "】");  
        System.out.println("注册表:" + registry); 
    }
    
    //获取服务实例
    public synchronized ServiceInstance getServiceInstance(String serviceName, String serviceInstanceId) {
        Map serviceInstanceMap = registry.get(serviceName);
        return serviceInstanceMap.get(serviceInstanceId);
    }
    
    //获取整个注册表
    public synchronized Map> getRegistry() {
        return registry;
    }
    
    //从注册表删除一个服务实例
    public synchronized void remove(String serviceName, String serviceInstanceId) {
        System.out.println("服务实例从注册表中摘除[" + serviceName + ", " + serviceInstanceId + "]");
        //获取服务实例
        Map serviceInstanceMap = registry.get(serviceName);
        ServiceInstance serviceInstance = serviceInstanceMap.get(serviceInstanceId);
        //将服务实例变更信息放入队列中
        RecentlyChangedServiceInstance recentlyChangedItem = new RecentlyChangedServiceInstance(serviceInstance, System.currentTimeMillis(), ServiceInstanceOperation.REMOVE);
        recentlyChangedQueue.offer(recentlyChangedItem);
        //从服务注册表删除服务实例
        serviceInstanceMap.remove(serviceInstanceId);
    }
        
    //最近变化的服务实例-内部类
    class RecentlyChangedServiceInstance {
        //服务实例
        ServiceInstance serviceInstance;
        //发生变更的时间戳
        Long changedTimestamp;
        //变更操作
        String serviceInstanceOperation;
        public RecentlyChangedServiceInstance(ServiceInstance serviceInstance, Long changedTimestamp, String serviceInstanceOperation) {
            this.serviceInstance = serviceInstance;
            this.changedTimestamp = changedTimestamp;
            this.serviceInstanceOperation = serviceInstanceOperation;
        }
    }
    
    //服务实例操作-内部类
    class ServiceInstanceOperation {
        public static final String REGISTER = "register";//注册
        public static final String REMOVE = "REMOVE";//删除
    }
    
    //最近变更队列的监控线程-内部类
    class RecentlyChangedQueueMonitor extends Thread {
        @Override
        public void run() {
            while(true) {
                try {
                    //和remove与register锁的都是注册表instance实例
                    synchronized(instance) {
                        RecentlyChangedServiceInstance recentlyChangedItem = null;
                        Long currentTimestamp = System.currentTimeMillis();
                        while ((recentlyChangedItem = recentlyChangedQueue.peek()) != null) {
                            //判断如果一个服务实例变更信息已经在队列里存在超过3分钟了,就从队列中移除
                            if (currentTimestamp - recentlyChangedItem.changedTimestamp > RECENTLY_CHANGED_ITEM_EXPIRED) {
                                recentlyChangedQueue.pop();
                            }
                        }
                    }
                    Thread.sleep(RECENTLY_CHANGED_ITEM_CHECK_INTERVAL);
                } catch (Exception e) {
                    e.printStackTrace();
                } 
            }
        }
    }
}

(3)提供全量和增量拉取注册表的接口

//负责接收register-client发送过来的请求的
public class RegisterServerController {
    private ServiceRegistry registry = ServiceRegistry.getInstance();
    
    //服务注册
    public RegisterResponse register(RegisterRequest registerRequest) {
        ...
    }
     
    //发送心跳
    public HeartbeatResponse heartbeat(HeartbeatRequest heartbeatRequest) { 
        ...
    }
    
    //拉取全量注册表
    public Map> fetchFullServiceRegistry() {
        return registry.getRegistry();
    }
    
    //拉取增量注册表
    public LinkedList fetchDeltaServiceRegistry() {
        return registry.getRecentlyChangedQueue();
    }
    
    //服务下线
    public void cancel(String serviceName, String serviceInstanceId) {
        ...
    }
}

public class ServiceRegistry {
    ...
    //获取整个注册表
    public synchronized Map> getRegistry() {
        return registry;
    }
      
    //获取最近有变化的注册表
    public synchronized LinkedList getRecentlyChangedQueue() {
        return recentlyChangedQueue;
    }
    ...
}

(4)客户端启动时拉取全量注册表

//服务注册中心的客户端缓存的一个服务注册表
public class CachedServiceRegistry {
    ...
    //负责定时拉取注册表到客户端进行缓存的后台线程
    private FetchDeltaRegistryWorker fetchDeltaRegistryWorker;

    ...
    //构造函数
    public CachedServiceRegistry(RegisterClient registerClient, HttpSender httpSender) {
        this.fetchDeltaRegistryWorker = new FetchDeltaRegistryWorker();
        this.registerClient = registerClient;
        this.httpSender = httpSender;
    }

    //初始化
    public void initialize() {
        //启动全量拉取注册表的线程
        FetchFullRegistryWorker fetchFullRegistryWorker = new FetchFullRegistryWorker();
        fetchFullRegistryWorker.start();  
        //启动增量拉取注册表的线程
        this.fetchDeltaRegistryWorker.start();
    }

    //销毁这个组件
    public void destroy() {
        this.fetchDeltaRegistryWorker.interrupt();
    }

    //全量拉取注册表的后台线程
    private class FetchFullRegistryWorker extends Thread {
        @Override
        public void run() {
            registry = httpSender.fetchServiceRegistry();
        }
    }

    //增量拉取注册表的后台线程
    private class FetchDeltaRegistryWorker extends Thread {
        @Override
        public void run() {
            while(registerClient.isRunning()) {  
                try {
                    ...
                    Thread.sleep(SERVICE_REGISTRY_FETCH_INTERVAL);  
                } catch (Exception e) {
                    e.printStackTrace();  
                }
            }
        }
    }
    ...
}

//负责发送各种http请求的组件
public class HttpSender {
    ...
    //全量拉取服务注册表
    public Map> fetchServiceRegistry() {
        Map> registry = new HashMap>();
        ServiceInstance serviceInstance = new ServiceInstance();
        serviceInstance.setHostname("finance-service-01");  
        serviceInstance.setIp("192.168.31.1207");  
        serviceInstance.setPort(9000);  
        serviceInstance.setServiceInstanceId("FINANCE-SERVICE-192.168.31.207:9000");  
        serviceInstance.setServiceName("FINANCE-SERVICE");
        Map serviceInstances = new HashMap();
        serviceInstances.put("FINANCE-SERVICE-192.168.31.207:9000", serviceInstance);
        registry.put("FINANCE-SERVICE", serviceInstances);
        System.out.println("拉取注册表:" + registry);
        return registry;
    }
    
    //增量拉取服务注册表
    public LinkedList fetchDeltaServiceRegistry() {
        LinkedList recentlyChangedQueue = new LinkedList();
        ServiceInstance serviceInstance = new ServiceInstance();
        serviceInstance.setHostname("order-service-01");  
        serviceInstance.setIp("192.168.31.288");  
        serviceInstance.setPort(9000);  
        serviceInstance.setServiceInstanceId("ORDER-SERVICE-192.168.31.288:9000");  
        serviceInstance.setServiceName("ORDER-SERVICE");
        RecentlyChangedServiceInstance recentlyChangedItem = new RecentlyChangedServiceInstance(serviceInstance, System.currentTimeMillis(), "register");
        recentlyChangedQueue.add(recentlyChangedItem);
        System.out.println("拉取增量注册表:" + recentlyChangedQueue);
        return recentlyChangedQueue;
    }
    ...
}

(5)客户端定时拉取增量注册表到本地

//服务注册中心的客户端缓存的一个服务注册表
public class CachedServiceRegistry {
    ...
    //增量拉取注册表的后台线程
    private class FetchDeltaRegistryWorker extends Thread {
        @Override
        public void run() {
            while(registerClient.isRunning()) {  
                try {
                    //拉取回来的是最近3分钟变化的服务实例
                    Thread.sleep(SERVICE_REGISTRY_FETCH_INTERVAL);
                    LinkedList deltaRegistry = httpSender.fetchDeltaServiceRegistry();
               
                    //增量信息有两类:一类是注册,一类是删除
                    //如果是注册信息,就判断一下这个服务实例是否在这个本地缓存的注册表中,如果不在就放到本地缓存注册表里
                    //如果是删除信息,就看服务实例是否存在,存在就删除
                    //这里会大量修改本地缓存的注册表,所以需要加锁
                    synchronized(registry) {
                        mergeDeltaRegistry(deltaRegistry);
                    }
                } catch (Exception e) {
                    e.printStackTrace();  
                }
            }
        }
      
        //合并增量注册表到本地缓存注册表
        private void mergeDeltaRegistry(LinkedList deltaRegistry) {
            for (RecentlyChangedServiceInstance recentlyChangedItem : deltaRegistry) {
                //如果是注册操作
                if (ServiceInstanceOperation.REGISTER.equals(recentlyChangedItem.serviceInstanceOperation)) {
                    Map serviceInstanceMap = registry.get(recentlyChangedItem.serviceInstance.getServiceName());
                    if (serviceInstanceMap == null) {
                        serviceInstanceMap = new HashMap();
                        registry.put(recentlyChangedItem.serviceInstance.getServiceName(), serviceInstanceMap);
                    }
                    ServiceInstance serviceInstance = serviceInstanceMap.get(recentlyChangedItem.serviceInstance.getServiceInstanceId());
                    if (serviceInstance == null) {
                        serviceInstanceMap.put(recentlyChangedItem.serviceInstance.getServiceInstanceId(), recentlyChangedItem.serviceInstance);
                    }
                }
                //如果是删除操作
                else if (ServiceInstanceOperation.REMOVE.equals(recentlyChangedItem.serviceInstanceOperation)) {
                    Map serviceInstanceMap = registry.get(recentlyChangedItem.serviceInstance.getServiceName());
                    if (serviceInstanceMap != null) {
                        serviceInstanceMap.remove(recentlyChangedItem.serviceInstance.getServiceInstanceId());
                    }
                }
            }
        }
    }
    ...
}

(6)客户端增量合并注册表后的校验与全量纠正

//增量拉取注册表的后台线程
private class FetchDeltaRegistryWorker extends Thread {
    @Override
    public void run() {
        while(registerClient.isRunning()) {  
            try {
                Thread.sleep(SERVICE_REGISTRY_FETCH_INTERVAL); 
            
                //拉取回来的是最近3分钟变化的服务实例
                DeltaRegistry deltaRegistry = httpSender.fetchDeltaRegistry();
            
                //一类是注册,一类是删除
                //如果是注册,就判断这个服务实例是否在本地缓存的注册表中
                //如果不在,就放到本地缓存注册表里
                //如果是删除,且服务实例还存在,那么就进行删除
                //这里会大量修改本地缓存的注册表,所以需要加锁
                synchronized(registry) {
                    mergeDeltaRegistry(deltaRegistry.getRecentlyChangedQueue()); 
                }
            
                //再检查一下,跟服务端的注册表的服务实例相比,数量是否是一致
                //封装增量注册表的对象,也就是拉取增量注册表时,
                //一方面要返回那个数据,另外一方面要那个对应的register-server端的服务实例的数量
                Long serverSideTotalCount = deltaRegistry.getServiceInstanceTotalCount();
            
                Long clientSideTotalCount = 0L;
                for (Map serviceInstanceMap : registry.values()) {
                    clientSideTotalCount += serviceInstanceMap.size();
                }
            
                if (serverSideTotalCount != clientSideTotalCount) {
                    //重新拉取全量注册表进行纠正
                    registry = httpSender.fetchFullRegistry();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    ...
}


19.AtomicReference优化客户端缓存注册表

多个线程同时对缓存的注册表信息进行修改时,必然存在并发冲突问题,此时可用AtomicReference的CAS操作来替代使用加重量级锁

//完整的服务实例的信息
public class Applications {
    private Map> registry = new HashMap>();
    
		public Applications() {
        
    }

    public Applications(Map> registry) {
        this.registry = registry;
    }

    public Map> getRegistry() {
        return registry;
    }

    public void setRegistry(Map> registry) {
        this.registry = registry;
    }
}


//服务注册中心的客户端缓存的一个服务注册表
public class CachedServiceRegistry {
    //服务注册表拉取间隔时间
    private static final Long SERVICE_REGISTRY_FETCH_INTERVAL = 30 * 1000L;

    //客户端缓存的所有的服务实例的信息
    private AtomicReference applications = new AtomicReference(new Applications());
    
		//负责定时拉取注册表到客户端进行缓存的后台线程
    private FetchDeltaRegistryWorker fetchDeltaRegistryWorker;

    //RegisterClient
    private RegisterClient registerClient;

    //http通信组件
    private HttpSender httpSender;
    
    //构造函数
    public CachedServiceRegistry(RegisterClient registerClient, HttpSender httpSender) {
        this.fetchDeltaRegistryWorker = new FetchDeltaRegistryWorker();
        this.registerClient = registerClient;
        this.httpSender = httpSender;
    }
    
    //初始化
    public void initialize() {
        //启动全量拉取注册表的线程
        FetchFullRegistryWorker fetchFullRegistryWorker = new FetchFullRegistryWorker();
        fetchFullRegistryWorker.start();  
        //启动增量拉取注册表的线程
        this.fetchDeltaRegistryWorker.start();
    }
    
    //销毁这个组件
    public void destroy() {
        this.fetchDeltaRegistryWorker.interrupt();
    }
    
    //全量拉取注册表的后台线程
    private class FetchFullRegistryWorker extends Thread {
        @Override
        public void run() {
            //拉取全量注册表
            Applications fetchedApplications = httpSender.fetchFullRegistry();
            while (true) {
                Applications expectedApplications = applications.get();
                if (applications.compareAndSet(expectedApplications, fetchedApplications)) {
                    break;
                }
            }
        }
    }
    
    //增量拉取注册表的后台线程
    private class FetchDeltaRegistryWorker extends Thread {
        @Override
        public void run() {
            while (registerClient.isRunning()) {
                try {
                    Thread.sleep(SERVICE_REGISTRY_FETCH_INTERVAL);
                    //拉取回来的是最近3分钟变化的服务实例
                    DeltaRegistry deltaRegistry = httpSender.fetchDeltaRegistry();

                    //一类是注册,一类是删除
                    //如果是注册,就判断这个服务实例是否在这个本地缓存的注册表中
                    //如果不在,就放到本地缓存注册表里
                    //如果是删除,且服务实例还存在,就进行删除
                    mergeDeltaRegistry(deltaRegistry);

                    //再检查一下,跟服务端的注册表的服务实例相比,数量是否一致
                    //封装增量注册表的对象,也就是拉取增量注册表时,
                    //一方面要返回那个数据,另外一方面要那个对应的register-server端的服务实例的数量
                    reconcileRegistry(deltaRegistry);  
                } catch (Exception e) {
                    e.printStackTrace();  
                }
            }
        }
      
        //合并增量注册表到本地缓存注册表里去
        private void mergeDeltaRegistry(DeltaRegistry deltaRegistry) {
            synchronized(applications) {
                Map> registry = applications.get().getRegistry();
                LinkedList recentlyChangedQueue = deltaRegistry.getRecentlyChangedQueue();
                for (RecentlyChangedServiceInstance recentlyChangedItem : recentlyChangedQueue) {
                    String serviceName = recentlyChangedItem.serviceInstance.getServiceName();
                    String serviceInstanceId = recentlyChangedItem.serviceInstance.getServiceInstanceId();
                    //如果是注册操作
                    if (ServiceInstanceOperation.REGISTER.equals(recentlyChangedItem.serviceInstanceOperation)) {
                        Map serviceInstanceMap = registry.get(serviceName);
                        if (serviceInstanceMap == null) {
                            serviceInstanceMap = new HashMap();
                            registry.put(serviceName, serviceInstanceMap);
                        }
                        ServiceInstance serviceInstance = serviceInstanceMap.get(serviceInstanceId);
                        if (serviceInstance == null) {
                            serviceInstanceMap.put(serviceInstanceId, recentlyChangedItem.serviceInstance);
                        }
                    }
                    //如果是删除操作
                    else if (ServiceInstanceOperation.REMOVE.equals(recentlyChangedItem.serviceInstanceOperation)) {
                        Map serviceInstanceMap = registry.get(serviceName);
                        if (serviceInstanceMap != null) {
                            serviceInstanceMap.remove(serviceInstanceId); 
                        }
                    }
                }
            }
        }
      
        //校对调整注册表
        private void reconcileRegistry(DeltaRegistry deltaRegistry) {
            Map> registry = applications.get().getRegistry();
            Long serverSideTotalCount = deltaRegistry.getServiceInstanceTotalCount();
            Long clientSideTotalCount = 0L;
            for (Map serviceInstanceMap : registry.values()) {
                clientSideTotalCount += serviceInstanceMap.size(); 
            }
         
            if (serverSideTotalCount != clientSideTotalCount) {
                //重新拉取全量注册表进行纠正
                Applications fetchedApplications = httpSender.fetchFullRegistry();
                while(true) {
                    Applications expectedApplications = applications.get();
                    if (applications.compareAndSet(expectedApplications, fetchedApplications)) {
                        break;
                    }
                } 
            }
        }
    }

    //服务实例操作
    class ServiceInstanceOperation {
        public static final String REGISTER = "register";//注册
        public static final String REMOVE = "REMOVE";//删除
    }
    
    //获取服务注册表
    public Map> getRegistry() {
        return applications.get().getRegistry();
    } 

    //最近变更的实例信息
    static class RecentlyChangedServiceInstance {
        //服务实例
        ServiceInstance serviceInstance;
      
        //发生变更的时间戳
        Long changedTimestamp;
      
        //变更操作
        String serviceInstanceOperation;
      
        public RecentlyChangedServiceInstance(ServiceInstance serviceInstance, Long changedTimestamp, String serviceInstanceOperation) {
            this.serviceInstance = serviceInstance;
            this.changedTimestamp = changedTimestamp;
            this.serviceInstanceOperation = serviceInstanceOperation;
        }
      
        @Override
        public String toString() {
            return"RecentlyChangedServiceInstance [serviceInstance=" + serviceInstance + ", changedTimestamp=" + changedTimestamp + ", serviceInstanceOperation=" + serviceInstanceOperation + "]";
        }
    }
}


20.AtomicStampedReference解决ABA问题

public class CachedServiceRegistry {
    ...
    //客户端缓存的所有的服务实例的信息
    private AtomicStampedReference applications;
    ...
    //构造函数
    public CachedServiceRegistry(RegisterClient registerClient, HttpSender httpSender) {
        this.fetchDeltaRegistryWorker = new FetchDeltaRegistryWorker();
        this.registerClient = registerClient;
        this.httpSender = httpSender;
        this.applications = new AtomicStampedReference(new Applications(), 0);  
    }
    ...
    //全量拉取注册表的后台线程
    private class FetchFullRegistryWorker extends Thread {
        @Override
        public void run() {
            //拉取全量注册表
            Applications fetchedApplications = httpSender.fetchFullRegistry();
            while(true) {
                Applications expectedApplications = applications.getReference();
                int expectedStamp = applications.getStamp();
                if (applications.compareAndSet(expectedApplications, fetchedApplications, expectedStamp, expectedStamp + 1)) {
                    break;
                }
            }
        }
    }
    ...
}


21.AtomicLong多线程拉取注册表版本不错乱

(1)发生版本错乱问题的情况

(2)AtomicLong解决注册表版本错乱问题


前面使用AtomicReference来解决多线程并发赋值时原子性问题,下面使用AtomicLong来解决多线程并发拉注册表时可能的版本混乱问题


(1)发生版本错乱问题的情况

public class CachedServiceRegistry {
    ...
    //全量拉取注册表的后台线程
    private class FetchFullRegistryWorker extends Thread {
        @Override
        public void run() {
            //拉取全量注册表操作需要通过网络完成,但是可能网络异常一直卡住,导致该请求的数据没有返回
            //卡了几分钟后,此时客户端已经缓存了很多服务实例,总服务实例已达40个
            //但该请求却可以返回了,而返回的数据却已经成为旧版本了,里面仅包含30个服务实例
            //该请求对应的全量拉取注册表线程被唤醒后,将30个服务实例的旧版本数据赋值给本地缓存注册表
            //于是便发生了版本错乱问题
           
            //所以在发起网络请求前,需要先拿到一个当时的版本号
            fetchFullRegistry();
        }
    }
    ...
}

(2)AtomicLong解决注册表版本错乱问题

public class CachedServiceRegistry {
    ...
    //代表当前本地缓存的服务注册表的一个版本号
    private AtomicLong applicationsVersion = new AtomicLong(0L);
    ...
    //拉取全量注册表到本地
    private void fetchFullRegistry() {
        Long expectedVersion = applicationsVersion.get(); // version = 0
        Applications fetchedApplications = httpSender.fetchFullRegistry();
        if (applicationsVersion.compareAndSet(expectedVersion, expectedVersion + 1)) {
            while (true) {
                Applications expectedApplications = applications.getReference();
                int expectedStamp = applications.getStamp();
                if (applications.compareAndSet(expectedApplications, fetchedApplications, expectedStamp, expectedStamp + 1)) {
                    break;
                }
            }
        }
    }

    //增量拉取注册表的后台线程  
    private class FetchDeltaRegistryWorker extends Thread {
        @Override
        public void run() {
            while(registerClient.isRunning()) {  
                try {
                    Thread.sleep(SERVICE_REGISTRY_FETCH_INTERVAL); 
                    Long expectedVersion = applicationsVersion.get();
                    if (applicationsVersion.compareAndSet(expectedVersion, expectedVersion + 1)) {
                        DeltaRegistry deltaRegistry = httpSender.fetchDeltaRegistry();
                        mergeDeltaRegistry(deltaRegistry); 
                        //和服务端的注册表的服务实例的数量相比是否一致
                        reconcileRegistry(deltaRegistry);  
                    }
                } catch (Exception e) {
                    e.printStackTrace();  
                }
            }
        }
        ...
    }
    ...
}
点击这里复制本文地址 以上内容由文彬编程网整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!
qrcode

文彬编程网 © All Rights Reserved.  蜀ICP备2024111239号-4