- 浏览: 261006 次
- 性别:
- 来自: 杭州
博客专栏
-
Redis代码阅读
浏览量:22762
文章分类
最新评论
-
sp42:
可否处理下排版?
由一个UndeclaredThrowableException带来的思考 -
sp42:
可否处理下排版?
异常处理经验小结之二:利用Spring AOP将check exception 转化为unchecked exception -
sp42:
谢谢
异常处理经验小结之一:不要直接抛出InvocationTargetException -
unflynaomi:
我也是这个原因,大赞!!!!
虚拟机ubuntu为什么不能上网? -
1234abc:
谢谢分享,很好的实例,赞一个
一个“诡异”的NumberFormatException
ScheduledThreadPoolExecutor除了具有ThreadPoolExecutor的所有功能外,还可以延迟执行任务或者周期性的执行某个任务。scheduleWithFixedDelay和scheduleAtFixedRate就是用来完成这个功能的。平常使用scheduleAtFixedRate这个方法时并没有多想,但是这几天在实现一个功能的时候,需要考虑scheduleAtFixedRate所执行的task是否会影响任务的周期性,比如scheduleAtFixedRate(command,5,10,TimeUnit.SECONDS),那么这个command的执行会不会影响这个10秒的周期性。因此特意仔细看了下ScheduledThreadPoolExecutor的源代码,这里记录一下,以便以后查看。
scheduleAtFixedRate有两个时间参数,initialDelay和period,对应该方法的两个主要功能,即延迟运行任务和周期性执行任务。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Object>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period))); delayedExecute(t); return t; } /** * Specialized variant of ThreadPoolExecutor.execute for delayed tasks. */ private void delayedExecute(Runnable command) { if (isShutdown()) { reject(command); return; } // Prestart a thread if necessary. We cannot prestart it // running the task because the task (probably) shouldn't be // run yet, so thread will just idle until delay elapses. if (getPoolSize() < getCorePoolSize()) prestartCoreThread(); super.getQueue().add(command); }
首先创建一个ScheduledFutureTask,然后通过delayedExecute执行这个task。在delayedExecute中,首先预先启动一个线程,这里要注意的是这个这里用来启动一个新线程的firstTask参数是null,所以新启动的线程是idle状态的,然后把这个task加入到workQueue。ScheduledThreadPoolExecutor里的workQueue用的是DelayedWorkQueue,这个DelayedWorkQueue就是实现delay的关键。DelayedWorkQueue内部使用的是DelayQueue,DelayQueue实现task delay的关键就在于其Offer(E e)和Take.下面,通过分析这两个方法和结合ThreadPoolExecutor的运行原理来说明delay操作是如何实现的
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); q.offer(e); if (first == null || e.compareTo(first) < 0) available.signalAll(); return true; } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { available.await(); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay > 0) { long tl = available.awaitNanos(delay); } else { E x = q.poll(); assert x != null; if (q.size() != 0) available.signalAll(); // wake up other takers return x; } } } } finally { lock.unlock(); } }
ScheduledThreadPoolExecutor执行task是通过工作线程Work来承担的,Work的Run方法如下:
public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } }
因为前面在delayedExecute方法里面创建work线程的firstTask参数为null,所以就通过getTask去从workQueue里面获取task,getTask在正常情况下(即线程池没有关闭,线程数量没有超过corePoolSize等)是通过workQueue.take()从workQueue里获取任务。根据上面的贴出来的take方法的代码,如果queue是空的,则take方法会阻塞住,直到有新task被add进来。而在上面的delayedExecute方法的最后,会把创建的scheduledFutureTask加入到workQueue,这样take方法中的available.await()就被唤醒;在take方法里面,如果workQueue不为空,则执行task.getDelay()方法获取task的delay
public long getDelay(TimeUnit unit) { return unit.convert(time - now(), TimeUnit.NANOSECONDS); }
这里的time是通过两个方法把initialDelay变成一个triggerTime
/** * Returns the trigger time of a delayed action. */ private long triggerTime(long delay, TimeUnit unit) { return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } /** * Returns the trigger time of a delayed action. */ long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); }
注意看这个方法,这里返回的delay不是固定不变的,从task被放入workQueue起,不同的时间调用getDelay方法会得出不同的delay。如果放入workQueue的task的initialDelay是5秒,那么根据take方法的代码,如果在放入workQueue5秒后,就可以从delayQueue中拿到5秒前put进去的task,这样就实现了delay的功能。
在本文的最前面提到scheduleAtFixedRate能够周期性地执行一项任务,那么这个是如何实现的呢?在scheduleAtFixedRate方法里创建了一个ScheduledFutureTask,这个ScheduledFutureTask包装了command,最后周期性执行的是ScheduledFutureTask的run方法。
private void runPeriodic() { boolean ok = ScheduledFutureTask.super.runAndReset(); boolean down = isShutdown(); // Reschedule if not cancelled and not shutdown or policy allows if (ok && (!down || (getContinueExistingPeriodicTasksAfterShutdownPolicy() && !isStopped()))) { long p = period; if (p > 0) time += p; else time = triggerTime(-p); ScheduledThreadPoolExecutor.super.getQueue().add(this); } // This might have been the final executed delayed // task. Wake up threads to check. else if (down) interruptIdleWorkers(); } /** * Overrides FutureTask version so as to reset/requeue if periodic. */ public void run() { if (isPeriodic()) runPeriodic(); else ScheduledFutureTask.super.run(); }
由上面的代码可以看出,scheduleAtFixedRate(command,5,10,TimeUnit.SECONDS)这个方法的周期性会受command的影响,如果command方法的执行时间是10秒,那么执行command的周期其实是20秒,即scheduleAtFixedRate这个方法要等一个完整的command方法执行完成后才继续周期性地执行command方法,其实这样的设计也是符合常理的。
以上就是对ScheduledThreadPoolExecutor的一点小理解。
评论
futureTask的getDelay方法如下:
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}
而在runPeriodic里面执行了如下代码:
if (p > 0)
time += p;
else
time = triggerTime(-p);
而delayQueue的take方法是:
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
long tl = available.awaitNanos(delay);
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll(); // wake up other takers
return x;
}
所以如果period>方法执行时间,那么肯定按照period来循环
如果period<方法执行时间,那么上面的delay<0,同样也执行task
个人认为不是2者相+,而是取2者的max.
见代码:(运行代码见真相吧)
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestScheduledThreadPoolExecutor {
private static SimpleDateFormat format = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) {
// ScheduledExecutorService exec=Executors.newScheduledThreadPool(1);
ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(20);
/**
* 每隔一段时间打印系统时间,互不影响的<br/>
* 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;<br/>
* 也就是将在 initialDelay 后开始执行,然后在initialDelay+period 后执行,<br/>
* 接着在 initialDelay + 2 * period 后执行,依此类推。
*/
exec.scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println("scheduleWithFixedDelay:begin,"
+ format.format(new Date()));
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("scheduleWithFixedDelay:end,"
+ format.format(new Date()));
}
}, 1000, 1000, TimeUnit.MILLISECONDS);
// 开始执行后就触发异常,next周期将不会运行
// exec.scheduleAtFixedRate(new Runnable() {
// public void run() {
// System.out
// .println("RuntimeException no catch,next time can't run");
// throw new RuntimeException();
// }
// }, 1000, 5000, TimeUnit.MILLISECONDS);
// 虽然抛出了运行异常,当被拦截了,next周期继续运行
// exec.scheduleAtFixedRate(new Runnable() {
// public void run() {
// try {
// throw new RuntimeException();
// } catch (Exception e) {
// System.out.println("RuntimeException catched,can run next");
// }
// }
// }, 1000, 5000, TimeUnit.MILLISECONDS);
/**
* 创建并执行一个在给定初始延迟后首次启用的定期操作,<br/>
* 随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。
*/
// exec.scheduleWithFixedDelay(new Runnable() {
// public void run() {
// System.out.println("scheduleWithFixedDelay:begin,"
// + format.format(new Date()));
// try {
// Thread.sleep(7000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println("scheduleWithFixedDelay:end,"
// + format.format(new Date()));
// }
// }, 1000, 5000, TimeUnit.MILLISECONDS);
/**
* 创建并执行在给定延迟后启用的一次性操作。
*/
// exec.schedule(new Runnable() {
// public void run() {
// System.out.println("The thread can only run once!");
// }
// }, 5000, TimeUnit.MILLISECONDS);
}
}
个人认为不是2者相+,而是取2者的max.
发表评论
-
获取jvm中的所有实例
2012-12-05 11:53 0如何获得当前jvm内已经实例化的所有对象信息? ... -
工作总结
2012-11-23 20:03 01.识别图片的时候,有的cdn图片可能不是以任何图片结尾。如不 ... -
java线程池队列
2012-11-20 10:07 0建议使用有界队列,有界队列能增加系统的稳定性和预 ... -
ibatis支持enum类型
2012-09-11 17:15 0ibaitis支持enum,是通过EnumTypeHandle ... -
根据testCase来改进代码
2012-09-11 15:23 0Dao层到Service的参数转换,ContentClass的 ... -
由一个UndeclaredThrowableException带来的思考
2012-05-16 22:06 32046前段时间在调试项目 ... -
RPC
2012-04-17 21:58 0在rpc的时候,动态代理的使用可能会造成不一致,interfa ... -
把秒数转化为:小时:分:秒
2011-09-16 16:52 7967今天在工作中有这么一个小功能需要实现,在DB里面,经 ... -
如何处理网络异常的浅见
2011-09-16 08:58 1776虽然说网络连 ... -
HornetQ源代码阅读之消息发送和接收
2011-09-09 15:34 01.CrreatProducer() 2.Create me ... -
异常处理经验小结之二:利用Spring AOP将check exception 转化为unchecked exception
2011-08-14 13:00 7784对于Exception的处理,究竟是采用check exce ... -
如何保证服务的一致性
2011-08-09 10:01 0服务重试 -
Java拆箱装箱陷阱
2011-08-08 16:15 0Map<Integer, List<Happi ... -
高效安全发布对象
2011-08-08 10:17 0通过final高效发布集合对象。 在新的内存模型中,在 ... -
异常处理经验小结之一:不要直接抛出InvocationTargetException
2011-08-14 12:27 44278在最近一段时间的工作中,积累了几点异常处理的经验,怕时间久了 ... -
Java Concurrency In Practice之Java非阻塞算法解析
2011-05-17 13:28 2540基于锁得算法会带来一些活跃度失败的风险。如果线程 ... -
Non-Blocking stack和Block stack的插入时间对比
2011-05-17 13:25 2138本文通过对实现了两个Stack,一个是基于非阻塞(Non-B ... -
CyclicBarrier解析
2011-05-09 18:39 1652CyclicBarrier是一个同步辅助类,允许一组线程相互 ... -
如何尽可能地使10个线程同时开始工作
2011-05-06 22:34 2577在多线程编程中,我们很难控制线程的具体启动时间,调用线程的S ... -
Java Concurrency In Practice读书笔记之组合对象
2011-04-28 22:01 1597组合对象这一章主要 ...
相关推荐
主要为大家详细介绍了java中timer的schedule和scheduleAtFixedRate方法区别,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
Java定时任务schedule和scheduleAtFixedRate的异同
NULL 博文链接:https://michael1990.iteye.com/blog/2113141
调用scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法时,会向DelayQueue添加一个实现了RunnableScheduledFuture接口的ScheduledFutureTask
调度可以用 Timer 【调用 schedule() 或者 scheduleAtFixedRate() 方法实现】或者 ScheduledExecutorService 【结合工作中其它的需求,笔者选用此】 ScheduledExecutorService的初始化(线程池):
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() { public void run() { Integer number = NumberMachine.getInstance().getCommonManager().generateNewManager(); System.out....
定时删除文件,导入库类有Timer类,TimeTask类,运用的函数有:scheduleAtFixedRate()。
Java中的Timer和TimerTask简介...另一种执行任务的模式是scheduleAtFixedRate。在这种模式下,Timer类会尽量使任务保持在一个固定的频率下重复执行。 TimerTask一个抽象类,它的子类代表一个可以被Timer计划的任务。
timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { Message message=new Message(); message.what=1; handler.sendMessage(message); } }, 1000,5000); 2.在线程中创建Handle
串行执行器服务 ScheduledExecutorService 的测试实用程序实现 允许测试如下代码: ... service.scheduleAtFixedRate(new Runnable() { @Override public void run() { count++; bar.doWhatever(); } },
timer.scheduleAtFixedRate(new TimerTask() { int j=0; int i=0; @Override public void run() { if(over==true){ return; } handler.sendEmptyMessage(kPostInvalidate); i=i+...
scheduledExecutorService.scheduleAtFixedRate(new ScrollTask(), 1,2, TimeUnit.SECONDS); super.onStart(); } @Override protected void onStop() { //当Activity不可见的时候停止切换 ...