`

ScheduledThreadPoolExecutor的scheduleAtFixedRate方法探究

 
阅读更多

    ScheduledThreadPoolExecutor除了具有ThreadPoolExecutor的所有功能外,还可以延迟执行任务或者周期性的执行某个任务。scheduleWithFixedDelay和scheduleAtFixedRate就是用来完成这个功能的。平常使用scheduleAtFixedRate这个方法时并没有多想,但是这几天在实现一个功能的时候,需要考虑scheduleAtFixedRate所执行的task是否会影响任务的周期性,比如scheduleAtFixedRate(command,5,10,TimeUnit.SECONDS),那么这个command的执行会不会影响这个10秒的周期性。因此特意仔细看了下ScheduledThreadPoolExecutor的源代码,这里记录一下,以便以后查看。

    scheduleAtFixedRate有两个时间参数,initialDelay和period,对应该方法的两个主要功能,即延迟运行任务和周期性执行任务。

 

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Object>(command,
                                            null,
                                            triggerTime(initialDelay, unit),
                                            unit.toNanos(period)));
        delayedExecute(t);
        return t;
    }

    /**
     * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
     */
    private void delayedExecute(Runnable command) {
        if (isShutdown()) {
            reject(command);
            return;
        }
        // Prestart a thread if necessary. We cannot prestart it
        // running the task because the task (probably) shouldn't be
        // run yet, so thread will just idle until delay elapses.
        if (getPoolSize() < getCorePoolSize())
            prestartCoreThread();

        super.getQueue().add(command);
    }

    首先创建一个ScheduledFutureTask,然后通过delayedExecute执行这个task。在delayedExecute中,首先预先启动一个线程,这里要注意的是这个这里用来启动一个新线程的firstTask参数是null,所以新启动的线程是idle状态的,然后把这个task加入到workQueue。ScheduledThreadPoolExecutor里的workQueue用的是DelayedWorkQueue,这个DelayedWorkQueue就是实现delay的关键。DelayedWorkQueue内部使用的是DelayQueue,DelayQueue实现task delay的关键就在于其Offer(E e)和Take.下面,通过分析这两个方法和结合ThreadPoolExecutor的运行原理来说明delay操作是如何实现的

 

    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            q.offer(e);
            if (first == null || e.compareTo(first) < 0)
                available.signalAll();
            return true;
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                } else {
                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0) {
                        long tl = available.awaitNanos(delay);
                    } else {
                        E x = q.poll();
                        assert x != null;
                        if (q.size() != 0)
                            available.signalAll(); // wake up other takers
                        return x;

                    }
                }
            }
        } finally {
            lock.unlock();
        }
    }

      ScheduledThreadPoolExecutor执行task是通过工作线程Work来承担的,Work的Run方法如下:

 

        public void run() {
            try {
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
                }
            } finally {
                workerDone(this);
            }
        }

     因为前面在delayedExecute方法里面创建work线程的firstTask参数为null,所以就通过getTask去从workQueue里面获取task,getTask在正常情况下(即线程池没有关闭,线程数量没有超过corePoolSize等)是通过workQueue.take()从workQueue里获取任务。根据上面的贴出来的take方法的代码,如果queue是空的,则take方法会阻塞住,直到有新task被add进来。而在上面的delayedExecute方法的最后,会把创建的scheduledFutureTask加入到workQueue,这样take方法中的available.await()就被唤醒;在take方法里面,如果workQueue不为空,则执行task.getDelay()方法获取task的delay

        public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), TimeUnit.NANOSECONDS);
        }

   这里的time是通过两个方法把initialDelay变成一个triggerTime

    /**
     * Returns the trigger time of a delayed action.
     */
    private long triggerTime(long delay, TimeUnit unit) {
         return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }

    /**
     * Returns the trigger time of a delayed action.
     */
    long triggerTime(long delay) {
         return now() +
             ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }

注意看这个方法,这里返回的delay不是固定不变的,从task被放入workQueue起,不同的时间调用getDelay方法会得出不同的delay。如果放入workQueue的task的initialDelay是5秒,那么根据take方法的代码,如果在放入workQueue5秒后,就可以从delayQueue中拿到5秒前put进去的task,这样就实现了delay的功能。

 

   在本文的最前面提到scheduleAtFixedRate能够周期性地执行一项任务,那么这个是如何实现的呢?在scheduleAtFixedRate方法里创建了一个ScheduledFutureTask,这个ScheduledFutureTask包装了command,最后周期性执行的是ScheduledFutureTask的run方法。

        private void runPeriodic() {
            boolean ok = ScheduledFutureTask.super.runAndReset();
            boolean down = isShutdown();
            // Reschedule if not cancelled and not shutdown or policy allows
            if (ok && (!down ||
                       (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
                        !isStopped()))) {
                long p = period;
                if (p > 0)
                    time += p;
                else
                    time = triggerTime(-p);
                ScheduledThreadPoolExecutor.super.getQueue().add(this);
            }
            // This might have been the final executed delayed
            // task.  Wake up threads to check.
            else if (down)
                interruptIdleWorkers();
        }

        /**
         * Overrides FutureTask version so as to reset/requeue if periodic.
         */
        public void run() {
            if (isPeriodic())
                runPeriodic();
            else
                ScheduledFutureTask.super.run();
        }

     由上面的代码可以看出,scheduleAtFixedRate(command,5,10,TimeUnit.SECONDS)这个方法的周期性会受command的影响,如果command方法的执行时间是10秒,那么执行command的周期其实是20秒,即scheduleAtFixedRate这个方法要等一个完整的command方法执行完成后才继续周期性地执行command方法,其实这样的设计也是符合常理的。

 

     以上就是对ScheduledThreadPoolExecutor的一点小理解。

0
0
分享到:
评论
3 楼 olylakers 2013-03-05  
是的,的确是取两者max,之前没仔细看
futureTask的getDelay方法如下:
        public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), TimeUnit.NANOSECONDS);
        }
而在runPeriodic里面执行了如下代码:
                if (p > 0)
                    time += p;
                else
                    time = triggerTime(-p);

而delayQueue的take方法是:
                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0) {
                        long tl = available.awaitNanos(delay);
                    } else {
                        E x = q.poll();
                        assert x != null;
                        if (q.size() != 0)
                            available.signalAll(); // wake up other takers
                        return x;

                    }

所以如果period>方法执行时间,那么肯定按照period来循环
如果period<方法执行时间,那么上面的delay<0,同样也执行task

zjilvufe 写道
由上面的代码可以看出,scheduleAtFixedRate(command,5,10,TimeUnit.SECONDS)这个方法的周期性会受command的影响,如果command方法的执行时间是10秒,那么执行command的周期其实是20

个人认为不是2者相+,而是取2者的max.
2 楼 rabbit0708 2013-03-05  
我也觉得应该是2者中的大的值
见代码:(运行代码见真相吧)
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestScheduledThreadPoolExecutor {
private static SimpleDateFormat format = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");

public static void main(String[] args) {
// ScheduledExecutorService exec=Executors.newScheduledThreadPool(1);
ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(20);
/**
* 每隔一段时间打印系统时间,互不影响的<br/>
* 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;<br/>
* 也就是将在 initialDelay 后开始执行,然后在initialDelay+period 后执行,<br/>
* 接着在 initialDelay + 2 * period 后执行,依此类推。
*/
exec.scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println("scheduleWithFixedDelay:begin,"
+ format.format(new Date()));
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("scheduleWithFixedDelay:end,"
+ format.format(new Date()));
}
}, 1000, 1000, TimeUnit.MILLISECONDS);

// 开始执行后就触发异常,next周期将不会运行
// exec.scheduleAtFixedRate(new Runnable() {
// public void run() {
// System.out
// .println("RuntimeException no catch,next time can't run");
// throw new RuntimeException();
// }
// }, 1000, 5000, TimeUnit.MILLISECONDS);

// 虽然抛出了运行异常,当被拦截了,next周期继续运行
// exec.scheduleAtFixedRate(new Runnable() {
// public void run() {
// try {
// throw new RuntimeException();
// } catch (Exception e) {
// System.out.println("RuntimeException catched,can run next");
// }
// }
// }, 1000, 5000, TimeUnit.MILLISECONDS);

/**
* 创建并执行一个在给定初始延迟后首次启用的定期操作,<br/>
* 随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。
*/
// exec.scheduleWithFixedDelay(new Runnable() {
// public void run() {
// System.out.println("scheduleWithFixedDelay:begin,"
// + format.format(new Date()));
// try {
// Thread.sleep(7000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println("scheduleWithFixedDelay:end,"
// + format.format(new Date()));
// }
// }, 1000, 5000, TimeUnit.MILLISECONDS);

/**
* 创建并执行在给定延迟后启用的一次性操作。
*/
// exec.schedule(new Runnable() {
// public void run() {
// System.out.println("The thread can only run once!");
// }
// }, 5000, TimeUnit.MILLISECONDS);
}
}
1 楼 zjilvufe 2012-12-21  
由上面的代码可以看出,scheduleAtFixedRate(command,5,10,TimeUnit.SECONDS)这个方法的周期性会受command的影响,如果command方法的执行时间是10秒,那么执行command的周期其实是20

个人认为不是2者相+,而是取2者的max.

相关推荐

Global site tag (gtag.js) - Google Analytics