java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > 信号量semaphore

Java的信号量semaphore讲解

作者:正经人z.

这篇文章主要介绍了Java的信号量semaphore讲解,Semaphore底层是基于AbstractQueuedSynchronizer来实现的,Semaphore称为计数信号量,它允许n个任务同时访问某个资源,需要的朋友可以参考下

一、简介

1、Semaphore底层是基于AbstractQueuedSynchronizer来实现的。Semaphore称为计数信号量,它允许n个任务同时访问某个资源,可以将信号量看做是在向外分发使用资源的许可证,只有成功获取许可证,才能使用资源。

2、它也是通过内部类Sync继承了AQS,它的方法几乎都是用的AQS的,少部分重写了AQS的方法,同时,它的信号量机制是通过AQS的state属性来操作的,变更state属性使用到了CAS操作保证变量的数据一致性。

3、它的方法几乎是通过Sync类,也就是AQS实现的。

4、该类如果单独使用,不涉及到条件队列condition,使用到条件队列需要调用await方法、notify方法,notifyAll方法等一系列线程通信的方法,但是,semaphore类中不涉及到这些方法。

5、注意其中一个方法:release,这个方法是释放信号量到semaphore中,默认是释放一个。源码中作者这么说的:不要求一个线程在调用release方法释放许可前必须通过acquire方法获取到许可。正确的使用semaphore建立在应用程序里面,对许可证的使用,由程序员掌握!semaphore只是提供了一个信号量的机制供其使用。

6、核心方法都在Sync类中,外部方法都是调用的AQS和Sync中重写的AQS的方法

7、注意其中一个方法:acquire,这个方法是获取信号量从semaphore中,默认是获取一个。源码中作者这么说的:如果没有可用的许可证,则当前线程变为出于线程调度目的而禁用,并处于休眠状态,直到发生两件事之一:

a、一些其他的线程调用了release方法释放了许可,同时semaphore中的许可满足当前线程,且当前线程是下一个被分配许可证的线程

b、一些其他的线程打断了当前线程且当前线程是通过acquire方式获取许可的

二、源码分析

三个内部类

Sync

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
    //构造方法设置许可数
        Sync(int permits) {
            setState(permits);
        }
    //获取许可
        final int getPermits() {
            return getState();
        }
    //共享模式下非公平策略获取许可数
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {   //无限循环
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))  //通过CAS修改许可数
                    return remaining;
            }
        }
    //共享模式下的公平策略释放许可
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))   //通过CAS修改许可数
                    return true;
            }
        }
    //减去许可数
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }
// 获取并返回立即可用的所有许可
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

Sync类继承了AQS类,重写了nonfairTryAcquireShared、tryReleaseShared方法

NonfairSync和FairSync

//非公平策略	
static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
        NonfairSync(int permits) {
            super(permits);
        }
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);  //调用的父类Sync的方法
        }
    }
//公平策略
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;
        FairSync(int permits) {
            super(permits);
        }
        //公平策略下子类重写了方法
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

构造方法

public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

常用方法

//默认获取一个许可
public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
//默认释放一个许可
public void release() {sync.releaseShared(1); }
//获取指定的许可数    
public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }
//尝试获取指定的许可数    
public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }
//释放指定的许可数    
public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

没啥好说的,都是套娃子模式,真正的核心方法在Sync类中以及AQS类

示例

package com.hust.grid.leesf.semaphore;
import java.util.concurrent.Semaphore;
class MyThread extends Thread {
    private Semaphore semaphore;
    public MyThread(String name, Semaphore semaphore) {
        super(name);
        this.semaphore = semaphore;
    }
    public void run() {        
        int count = 3;
        System.out.println(Thread.currentThread().getName() + " trying to acquire");
        try {
            semaphore.acquire(count);
            System.out.println(Thread.currentThread().getName() + " acquire successfully");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release(count);
            System.out.println(Thread.currentThread().getName() + " release successfully");
        }
    }
}
public class SemaphoreDemo {
    public final static int SEM_SIZE = 10;
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(SEM_SIZE);
        MyThread t1 = new MyThread("t1", semaphore);
        MyThread t2 = new MyThread("t2", semaphore);
        t1.start();
        t2.start();
        int permits = 5;
        System.out.println(Thread.currentThread().getName() + " trying to acquire");
        try {
            semaphore.acquire(permits);
            System.out.println(Thread.currentThread().getName() + " acquire successfully");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
            System.out.println(Thread.currentThread().getName() + " release successfully");
        }
    }
}

说明:首先,生成一个信号量,信号量有10个许可,然后,main,t1,t2三个线程获取许可运行

首先,main线程执行acquire操作,并且成功获得许可,之后t1线程执行acquire操作,成功获得许可,之后t2执行acquire操作,由于此时许可数量不够,t2线程将会阻塞,直到许可可用。之后t1线程释放许可,main线程释放许可,此时的许可数量可以满足t2线程的要求,所以,此时t2线程会成功获得许可运行,t2运行完成后释放许可。

三、总结

1、semaphore类的核心是Sync内部类,它继承了AQS类,适当重写了一些方法,其他的方法都调用的这个Sync中的方法,包括Sync类的两个子类:FairSync和NonFairSync。

2、semaphore类中实现了公平锁FairSync和非公平锁NonFairSync。默认使用的是非公平锁。

3、对于公平锁和非公平锁,主要体现在获取锁上面是否公平。非公平锁不会判断当前线程是否为同步队列中的第一个节点,而是直接操作state属性;公平锁会判断当前线程是否为同步队列中的第一个节点,具体方法是hasQueuedPredecessors,如果不是,则返回-1,如果是,则执行下面步骤。

4、semaphore的信号量机制使用的是AQS类的state属性,默认每次获取或释放信号量都是1,除非你指定要使用的信号量或释放的信号量数。

5、对state属性的添加和释放都必须保证是原子性的,所以,semaphore类中使用的是unsafe类的compareAndSetState方法配合for(; ;)无限循环,语义为CAS自旋,来保证对state属性的操作是原子性的。

到此这篇关于Java的信号量semaphore讲解的文章就介绍到这了,更多相关信号量semaphore内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文