总结:Java届很难得有读百十行代码就能增加修炼的读懂代码多线机会,这里有一个。百行 通常,读懂代码多线我在看书的百行时候一般不写代码,因为我的读懂代码多线脑袋被设定成单线程的,一旦同时喂给它不同的百行信息,它就无法处理。读懂代码多线 但多线程对电脑来说就是百行小菜一碟,它可以同时做很多事,读懂代码多线看起来匪夷所思。百行好希望把自己的读懂代码多线大脑皮层移植到这些牛x的设备上。 用人脑思考电脑正在思考的百行问题,这本身就是读懂代码多线一种折磨。但平常的百行工作和面试中,又不得不面对这样的读懂代码多线场景,所以多线程就成了编程路上一块难啃的骨头。 HikariCP是SpringBoot默认的数据库连接池,它毫不谦虚的的网站模板起了一个叫做光的名字,这让国产Druid很没面子。 还是言归正传,看一下Hikari中的ConcurrentBag吧。 多线程代码一个让人比较头疼的问题,就是每个API我都懂,但就是不会用。很多对concurrent包倒背如流的同学,在面对现实的问题时,到最后依然不得不被迫加上Lock或者synchronized。 ConcurrentBag是一个Lock free的数据结构,主要用作数据库连接的存储,可以说整个HikariCP的核心就是它。删掉乱七八糟的注释和异常处理,可以说关键的代码也就百十来行,但里面的道道却非常的多。站群服务器 ConcurrentBag速度很快,要达到这个目标,就需要一定的核心数据结构支持。 private final CopyOnWriteArrayListsharedList; private final ThreadLocal private final AtomicInteger waiters; ConcurrentBag里面的元素,为了能够无锁化操作,需要使用一些变量来标识现在处于的状态。抽象的云服务器接口如下: public interface IConcurrentBagEntry{ int STATE_NOT_IN_USE = 0; int STATE_IN_USE = 1; int STATE_REMOVED = -1; int STATE_RESERVED = -2; boolean compareAndSet(int expectState, int newState); void setState(int newState); int getState(); 有了这些数据结构的支持,我们的ConcurrentBag就可以实现它光的宣称了。 连接的获取是borrow方法,还可以传入一个timeout作为超时控制。 首先,如果某个线程执行非常快,使用了比较多的连接,就可以使用ThreadLocal的方式快速获取连接对象,而不用跑到大池子里面去获取。代码如下。 // Try the thread-local list first final var list = threadList.get(); for (int i = list.size() - 1; i >= 0; i--) { final var entry = list.remove(i); final T bagEntry = weakThreadLocals ? ((WeakReference) entry).get() : (T) entry; if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { return bagEntry; } 我们都知道,包括ArrayList和HashMap一些基础的结构,都是Fail Fast的,如果你在遍历的时候,删掉一些数据,有可能会引起问题。幸运的是,由于我们的List是从ThreadLocal获取的,它首先就避免了线程安全的问题。 接下来就是遍历。这段代码采用的是尾遍历(头遍历会出现错误),用于快速的从列表中找到一个可以复用的对象,然后使用CAS来把状态置为使用中。但如果对象正在被使用,则直接删除它。 在ConcurrentBag里,每个ThreadLocal最多缓存50个连接对象引用。 当ThreadLocal里找不到可复用的对象,它就会到大池子里去拿。也就是下面这段代码。 // Otherwise, scan the shared list ... then poll the handoff queue final int waiting = waiters.incrementAndGet(); try { for (T bagEntry : sharedList) { if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { // If we may have stolen another waiters connection, request another bag add. if (waiting > 1) { listener.addBagItem(waiting - 1); } return bagEntry; } } listener.addBagItem(waiting); // 还拿不到,就需要等待别人释放了 timeout = timeUnit.toNanos(timeout); do { final var start = currentTime(); final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS); if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { return bagEntry; } timeout -= elapsedNanos(start); } while (timeout > 10_000); return null; } finally { waiters.decrementAndGet(); 首先要注意,这段代码可能是由不同的线程执行的,所以必须要考虑线程安全问题。由于shardList是线程安全的CopyOnWriteArrayList,适合读多写少的场景,我们可以直接进行遍历。 这段代码的目的是一样的,需要从sharedList找到一个空闲的连接对象。这里把自增的waiting变量传递到外面的代码进行处理,主要是由于想要根据waiting的大小来确定是否创建新的对象。 如果无法从池子里获取连接,则需要等待别的线程释放一些资源。 创建对象的过程是异步的,要想获取它,还需要依赖一段循环代码。while循环代码是纳秒精度,会尝试从handoffQueue里获取。最终会调用SynchronousQueue的transfer方法。 有借就有还,当某个连接使用完毕,它将被归还到池子中。 public void requite(final T bagEntry) { bagEntry.setState(STATE_NOT_IN_USE); for (var i = 0; waiters.get() > 0; i++) { if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) { return; } else if ((i & 0xff) == 0xff) { parkNanos(MICROSECONDS.toNanos(10)); } else { Thread.yield(); } } final var threadLocalList = threadList.get(); if (threadLocalList.size() < 50) { threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry); } 首先,把这个对象置为可用状态。然后,代码会进入一个循环,等待使用方把这个连接接手过去。当连接处于STATE_NOT_IN_USE状态,或者队列中的数据被取走了,那么就可以直接返回了。 由于waiters.get()是实时获取的,有可能长时间一直大于0,这样代码就会变成死循环,浪费CPU。代码会尝试不同层次的睡眠,一个是每隔255个waiter睡10ns,一个是使用yield让出cpu时间片。 如果归还连接的时候并没有被其他线程获取到,那么最后我们会把归还的连接放入到相对应的ThreadLocal里,因为对一个连接来说,借和还,通常是一个线程。 看起来平平无奇的几行代码,为什么搞懂了就能Hold住大部分的并发编程场景呢?主要还是这里面的知识点太多。下面我简单罗列一下,你可以逐个攻破。 麻雀虽小,五脏俱全。如果你想要你的多线程编程能力更上一层楼,读一读这个短小精悍的ConcurrentBag吧。当你掌握了它,多线程的那些东西,不过是小菜一碟。 作者简介:小姐姐味道 (xjjdog),一个不允许程序员走弯路的公众号。聚焦基础架构和Linux。十年架构,日百亿流量,与你探讨高并发世界,给你不一样的味道。