Java高并发编程之CAS实现无锁队列代码实例
一、什么是无锁(Lock-Free)队列
在多线程操作中,我们通常会添加锁来保证线程的安全,那么这样势必会影响程序的性能。
那么为了解决这一问题,于是就有了在无锁操作的情况下依然能够保证线程的安全,实现无锁的原理就是利用了Campare and swap(CAS)算法
而我们的无锁队列无疑也是使用了这一方法。
二、线程不安全的队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 | package com.brycen.concurrency03.collections.myqueue; public class UnThreadSafeQueue<E> { //定义Node节点 private static class Node<E> { private E element; //节点内存储的元素 private Node<E> next; //下一个节点 public Node(E element, Node<E> next) { super (); this .element = element; this .next = next; } public E getElement() { return element; } public void setElement(E element) { this .element = element; } public Node<E> getNext() { return next; } public void setNext(Node<E> next) { this .next = next; } @Override public String toString() { return (element == null ) ? "" : element.toString(); } } //定义队列的头和尾 private Node<E> head, last; //初始化队列长度为0 private int size = 0 ; public int size() { return size; } public boolean isEmpty() { return size == 0 ; } //返回第一个元素 public E peekFirst() { return head.element == null ? null : head.getElement(); } //返回最后一个元素 public E peekLast() { return last.element == null ? null : last.getElement(); } //在尾部添加元素 public void addLast(E element) { Node<E> newNode = new Node<E>(element, null ); //如果为0,则代表队列没有元素 if (size == 0 ) { head = newNode; } else { //队列有元素,则将最后一个元素的下一个值设置为新的元素 last.setNext(newNode); } //新元素赋值给last last = newNode; //队列长度+1 size++; } //移除并返回第一个元素 public E removeFirst() { //如果为null,直接返回null if (isEmpty()) return null ; //拿到第一个Node中的元素 E result = head.getElement(); //获取第一个Node中的下一个元素并赋值给head head = head.getNext(); //队列长度-1 size--; //判断队列是否为null,如果为null,需要将last置为null if (size== 0 ) last = null ; return result; } public static void main(String[] args) { UnThreadSafeQueue<String> queue = new UnThreadSafeQueue<String>(); queue.addLast( "Hello" ); queue.addLast( "World" ); queue.addLast( "Java" ); System.out.println(queue.removeFirst()); System.out.println(queue.removeFirst()); System.out.println(queue.removeFirst()); } } |
运行结果:
HelloWorldJava
三、线程安全的无锁队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 | package com.brycen.concurrency03.collections.myqueue; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; public class LockFreeQueue<E> { //定义头和尾的原子性节点 private AtomicReference<Node<E>> head, last; //定义原子性size private AtomicInteger size = new AtomicInteger( 0 ); //初始化队列,将头和尾都指向一个null的节点 public LockFreeQueue() { Node<E> node = new Node<E>( null ); head = new AtomicReference<Node<E>>(node); last = new AtomicReference<Node<E>>(node); } //定义节点类 private static class Node<E> { E element; //需要volatile,因为防止在next赋值的时候发生重排序,并且需要对其他线程可见 volatile Node<E> next; public Node(E element) { this .element = element; } @Override public String toString() { return element == null ? null : element.toString(); } } //添加元素到队尾 public void addLast(E element) { //元素不允许为null if (element == null ) throw new NullPointerException( "The null element not allow" ); //新建一个新节点 Node<E> newNode = new Node<E>(element); //getAndSet操作为原子性操作,先获取last的节点再将新的节点赋值给last Node<E> oldNode = last.getAndSet(newNode); //将旧节点的next指向新的节点 oldNode.next = newNode; //队列长度+1 size.incrementAndGet(); } //移除并返回队首元素 public E removeFirst() { //因为队首节点是存在的,但是他可能没有下一个节点,所以需要一个valueNode来判断 Node<E> headNode, valueNode; do { //获取到队首节点 headNode = head.get(); //判断下一个节点是否为null valueNode = headNode.next; //当valueNode不为null,并且headNode不等于队列的head节点时,代表该元素被别的线程拿走的,需要重新获取。 //当headNode等于队列的head时则代表头元素没有被其他元素拿走,并将head节点替换为valueNode。 } while (valueNode != null && !head.compareAndSet(headNode, valueNode)); E result = valueNode != null ? valueNode.element : null ; //valueNode的元素被拿走了,所有将其置为null if (valueNode != null ) { valueNode.element = null ; } //队列长度-1 size.decrementAndGet(); return result; } public static void main(String[] args) throws InterruptedException { //创建线程池 ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool( 10 ); //实例化队列 LockFreeQueue<String> queue = new LockFreeQueue<String>(); //该map用于检查该队列是否是线程安全的,利用其key不能重复来判断 ConcurrentHashMap<String, Object> map = new ConcurrentHashMap<String, Object>(); //随机数 Random random = new Random(System.currentTimeMillis()); //创建5个写runnable IntStream.range( 0 , 5 ).boxed().map(i -> (Runnable) () -> { int count = 0 ; //每个runnable往队列插入10个元素 while (count++< 10 ) { //这里值用系统的纳秒+随机数+count,以防止重复影响map集合对队列线程安全的判断 queue.addLast(System.nanoTime()+ ":" +random.nextInt( 10000 )+ ":" +count); } //提交任务 }).forEach(threadPool::submit); //创建5个读runnable IntStream.range( 0 , 5 ).boxed().map(i -> (Runnable) () -> { int count = 10 ; //每个runnable读10个元素 while (count--> 0 ) { //休眠 try { TimeUnit.MILLISECONDS.sleep( 20 ); } catch (InterruptedException e) { e.printStackTrace(); } //移除队列中的队首元素 String result = queue.removeFirst(); //输出 System.out.println(result); //将该元素加入map中,来判断队列中真实存入的元素个数 map.put(result, new Object()); } //提交任务 }).forEach(threadPool::submit); //关闭线程池 threadPool.shutdown(); //等待1小时候强制关闭线程池 threadPool.awaitTermination( 1 , TimeUnit.HOURS); //打印map中的元素个数 System.out.println(map.size()); } } |
运行结果:
21135673377124:2114:1
21135673377124:1841:1
21135673535368:7640:2
21135673535316:7247:2
21135673430720:1589:1
21135673535143:670:2
21135673549201:8948:3
21135673549364:4671:3
21135673560864:9436:4
21135673551532:5637:3
21135673560412:6560:4
21135673570638:5363:5
21135673577820:9344:5
21135673570345:1147:5
21135673562713:1104:4
21135673580083:7526:6
21135673592905:8578:7
21135673589852:4333:7
21135673587482:4044:6
21135673585072:4774:6
21135673596794:8990:7
21135673600935:1491:8
21135673605719:7387:8
21135673602798:5391:8
21135673610435:7771:9
21135673610435:6732:9
21135673614788:9523:9
21135673623594:3529:10
21135673620198:3206:10
21135673620049:7079:10
21135673698937:3917:2
21135673722794:9326:5
21135673715108:8062:4
21135673683921:1847:1
21135673707190:7836:3
21135673730671:4207:6
21135673737982:9430:7
21135673756931:3648:9
21135673745386:6520:8
21135673764785:3733:10
21135673859035:6858:2
21135673840248:8995:1
21135673880691:7612:4
21135673871709:1741:3
21135673889204:9351:5
21135673897341:5110:6
21135673913246:9156:8
21135673918400:2077:9
21135673926590:1221:10
21135673905604:4850:7
50
到此这篇关于Java高并发编程之CAS实现无锁队列代码实例的文章就介绍到这了,更多相关Java的CAS实现无锁队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
微信公众号搜索 “ 脚本之家 ” ,选择关注
程序猿的那些事、送书等活动等着你
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若内容造成侵权/违法违规/事实不符,请将相关资料发送至 reterry123@163.com 进行投诉反馈,一经查实,立即处理!
相关文章
java实现多线程的两种方式继承Thread类和实现Runnable接口的方法
下面小编就为大家带来一篇java实现多线程的两种方式继承Thread类和实现Runnable接口的方法。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧2016-09-09springboot在filter中如何用threadlocal存放用户身份信息
这篇文章主要介绍了springboot中在filter中如何用threadlocal存放用户身份信息,本文章主要描述通过springboot的filter类,在过滤器中设置jwt信息进行身份信息保存的方法,需要的朋友可以参考下2024-07-07通过Java Reflection实现编译时注解正确处理方法
Java注解是一种标记在JDK5及以后的版本中引入,用于Java语言中向程序添加元数据的方法,这篇文章主要介绍了通过Java Reflection实现编译时注解处理方法,需要的朋友可以参考下2023-06-06
最新评论