java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Semaphore详细解析

Java并发编程之Semaphore详解

作者:西瓜游侠

这篇文章主要介绍了Java并发编程之Semaphore详解,Semaphore信号量可以用来控制同时访问特定资源的线程数量,常用于限流场景,Semaphore接收一个int整型值,表示 许可证数量,需要的朋友可以参考下

1 概念

Semaphore(信号量,发音:三马佛儿),可以用来控制同时访问特定资源的线程数量,常用于限流场景。

Semaphore接收一个int整型值,表示 许可证数量。

线程通过调用acquire()获取许可证,执行完成之后通过调用release()归还许可证。只有获取到许可证的线程才能运行,获取不到许可证的线程将会阻塞。

Semaphore支持公平锁和非公平锁。

2 方法

Semaphore提供了一些方法,如下:

方法说明
acquire()获取一个许可证,在获取到许可证、或者被其他线程调用中断之前线程一直处于阻塞状态。
acquire(int permits)一次性获取多个许可证,在获取到多个许可证、或者被其他线程调用中断、或超时之前线程一直处于阻塞状态。
acquireUninterruptibly()获取一个许可证,在获取到许可证之前线程一直处于阻塞状态(忽略中断)。
tryAcquire()尝试获取许可证,返回获取许可证成功或失败,不阻塞线程。
tryAcquire(long timeout, TimeUnit unit)尝试获取许可证,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程。
release()释放一个许可证,唤醒等待获取许可证的阻塞线程。
release(int permits)一次性释放多个许可证。
drainPermits()清空许可证,把可用许可证数置为0,返回清空许可证的数量。

3 例子

public class SemaphoreTest {

    public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(5);
        System.out.println("初始总许可数 5");
        WorkerThread workerThread1 = new WorkerThread("worker-thread-1", semaphore);
        WorkerThread workerThread2 = new WorkerThread("worker-thread-2", semaphore);
        workerThread1.start();
        Thread.sleep(20);
        workerThread2.start();
    }
}

/**
 * 工作线程
 */
class WorkerThread extends Thread {

    private String name;
    private Semaphore semaphore;

    public WorkerThread(String name, Semaphore semaphore) {
        this. name = name;
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        try {
            System.out.println(this.name + " 尝试获取许可.");
            // 获取许可证
            semaphore.acquire();
            System.out.println(this.name + " 获取许可成功,当前许可还剩 " + semaphore.availablePermits());
            Thread.sleep(3000);
            System.out.println(this.name + " 尝试释放许可.");
            // 释放许可证
            semaphore.release();
            System.out.println(this.name + " 释放许可成功,当前许可还剩 " + semaphore.availablePermits());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行结果:

初始总许可数 5
worker-thread-1 尝试获取许可.
worker-thread-2 尝试获取许可.
worker-thread-1 获取许可成功,当前许可还剩 4
worker-thread-2 获取许可成功,当前许可还剩 3
worker-thread-1 尝试释放许可.
worker-thread-1 释放许可成功,当前许可还剩 4
worker-thread-2 尝试释放许可.
worker-thread-2 释放许可成功,当前许可还剩 5
Process finished with exit code 0

4 源码解析

4.1 构造函数

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

Semaphore有两个构造函数:

4.2 Sync、FairSync、NonfairSync

公平锁FairSync和非公平锁NonfairSync都继承了抽象类Sync。

Sync源码:

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        // 设置同步状态的值为初始化的许可证数量
        Sync(int permits) {
            setState(permits);
        }

        // 获取同步状态的值,也就是剩余可使用的许可证的数量
        final int getPermits() {
            return getState();
        }

        // 非公平锁尝试获取许可证
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        // 尝试释放许可证
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        // 尝试减少许可证的数量
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        // 将许可证数量清0
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

Sync继承AQS抽象类,构造函数调用的是AQS的setState(int newState)方法,将同步状态变量state的值设置为指定的初始化许可证的数量。

公平锁源码:

    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        // 公平锁获取许可证
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

非公平锁源码:

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        // 非公平锁获取许可证
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

从源码中看出,公平锁和非公平锁在获取许可证的时候,逻辑是不一样的。

4.3 acquire获取许可证

调用Semaphore的acquire()函数可以获取许可证,源码如下:

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

实际上,调用的是Sync对象的acquireSharedInterruptibly(int arg)方法,而Sync继承了AQS,并且没有重写这个方法,因此调用的是AQS的acquireSharedInterruptibly(int arg)方法,源码如下:

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // 如果中断,抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        // 尝试获取同步变量
        if (tryAcquireShared(arg) < 0)
            // 如果获取失败,则将当前线程加入同步队列中去排队
            doAcquireSharedInterruptibly(arg);
    }

可以看出,调用tryAcquireShared(arg)来尝试获取同步变量,在这里也就是获取许可证。这个方法在AQS和Sync中都没有实现,但是被FairSync和NonfairSync分别实现了。 如果获取同步变量失败,则将当前线程放入同步队列中排队。

4.3.1 公平锁获取许可证

如果是FairSync公平锁,则实现如下:

    protected int tryAcquireShared(int acquires) {
        // 自旋
        for (;;) {
            // 判断有没有等待获取同步状态的线程,有则直接返回-1
            if (hasQueuedPredecessors())
                return -1;
            // 没有线程在等待获取同步状态,那么当前线程去获取同步状态
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                // 通过CAS更新同步状态的值
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

公平锁获取许可证的原理大致如下:

  1. 首先查看有没有线程在同步队列中排队等待获取许可证,如果有排队的,那么直接返回-1,这样将会执行doAcquireSharedInterruptibly(arg),将当前线程加入同步队列中去排队;
  2. 如果没有线程在排队,那么当前线程获取同步状态的值available,减掉想要获取的资源值acquires,也就是想要获取的许可证的数量,得到剩余的资源量remaining。如果remaining < 0,说明资源不够,本次获取失败,返回remaining值(这个时候返回的是< 0的值),外层代码会调用doAcquireSharedInterruptibly(arg)将当前线程排队;如果remaining > 0,说明资源是够用的,那么直接通过CAS原理更新同步状态的值。

4.3.2 非公平锁获取许可证

如果是NonfairSync非公平锁,则实现如下:

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }

调用的是父类Sync的nonfairTryAcquireShared(int acquires)方法:

    final int nonfairTryAcquireShared(int acquires) {
        // 自旋
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

非公平锁相对来说去掉了查看同步队列的逻辑。也就是说,在非公平锁的实现中,当前线程获取许可证的时候,不用去查看同步队列是否有线程在等待获取同步状态,而是直接去尝试获取许可证(改变同步状态的值)。

当然,如果remaining < 0,说明当前线程没能获取到期望数量的许可证,获取失败,返回< 0的值,在外部逻辑中,将会调用doAcquireSharedInterruptibly(arg)使当前线程进入同步队列中进行等待;如果remaining > 0,则通过CAS原理更新同步状态的值。

4.4 release释放许可证

通过调用Semaphore的release()方法可以释放许可证。

    public void release() {
        sync.releaseShared(1);
    }

实际上,调用的是Sync的releaseShared(int arg),而Sync并没有重写这个方法,因此调用的是AQS的releaseShared(int arg)方法:

    public final boolean releaseShared(int arg) {
        // 尝试释放同步变量
        if (tryReleaseShared(arg)) {
            // 如果成功,则唤醒后继节点
            doReleaseShared();
            return true;
        }
        return false;
    }

通过tryReleaseShared(arg)尝试释放同步变量,如果成功,则通过doReleaseShared()唤醒后继节点。

AQS并没有实现tryReleaseShared(arg)方法,而是被Semaphore的Sync实现了:

    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }

这里通过CAS改变同步状态的值,释放了许可证。

下面来看看doReleaseShared()是如何唤醒后继节点的:

    private void doReleaseShared() {
        for (;;) {
            // 首先获取头节点
            Node h = head;
            // 如果头节点存在
            if (h != null && h != tail) {
                // 获取头节点的状态
                int ws = h.waitStatus;
                // 如果头节点的状态是Node.SIGNAL,说明头节点的后继节点正等待被唤醒
                if (ws == Node.SIGNAL) {
                    // 将头节点的状态设置为初始状态
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 唤醒后继节点
                    unparkSuccessor(h);
                }
                // 如果头节点的状态已经是0了,则设置头节点状态为Node.PROPAGATE
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

尝试唤醒后继节点的逻辑比较简单:

  1. 首先获取同步队列中的头节点;
  2. 如果头节点存在,并且不是尾节点,接着获取头节点的状态;
  3. 如果头节点的状态是Node.SIGNAL,说明他的后继节点正等待着被他唤醒。这个时候通过CAS原理将头节点的状态置为0,如果成功了,则通过调用unparkSuccessor(h)唤醒后继节点,最后实际上调用的是LockSupport.unpark(Thread thread)方法唤醒线程的。

到此这篇关于Java并发编程之Semaphore详解的文章就介绍到这了,更多相关Semaphore详细解析内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文