java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java的CAS实现无锁队列

Java高并发编程之CAS实现无锁队列代码实例

作者:Brycen Liu

这篇文章主要介绍了Java高并发编程之CAS实现无锁队列代码实例,在多线程操作中,我们通常会添加锁来保证线程的安全,那么这样势必会影响程序的性能,那么为了解决这一问题,于是就有了在无锁操作的情况下依然能够保证线程的安全,需要的朋友可以参考下

一、什么是无锁(Lock-Free)队列

在多线程操作中,我们通常会添加锁来保证线程的安全,那么这样势必会影响程序的性能。

那么为了解决这一问题,于是就有了在无锁操作的情况下依然能够保证线程的安全,实现无锁的原理就是利用了Campare and swap(CAS)算法

而我们的无锁队列无疑也是使用了这一方法。

二、线程不安全的队列

package com.brycen.concurrency03.collections.myqueue;
public class UnThreadSafeQueue<E> {
	//定义Node节点
	private static class Node<E> {
		private E element;//节点内存储的元素
		private Node<E> next;//下一个节点
		public Node(E element, Node<E> next) {
			super();
			this.element = element;
			this.next = next;
		}
		public E getElement() {
			return element;
		}
		public void setElement(E element) {
			this.element = element;
		}
		public Node<E> getNext() {
			return next;
		}
		public void setNext(Node<E> next) {
			this.next = next;
		}
		@Override
		public String toString() {
			return (element == null) ? "" : element.toString();
		}
	}
	//定义队列的头和尾
	private Node<E> head, last;
	//初始化队列长度为0
	private int size = 0;
	public int size() {
		return size;
	}
	public boolean isEmpty() {
		return size == 0;
	}
	//返回第一个元素
	public E peekFirst() {
		return head.element == null ? null : head.getElement();
	}
	//返回最后一个元素
	public E peekLast() {
		return last.element == null ? null : last.getElement();
	}
	//在尾部添加元素
	public void addLast(E element) {
		Node<E> newNode = new Node<E>(element, null);
		//如果为0,则代表队列没有元素
		if (size == 0) {
			head = newNode;
		}else {
			//队列有元素,则将最后一个元素的下一个值设置为新的元素
			last.setNext(newNode);
		}
		//新元素赋值给last
		last = newNode;
		//队列长度+1
		size++;
	}
	//移除并返回第一个元素
	public E removeFirst() {
		//如果为null,直接返回null
		if (isEmpty())
			return null;
		//拿到第一个Node中的元素
		E result = head.getElement();
		//获取第一个Node中的下一个元素并赋值给head
		head = head.getNext();
		//队列长度-1
		size--;
		//判断队列是否为null,如果为null,需要将last置为null
		if (size==0)
			last = null;
		return result;
	}
	public static void main(String[] args) {
		UnThreadSafeQueue<String> queue = new UnThreadSafeQueue<String>();
		queue.addLast("Hello");
		queue.addLast("World");
		queue.addLast("Java");
		System.out.println(queue.removeFirst());
		System.out.println(queue.removeFirst());
		System.out.println(queue.removeFirst());
	}
}

运行结果:

HelloWorldJava

三、线程安全的无锁队列

package com.brycen.concurrency03.collections.myqueue;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
public class LockFreeQueue<E> {
	//定义头和尾的原子性节点
	private AtomicReference<Node<E>> head, last;
	//定义原子性size
	private AtomicInteger size = new AtomicInteger(0);
	//初始化队列,将头和尾都指向一个null的节点
	public LockFreeQueue() {
		Node<E> node = new Node<E>(null);
		head = new AtomicReference<Node<E>>(node);
		last = new AtomicReference<Node<E>>(node);
	}
	//定义节点类
	private static class Node<E> {
		E element;
		//需要volatile,因为防止在next赋值的时候发生重排序,并且需要对其他线程可见
		volatile Node<E> next;
		public Node(E element) {
			this.element = element;
		}
		@Override
		public String toString() {
			return element == null ? null : element.toString();
		}
	}
	//添加元素到队尾
	public void addLast(E element) {
		//元素不允许为null
		if (element == null)
			throw new NullPointerException("The null element not allow");
		//新建一个新节点
		Node<E> newNode = new Node<E>(element);
		//getAndSet操作为原子性操作,先获取last的节点再将新的节点赋值给last
		Node<E> oldNode = last.getAndSet(newNode);
		//将旧节点的next指向新的节点
		oldNode.next = newNode;
		//队列长度+1
		size.incrementAndGet();
	}
	//移除并返回队首元素
	public E removeFirst() {
		//因为队首节点是存在的,但是他可能没有下一个节点,所以需要一个valueNode来判断
		Node<E> headNode, valueNode;
		do {
			//获取到队首节点
			headNode = head.get();
			//判断下一个节点是否为null
			valueNode = headNode.next;
		//当valueNode不为null,并且headNode不等于队列的head节点时,代表该元素被别的线程拿走的,需要重新获取。
		//当headNode等于队列的head时则代表头元素没有被其他元素拿走,并将head节点替换为valueNode。
		} while (valueNode != null && !head.compareAndSet(headNode, valueNode));
		E result = valueNode != null ? valueNode.element : null;
		//valueNode的元素被拿走了,所有将其置为null
		if (valueNode != null) {
			valueNode.element = null;
		}
		//队列长度-1
		size.decrementAndGet();
		return result;
	}
	public static void main(String[] args) throws InterruptedException {
		//创建线程池
		ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
		//实例化队列
		LockFreeQueue<String> queue = new LockFreeQueue<String>();
		//该map用于检查该队列是否是线程安全的,利用其key不能重复来判断
		ConcurrentHashMap<String, Object> map = new ConcurrentHashMap<String, Object>();
		//随机数
		Random random = new Random(System.currentTimeMillis());
		//创建5个写runnable
		IntStream.range(0, 5).boxed().map(i -> (Runnable) () -> {
			int count = 0;
			//每个runnable往队列插入10个元素
			while (count++<10) {
				//这里值用系统的纳秒+随机数+count,以防止重复影响map集合对队列线程安全的判断
				queue.addLast(System.nanoTime()+":"+random.nextInt(10000)+":"+count);
			}
			//提交任务
		}).forEach(threadPool::submit);
		//创建5个读runnable
		IntStream.range(0, 5).boxed().map(i -> (Runnable) () -> {
			int count = 10;
			//每个runnable读10个元素
			while (count-->0) {
				//休眠
				try {
					TimeUnit.MILLISECONDS.sleep(20);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				//移除队列中的队首元素
				String result = queue.removeFirst();
				//输出
				System.out.println(result);
				//将该元素加入map中,来判断队列中真实存入的元素个数
				map.put(result, new Object());
			}
			//提交任务
		}).forEach(threadPool::submit);
		//关闭线程池
		threadPool.shutdown();
		//等待1小时候强制关闭线程池
		threadPool.awaitTermination(1, TimeUnit.HOURS);
		//打印map中的元素个数
		System.out.println(map.size());
	}
}

运行结果:

21135673377124:2114:1
21135673377124:1841:1
21135673535368:7640:2
21135673535316:7247:2
21135673430720:1589:1
21135673535143:670:2
21135673549201:8948:3
21135673549364:4671:3
21135673560864:9436:4
21135673551532:5637:3
21135673560412:6560:4
21135673570638:5363:5
21135673577820:9344:5
21135673570345:1147:5
21135673562713:1104:4
21135673580083:7526:6
21135673592905:8578:7
21135673589852:4333:7
21135673587482:4044:6
21135673585072:4774:6
21135673596794:8990:7
21135673600935:1491:8
21135673605719:7387:8
21135673602798:5391:8
21135673610435:7771:9
21135673610435:6732:9
21135673614788:9523:9
21135673623594:3529:10
21135673620198:3206:10
21135673620049:7079:10
21135673698937:3917:2
21135673722794:9326:5
21135673715108:8062:4
21135673683921:1847:1
21135673707190:7836:3
21135673730671:4207:6
21135673737982:9430:7
21135673756931:3648:9
21135673745386:6520:8
21135673764785:3733:10
21135673859035:6858:2
21135673840248:8995:1
21135673880691:7612:4
21135673871709:1741:3
21135673889204:9351:5
21135673897341:5110:6
21135673913246:9156:8
21135673918400:2077:9
21135673926590:1221:10
21135673905604:4850:7
50

到此这篇关于Java高并发编程之CAS实现无锁队列代码实例的文章就介绍到这了,更多相关Java的CAS实现无锁队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文