Java的非阻塞队列ConcurrentLinkedQueue解读
作者:Java都不学
前言
在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。
使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。非阻塞的实现方式则可以使用循环 CAS 的方式来实现。
ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法(即 CAS 算法) 来实现,该算法在 Michael&Scott 算法上进行了一些修改。
ConcurrentLinkedQueue 的结构
ConcurrentLinkedQueue 由 head 节点和 tail 节点组成,每个节点(Node)由节点元素 (item)和指向下一个节点(next)的引用组成,节点与节点之间就是通过这个 next 关联起来,从而组成一张链表结构的队列。默认情况下 head 节点存储的元素为空,tail 节 点等于 head 节点。
入队列
1) 入队列的过程
每添加一个节点就做了一个队列的快照图
- 添加元素 1。队列更新 head 节点的 next 节点为元素 1 节点。又因为 tail 节点默认 情况下等于 head 节点,所以它们的 next 节点都指向元素 1 节点。
- 添加元素 2。队列首先设置元素 1 节点的 next 节点为元素 2 节点,然后更新 tail 节点指向元素 2 节点。
- 添加元素 3,设置 tail 节点的 next 节点为元素 3 节点。
- 添加元素 4,设置元素 3 的 next 节点为元素 4 节点,然后将 tail 节点指向元素 4 节点。
入队主要做两件事情:
第一是将入队节点设置成当前队列尾节点的下一个节点;
第二是更新 tail 节点,如果 tail 节点的 next 节点不为空,则将入队节点设置成 tail 节点,如果 tail 节点的 next 节点为 空,则将入队节点设置成 tail 的 next 节点,所以 tail 节点不总是尾节点
/*在此队列的尾部插入指定元素。由于队列是无界的,这个方法永远不会返回false 。 返回值: true (由Queue.offer指定) 抛出: NullPointerException – 如果指定元素为 null*/ public boolean offer(E e) { if (e == null) throw new NullPointerException(); // 入队前,创建一个入队节点 Node<E> n = new Node<E>(e); retry: // 死循环,入队不成功反复入队。 for (; ; ) { // 创建一个指向 tail 节点的引用 Node<E> t = tail; // p 用来表示队列的尾节点,默认情况下等于 tail 节点。 Node<E> p = t; for (int hops = 0; ; hops++) { // 获得 p 节点的下一个节点。 Node<E> next = succ(p); // next 节点不为空,说明 p 不是尾节点,需要更新 p 后在将它指向 next 节点 if (next != null) { // 循环了两次及其以上,并且当前节点还是不等于尾节点 if (hops > HOPS && t != tail) continue retry; p = next; } // 如果 p 是尾节点,则设置 p 节点的 next 节点为入队节点。 else if (p.casNext(null, n)) { /*如果 tail 节点有大于等于 1 个 next 节点,则将入队节点设置成 tail 节点, 更新失败了也没关系,因为失败了表示有其他线程成功更新了 tail 节点*/ if (hops >= HOPS) casTail(t, n); // 更新 tail 节点,允许失败 return true; } // p 有 next 节点,表示 p 的 next 节点是尾节点,则重新设置 p 节点 else { p = succ(p); } } } }
整个入队过程主要做两件事情:第一是定位出尾节点;第二是 使用 CAS 算法将入队节点设置成尾节点的 next 节点,如不成功则重试。
2) 定位尾节点
tail 节点并不总是尾节点,所以每次入队都必须先通过 tail 节点来找到尾节点。尾节点可能是 tail 节点,也可能是 tail 节点的 next 节点。代码中循环体中的第一个 if 就是判 断 tail 是否有 next 节点,有则表示 next 节点可能是尾节点。获取 tail 节点的 next 节点需要注意的是 p 节点等于 p 的 next 节点的情况,只有一种可能就是 p 节点和 p 的 next 节点 都等于空,表示这个队列刚初始化,正准备添加节点,所以需要返回 head 节点。
final Node<E> succ(Node<E> p) { Node<E> next = p.getNext(); return (p == next) head:next; }
3) 设置入队节点为尾节点
p.casNext(null,n)方法用于将入队节点设置为当前队列尾节点的 next 节点,如果 p.next 是 null,表示 p 是当前队列的尾节点,如果不为 null,表示有其他线程更新了尾节点, 则需要重新获取当前队列的尾节点。
4) HOPS 的设计意图
/*让 tail 节点永远作为队列的尾节点,每次都需要使用循环 CAS 更新 tail 节点。如果能减少 CAS 更新 tail 节点的次数,就能提高入队的效率*/ public boolean offer(E e) { if (e == null) throw new NullPointerException(); Node<E> n = new Node<E>(e); for (; ; ) { Node<E> t = tail; if (t.casNext(null, n) && casTail(t, n)) { return true; } } }
使用 hops 变量来控制并减少 tail 节点的更新频率,并不是每次节点入队后都将 tail 节点更新成尾节点,而是当 tail 节 点和尾节点的距离大于等于常量 HOPS 的值(默认等于 1)时才更新 tail 节点,tail 和尾节点的距离越长,使用 CAS 更新 tail 节点的次数就会越少,但是距离越长带来的负面效果就是每次入队时定位尾节点的时间就越长,因为循环体需要多循环一次来定位出尾节 点,但是这样仍然能提高入队的效率,因为从本质上来看它通过增加对 volatile 变量的读 操作来减少对 volatile 变量的写操作,而对 volatile 变量的写操作开销要远远大于读操作,所以入队效率会有所提升。 private static final int HOPS = 1;
出队列
出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用。
并不是每次出队时都更新 head 节点,当 head 节点里有元素时,直接弹出 head 节点里的元素,而不会更新 head 节点。只有当 head 节点里没有元素时,出队操作才会更新 head 节点。这种做法也是通过 hops 变量来减少使用 CAS 更新 head 节点的 消耗,从而提高出队效率。
public E poll() { Node<E> h = head; // p 表示头节点,需要出队的节点 Node<E> p = h; for (int hops = 0; ; hops++) { // 获取 p 节点的元素 E item = p.getItem(); // 如果 p 节点的元素不为空,使用 CAS 设置 p 节点引用的元素为 null, // 如果成功则返回 p 节点的元素。 if (item != null && p.casItem(item, null)) { if (hops >= HOPS) { // 将 p 节点下一个节点设置成 head 节点 Node<E> q = p.getNext(); updateHead(h, (q != null)q :p); } return item; } // 如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外 // 一个线程修改了。那么获取 p 节点的下一个节点 Node<E> next = succ(p); // 如果 p 的下一个节点也为空,说明这个队列已经空了 if (next == null) { // 更新头节点。 updateHead(h, p); break; } // 如果下一个元素不为空,则将头节点的下一个节点设置成头节点 p = next; } return null; }
首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用 CAS 的方式将头节点的引用设置成 null,如果 CAS 成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了 head 节点,导致元素发生了变化,需要 重新获取头节点。
到此这篇关于Java的非阻塞队列ConcurrentLinkedQueue解读的文章就介绍到这了,更多相关非阻塞队列ConcurrentLinkedQueue内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!