java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Semaphore原理

Java中的Semaphore原理解析

作者:我不是欧拉_

这篇文章主要介绍了Java中的Semaphore原理解析,Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源,需要的朋友可以参考下

1. Semaphore是什么?

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

Semaphore一般用于流量的控制,特别是公共资源有限的应用场景。例如数据库的连接,假设数据库的连接数上线为10个,多个线程并发操作数据库可以使用Semaphore来控制并发操作数据库的线程个数最多为10个。

2. 类图

通过类图可以看到,Semaphore与ReentrantLock的内部类的结构相同,类内部总共存在Sync、NonfairSync、FairSync三个类,NonfairSync与FairSync类继承自Sync类,Sync类继承自AbstractQueuedSynchronizer抽象类。 

3. 实现原理

3.1 使用示例

    // 定义一个资源池类
    class Pool {
        // 可用资源数100
        private static final int MAX_AVAILABLE = 100;
        // 定义信号量100
        private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
        // 获取资源
        public Object getItem() throws InterruptedException {
            // 尝试获取
            available.acquire();
            // 返回可用资源
            return getNextAvailableItem();
        }
        // 释放资源
        public void putItem(Object x) {
            // 如果资源标记为未被使用
            if (markAsUnused(x))
            // 释放资源
            available.release();
        }
        // Not a particularly efficient data structure; just for demo
        // 定义资源类型,可以是满足业务的任何类型
        protected Object[] items = new Object[MAX_AVAILABLE] ... whatever kinds of items being managed
        // 是否被使用标记
        protected boolean[] used = new boolean[MAX_AVAILABLE];
        // 获取下一个可用资源
        protected synchronized Object getNextAvailableItem() {
            // 循环遍历
            for (int i = 0; i < MAX_AVAILABLE; ++i) {
                // 如果未被使用
                if (!used[i]) {
                    // 使用标记设置为true
                    used[i] = true;
                    // 返回当前的资源
                    return items[i];
                }
            }
            return null; // not reached
        }
        // 标记资源为未被使用
        protected synchronized boolean markAsUnused(Object item) {
            // 循环遍历
            for (int i = 0; i < MAX_AVAILABLE; ++i) {
                // 找到需要释放的资源
                if (item == items[i]) {
                    // 如果是被使用中
                    if (used[i]) {
                    // 使用标记设置为false
                    used[i] = false;
                    // 返回true表示标记成功
                    return true;
                } else
                    // 返回false表示标记失败
                    return false;
                }
            }
            return false;
        }
    }

3.2 Sync

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        // 构造方法,调用父类AQS的setState方法,给共享变量state赋值
        // 即通过构造方法给锁的数量附初始值
        Sync(int permits) {
            setState(permits);
        }
        // 获取锁,也叫许可
        final int getPermits() {
            return getState();
        }
        // 共享模式下的非公平获取
        // 此方法也体现出与ReentrantLock中Sync的实现不同
        // ReentrantLock中Sync是独占模式下的获取
        // 具体实现的不同体现在int remaining = available - acquires;
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                // 获取锁的可用数量
                int available = getState();
                // 可用数量 - 请求的数量(acquires默认值为1) = 剩余量
                int remaining = available - acquires;
                // 如果remaining < 0即请求的锁大于可用的数量,马上返回负数,表示获取锁失败
                if (remaining < 0 ||
                    // 否则通过CAS的方式将可用数量换成剩余量,并返回剩余量
                    // 自旋 + CAS 保证线程安全,线程不用排队体现出非公平性
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
        // 共享模式下释放锁
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                // CAS修改锁数量,成功则返回,失败则继续自旋
                if (compareAndSetState(current, next))
                    return true;
            }
        }
        // 根据指定的缩减量减小可用锁的数目
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }
        // 获取并返回立即可用的所有锁
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

3.3 NonfairSync

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
        // 构造方法初始化锁数量
        NonfairSync(int permits) {
            super(permits);
        }
        // 直接调用nonfairTryAcquireShared方法,走非公平策略
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

3.4 FairSync

static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;
        // 构造方法初始化锁数量
        FairSync(int permits) {
            super(permits);
        }
        // 共享模式下的公平策略获取
        // 与非公平策略唯一的不同体现在线程是否需要排队
        // 即是否调用hasQueuedPredecessors()方法进行判断
        // 如果需要排队则立即返回继续排队
        // 否则通过CAS方式获取锁并返货锁的剩余量,结束自旋
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

通过分析代码发现,Semaphore与ReentrantLock的内部类的结构相同,具体实现的不同体现在 int remaining = available - acquires这行代码上。

ReentrantLock对于锁的控制是 int c = getState(); if (c == 0){....}。体现为一种独占的控制。

Semaphore对锁的控制是 for (;;) { int available = getState(); int remaining = available - acquires;......}。即所有线程都可以进入自旋,只要锁有剩余量都可以尝试获取锁,体现为一种共享的控制。

3.5 Semaphore

public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    /** All mechanics via AbstractQueuedSynchronizer subclass */
    // 同步队列
    private final Sync sync;
    // 构造方法初始话锁数量
    // 默认采用非公平策略
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    // 构造方法,带一个布尔参数,true表示采用公平策略,false表示采用非公平策略
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
}

3.5.1 acquire() 方法解析

// Semaphore
public void acquire() throws InterruptedException {
        // 调用sync的acquireSharedInterruptibly,即响应中断的获取
        // 因为sync继承AbstractQueuedSynchronizer
        // 即调用AQS的acquireSharedInterruptibly
        sync.acquireSharedInterruptibly(1);
    }
// 进入AQS
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // 如果线程被中断,则响应中断
        if (Thread.interrupted())
            throw new InterruptedException();
        // 否则调用tryAcquireShared,如果获取的锁小于0即获取锁失败则调用doAcquireSharedInterruptibly方法,进入同步队列排队
        // 如果获取锁成功则不排队,走业务逻辑
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
// Semaphore 中tryAcquireShared的实现
// 公平策略
protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
// 非公平策略
protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
// 前面已经解析过,不在赘述
 

3.5.2 release() 方法解析

// Semaphore
public void release() {
        sync.releaseShared(1);
    }
// 进入AQS
public final boolean releaseShared(int arg) {
        // 尝试释放锁, 如果释放锁成功
        if (tryReleaseShared(arg)) {
            // 线程出同步队列,返回true
            doReleaseShared();
            return true;
        }
        // 否则返回false
        return false;
    }
// Semaphore 中tryReleaseShared实现
protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

3.5.3 其他方法

方法说明调用
acquire(int permits)获取信号量,指定获取许可的个数,响应中断sync.acquireSharedInterruptibly(permits)
acquireUninterruptibly()获取信号量,默认获取1个许可,不响应中断sync.acquireShared(1)
acquireUninterruptibly(int permits)获取信号量,指定获取许可的个数,不响应中断sync.acquireShared(permits)
release(int permits)释放信号量,指定释放许可的个数sync.releaseShared(permits);
tryAcquire()尝试获取许可,如果获取成功返回true,否则返回false,不会阻塞线程,而且不响应中断sync.nonfairTryAcquireShared(1)
tryAcquire(int permits)同上,可以指定获取许可的个数sync.nonfairTryAcquireShared(permits)
tryAcquire(long timeout, TimeUnit unit)共享式超时获取sync.tryAcquireSharedNanos(1, unit.toNanos(timeout))
tryAcquire(int permits, long timeout, TimeUnit unit)同上,可以指定获取许可的个数sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout))
availablePermits()获取可用许可数sync.getPermits()
drainPermits()将剩下的信号量一次性消耗光,并且返回所消耗的信号量sync.drainPermits()
reducePermits(int reduction)减少信号量的总数,不会导致任何线程阻塞,调用该方法可能会导致信号量最终为负数sync.reducePermits(reduction)
isFair()是否采用公平策略
hasQueuedThreads()是否是已排队的线程
getQueueLength()获取排队线程的长度
getQueuedThreads()获取排队线程

4. 总结

Semaphore是一个有效的流量控制工具,它基于AQS共享锁实现。我们常常用它来控制对有限资源的访问。使用步骤

每次使用资源前,先申请一个信号量,如果资源数不够,就会阻塞等待;每次释放资源后,就释放一个信号量。

到此这篇关于Java中的Semaphore原理解析的文章就介绍到这了,更多相关Semaphore原理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文