博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java 线程 — ScheduledThreadPoolExecutor
阅读量:6914 次
发布时间:2019-06-27

本文共 5673 字,大约阅读时间需要 18 分钟。

ScheduledThreadPoolExecutor

该类继承自ThreadPoolExecutor,增加了定时执行线程和延迟启动的功能,这两个功能是通过延时队列DelayedWorkQueue辅助实现的。

线程池里面的线程需要从队列里面获取任务,任务根据延时时长是有顺序的,线程池的线一直获取延时最短的任务,也就是最小二叉堆中的堆顶元素,这个时候堆顶元素成为各个线程争夺的资源,

  1. 在获取堆顶元素的时候加锁(ReentrabtLock,可重入,独占锁),这样获取到锁的线程开始获取堆顶元素,其他线程在不能获取锁被阻塞
  2. 如果堆顶元素的延时还没有到,当前线程成为leader线程,进入超时等待
    1.1. 这个时候其他被阻塞的线程有机会获取锁
    1.2. 获取锁的线程发现leader线程已经另有其人(leader != null)
    1.3. 线程进入等待,available.await();
  3. 如果线程等待正常结束(时间已到),让出leander地位,再次进入循环,发现delayed <= 0,获取对顶元素,并重新堆化筛选出堆顶元素,调用available.signal()唤醒等待的线程(比如1.1的情况),释放锁
    3.1 假设是1.1里面的线程被唤醒(实际不一定,唤醒的也可能是其他线程)
    3.2 重复1、2、3、4的流程
  4. 线程获取到任务开始运行,运行ScheduledFutureTask.run方法,如果是定时任务的话,会重新计算延时时间,将任务加入队列,等待下次运行

DelayedWorkQueue

这个队列是一个阻塞的队列,队列基于二叉堆实现的,根据线程距离下次运行的时间比较大小,所以添加和删除元素都是二叉堆的重新堆化

offer

put、add都是调用下面的offer方法

public boolean offer(Runnable x) {    if (x == null)        throw new NullPointerException();    RunnableScheduledFuture e = (RunnableScheduledFuture)x;    final ReentrantLock lock = this.lock;    // 获取锁    lock.lock();    try {        int i = size;        if (i >= queue.length)            // 如果队列已满进行扩容            grow();        size = i + 1;        if (i == 0) {            // 第一个元素直接入队            queue[0] = e;            setIndex(e, 0);        } else {            // 加入新元素重新堆化            siftUp(i, e);        }        if (queue[0] == e) {            // 如果原来队列为空,说明可能有线程在等待,所以唤醒一个线程            leader = null;            available.signal();        }    } finally {        lock.unlock();    }    return true;}// 队列扩容,每次增加50%,直到Integer的最大值private void grow() {    int oldCapacity = queue.length;    int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%    if (newCapacity < 0) // overflow        newCapacity = Integer.MAX_VALUE;    queue = Arrays.copyOf(queue, newCapacity);}// 新加入元素之后重新堆化,最小堆private void siftUp(int k, RunnableScheduledFuture key) {    while (k > 0) {        // 二叉堆的特性父节点的序号 = (当前节点序号 - 1) / 2        int parent = (k - 1) >>> 1;        RunnableScheduledFuture e = queue[parent];        // 找到新加入元素合适的位置        if (key.compareTo(e) >= 0)            break;        queue[k] = e;        setIndex(e, k);        k = parent;    }    // 新元素入队列    queue[k] = key;    setIndex(key, k);}

take

take的时候使用的是leader-follow模式,只有一个leader,其他都是follow,在每次finishPoll的时候都会选举出新的

public RunnableScheduledFuture take() throws InterruptedException {    final ReentrantLock lock = this.lock;    lock.lockInterruptibly();    try {        for (;;) {            RunnableScheduledFuture first = queue[0];            if (first == null)                // 队列为空则进入等待                available.await();            else {                long delay = first.getDelay(TimeUnit.NANOSECONDS);                if (delay <= 0)                    // 延迟或者定时时间(其实定时间也是一种延迟)到,从队列中取出任务执行                    return finishPoll(first);                // 如果leader != null 说明leader是另外的线程(有可能是leader线程在available.awaitNanos(delay))是leader,那么当前线程进入等待                else if (leader != null)                    available.await();                else {                    // 没有leader线程的时候,当前线程成为新的leader                    Thread thisThread = Thread.currentThread();                    leader = thisThread;                    try {                        // 这里进行超时等待,超过delay之后就会恢复运行,或者是被其他线程唤醒                        available.awaitNanos(delay);                    } finally {                        if (leader == thisThread)                            // 重置leader以便进入下一次循环                            leader = null;                    }                }            }        }    } finally {        // 队列不为空的时候发出signal,leader == null的条件是防止leader线程在available.awaitNanos(delay)的时候被唤醒        if (leader == null && queue[0] != null)            available.signal();        lock.unlock();    }}// 返回第一个等待的线程(延时已到),并将剩余元素再次堆化private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {    int s = --size;    RunnableScheduledFuture x = queue[s];    queue[s] = null;    if (s != 0)        siftDown(0, x);    setIndex(f, -1);    return f;}// 因为key是原来堆中的元素位于堆得最底层,key本来就是较大的元素,private void siftDown(int k, RunnableScheduledFuture key) {    int half = size >>> 1;    while (k < half) {        int child = (k << 1) + 1;        RunnableScheduledFuture c = queue[child];        int right = child + 1;        if (right < size && c.compareTo(queue[right]) > 0)            c = queue[child = right];        if (key.compareTo(c) <= 0)            // 找到key的位置,大于父节点,小于子节点            break;        queue[k] = c;        setIndex(c, k);        k = child;    }    queue[k] = key;    setIndex(key, k);}

问题

period线程怎么实现定时调用

setNextRunTime会重新计算下次运行需要等待的时间,因为period线程运行完后已经从队列中删除,在reExecutePeriodic方法中会重新进入队列,调用ensurePrestart重新开始执行任务

public void run() {    boolean periodic = isPeriodic();    if (!canRunInCurrentRunState(periodic))        cancel(false);    else if (!periodic)        // 非定时线程调用FutureTask的run方法        ScheduledFutureTask.super.run();    else if (ScheduledFutureTask.super.runAndReset()) { // 定时线程调用FutureTask的runAndReset方法        // 设置下次运行时间        setNextRunTime();        // 重新准备运行        reExecutePeriodic(outerTask);    }}void reExecutePeriodic(RunnableScheduledFuture
task) { if (canRunInCurrentRunState(true)) { // task进入队列 super.getQueue().add(task); if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); }}void ensurePrestart() { int wc = workerCountOf(ctl.get()); // 进入线程池等待运行,接下来就和ThreadPoolExecutor运行顺序一样了 if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false);}

线程运行完成之后任务会不会从队列中删除,怎么删除的?

会删除,在finnishPoll中,重新堆化选出堆顶元素,原来的堆顶元素被覆盖,也就是删除了

转载于:https://www.cnblogs.com/sunshine-2015/p/6091438.html

你可能感兴趣的文章
Top 10 JavaScript编辑器,你在用哪个?
查看>>
数据访问层的优化思路
查看>>
饭后最该知道N件事
查看>>
一文教你看懂大数据的技术生态圈 Hadoop,hive,spark
查看>>
关于本地分区索引和索引组织表保证唯一性的限制
查看>>
MaxCompute Studio 2.8.1 新版本发布啦!
查看>>
《区块链原理、设计与应用》一2.5 认识上的误区
查看>>
当所有编程语言都在靠齐的时候
查看>>
苹果拥抱IBM背后:大数据推动手机行业洗牌
查看>>
无线广播可以毁灭物联网安全:信号干扰器及犯罪
查看>>
《并行计算的编程模型》一3.7.1 选择集合参与者
查看>>
百分点:利用大数据做智慧商业
查看>>
让你的软件永生的7个规则
查看>>
《中国人工智能学会通讯》——12.23 隐私保护
查看>>
物联网五大应用实例,一看便明了!!
查看>>
中国人工智能学会通讯——众包中的统计推断与激励机制 4 我们能否有更好的付钱方式...
查看>>
GitLab宣布支持Git大文件存储Git LFS
查看>>
联想王震宇:联想企业网盘3.5版本的功能亮点
查看>>
监控蓄电池 保障数据中心正常运营
查看>>
浅析自动化设备安装运维的发展方向
查看>>