在ThreadPoolExecutor的构造方法里有一个这样的参数:RejectedExecutionHandler handler。通过查看JDK源码我们可以知道这是一个接口,而且JDK内置实现了4种拒绝策略(AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy),它们都是ThreadPoolExecutor的public static内部类。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Demo of {@link ThreadPoolExecutor.DiscardOldestPolicy}: ten tasks are
 * submitted to a pool that has a single worker thread and a bounded queue
 * with room for one pending task.
 */
public class ExecutorServiceDemo {

    /** Number of tasks the demo submits. */
    private static final int TASK_COUNT = 10;

    /**
     * Prints a message prefixed with the current time in epoch milliseconds.
     *
     * @param msg text to print after the {@code " -> "} separator
     */
    static void log(String msg) {
        StringBuilder line = new StringBuilder();
        line.append(System.currentTimeMillis()).append(" -> ").append(msg);
        System.out.println(line.toString());
    }

    /** Task that logs when it starts, sleeps for one second, then logs when it ends. */
    private static final class NumberedTask implements Runnable {
        private final int index;

        NumberedTask(int index) {
            this.index = index;
        }

        public void run() {
            log("run task:" + index + " -> " + Thread.currentThread().getName());
            try {
                Thread.sleep(1000L);
            } catch (Exception e) {
                e.printStackTrace();
            }
            log("run over:" + index + " -> " + Thread.currentThread().getName());
        }
    }

    /**
     * Submits the tasks, waits four seconds, shuts the pool down and reports
     * {@code isTerminated()} right after {@code shutdown()} and again after
     * waiting up to one more second for termination.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while sleeping or waiting
     */
    public static void main(String[] args) throws Exception {
        // Passing the handler to the constructor is equivalent to setting it afterwards.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        int submitted = 0;
        while (submitted < TASK_COUNT) {
            pool.submit(new NumberedTask(submitted));
            submitted++;
        }

        log("before sleep");
        Thread.sleep(4000L);

        log("before shutdown()");
        pool.shutdown();
        log("after shutdown(),pool.isTerminated=" + pool.isTerminated());

        pool.awaitTermination(1, TimeUnit.SECONDS);
        log("now,pool.isTerminated=" + pool.isTerminated());
    }
}




若以上策略仍然无法满足实际应用的需要,我们还可以自己实现RejectedExecutionHandler接口:
public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
在ThreadPoolExecutor的execute方法的代码里可以看到:当由于超出其界限而没有更多可用的线程或队列槽时,或者Executor已经关闭时,就可能发生任务被拒绝的情况,此时会调用reject方法:
 final void reject(Runnable command) {
        handler.rejectedExecution(command, this);


public Thread newThread(Runnable r);
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Custom thread creation for a pool via {@link ThreadFactory}, the object
 * that creates new threads on demand through {@code Thread newThread(Runnable r)}.
 *
 * @author Administrator
 */
public class MyThreadFactory {

    /** Number of worker threads and of submitted tasks. */
    private static final int WORKERS = 5;

    /** Prints a timestamp plus the id of the executing thread, then sleeps 100 ms. */
    public static class MyTask implements Runnable {
        @Override
        public void run() {
            long threadId = Thread.currentThread().getId();
            System.out.println(System.currentTimeMillis() + " Thread id:" + threadId);
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /** Creates daemon threads with a fixed name and announces each one on stdout. */
    private static final class DaemonThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            Thread worker = new Thread(r, "后台线程");
            worker.setDaemon(true);
            System.out.println("create " + worker.getName() + " " + worker.getId());
            return worker;
        }
    }

    /**
     * Submits the same task instance five times to a five-thread pool backed
     * by a {@link SynchronousQueue}, then keeps the main thread alive for two
     * seconds; the workers are daemon threads, so they do not keep the JVM
     * running once main returns.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = new ThreadPoolExecutor(
                WORKERS, WORKERS, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                new DaemonThreadFactory());

        MyTask task = new MyTask();
        int remaining = WORKERS;
        while (remaining-- > 0) {
            exec.submit(task);
        }

        Thread.sleep(2000L);
    }
}


// Extension hooks declared by ThreadPoolExecutor; all three bodies are empty
// here, so a subclass overrides them to add its own behavior.
//   beforeExecute(t, r) - receives a thread and the Runnable; presumably
//                         invoked right before the task runs (confirm
//                         against the runWorker source).
//   afterExecute(r, t)  - receives the Runnable and a Throwable; presumably
//                         invoked after the task finishes, with the failure
//                         if there was one (confirm against runWorker).
//   terminated()        - takes no arguments; presumably invoked once the
//                         pool has terminated (confirm against the JDK docs).
protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { } protected void terminated() { }
同时,ThreadPoolExecutor中有一个这样的内部类:private final class Worker extends AbstractQueuedSynchronizer implements Runnable。Worker的run()方法会调用ThreadPoolExecutor的runWorker方法:
final void runWorker(Worker w) { Runnable task = w.firstTask; w.firstTask = null; boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); clearInterruptsForTaskRun(); try { beforeExecute(w.thread, task);<span style="white-space:pre"> </span>//运行前 Throwable thrown = null; try { task.run();<span style="white-space:pre"> </span>//运行任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown);<span style="white-space:pre"> </span>//运行结束后 } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Extends {@link ThreadPoolExecutor} by overriding its protected hook methods
 * so that every task start, task end and the pool's termination are printed
 * with a timestamp.
 */
public class ExtThreadPool {

    /** Number of worker threads and of tasks executed by the demo. */
    private static final int POOL_SIZE = 5;

    /** Named task that prints the executing thread's id and sleeps 100 ms. */
    public static class MyTask implements Runnable {
        private String name;

        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            long workerId = Thread.currentThread().getId();
            System.out.println("正在履行线程id:" + workerId + " Task name " + name);
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Pool subclass overriding the protected hooks. The hooks are
     * {@code synchronized} so the timestamp and the message that follows it
     * are printed as one uninterrupted pair.
     */
    private static final class TracingPool extends ThreadPoolExecutor {

        TracingPool(int threads) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>());
        }

        /** Prints the current time without a trailing newline. */
        private static void printTimestamp() {
            // A fresh formatter per call: SimpleDateFormat is not thread-safe.
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SS");
            System.out.print(format.format(new Date()));
        }

        @Override
        protected synchronized void beforeExecute(Thread t, Runnable r) {
            printTimestamp();
            // The cast holds because main() only passes MyTask to execute().
            System.out.println("准备履行:" + ((MyTask) r).name);
        }

        @Override
        protected synchronized void afterExecute(Runnable r, Throwable t) {
            printTimestamp();
            System.out.println("履行完成:" + ((MyTask) r).name);
        }

        @Override
        protected synchronized void terminated() {
            printTimestamp();
            System.out.println("线程池退出...");
        }
    }

    /**
     * Executes five named tasks, pausing 10 ms between submissions, then
     * requests an orderly shutdown of the pool.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while pausing
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = new TracingPool(POOL_SIZE);
        int next = 0;
        while (next < POOL_SIZE) {
            exec.execute(new MyTask("Task" + next));
            TimeUnit.MILLISECONDS.sleep(10);
            next++;
        }
        exec.shutdown();
    }
}



import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Demonstrates what happens when a task running in a thread pool throws.
 *
 * @author Administrator
 */
public class DivTask implements Runnable {

    /** Number of tasks to run; the divisors are 0 .. TASKS-1. */
    private static final int TASKS = 5;

    int a, b;

    /**
     * @param a dividend
     * @param b divisor
     */
    public DivTask(int a, int b) {
        super();
        this.a = a;
        this.b = b;
    }

    /**
     * Prints {@code a / b} as a double. The division is done on ints, so the
     * quotient is truncated and a zero divisor raises
     * {@link ArithmeticException} - the failure this demo is about.
     */
    @Override
    public void run() {
        int quotient = a / b;
        System.out.println((double) quotient);
    }

    /**
     * Runs {@code 100 / i} for i = 0..4 on a cached thread pool via
     * {@code execute}, then shuts the pool down.
     *
     * @param args unused
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService es = Executors.newCachedThreadPool();
        int divisor = 0;
        while (divisor < TASKS) {
            es.execute(new DivTask(100, divisor));
            divisor++;
        }
        es.shutdown();
    }
}
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
    at ch3.DivTask.run(DivTask.java:25)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
50.0
33.0
25.0
100.0

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * A {@link ThreadPoolExecutor} that records where each task was handed to the
 * pool. When the task later fails on a worker thread, the stack trace of the
 * submitting call site is printed to {@code System.err} before the failure is
 * rethrown, so the failure can be traced back to the code that submitted it.
 */
public class TraceThreadPoolExecutor extends ThreadPoolExecutor {

    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /** Executes {@code command} wrapped so that a failure also prints the caller's stack. */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, clientStack()));
    }

    /** Submits {@code task} wrapped so that a failure also prints the caller's stack. */
    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(wrap(task, clientStack()));
    }

    /**
     * Captures the current call stack.
     *
     * @return an exception used only as a carrier of the stack trace; it is never thrown
     */
    public Exception clientStack() {
        return new Exception("Client stack trace");
    }

    /**
     * Decorates a task so that any failure prints the stack trace captured at
     * submission time and is then rethrown unchanged.
     *
     * @param task        the task to run
     * @param clientStack stack trace captured where the task was submitted
     * @return the decorated task
     */
    private Runnable wrap(final Runnable task, final Exception clientStack) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    task.run();
                } catch (Throwable failure) {
                    // Catch Throwable, not just Exception: a task that dies with an
                    // Error (AssertionError, StackOverflowError, ...) needs the
                    // submission trace just as much. Precise rethrow keeps the
                    // original type, and run() can only raise unchecked throwables.
                    clientStack.printStackTrace();
                    throw failure;
                }
            }
        };
    }
}
100.0
25.0
33.0
50.0
java.lang.Exception: Client stack trace
    at DivTask$TraceThreadPoolExecutor.clientStack(DivTask.java:53)
    at DivTask$TraceThreadPoolExecutor.execute(DivTask.java:46)
    at DivTask.main(DivTask.java:75)
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
    at DivTask.run(DivTask.java:32)
    at DivTask$TraceThreadPoolExecutor$1.run(DivTask.java:61)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

不过,有时候使用submit获取异常信息更加方便。JDK文档对Callable的定义:Callable接口类似于Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是Runnable不会返回结果,并且无法抛出受检异常(checked exception)。




Method submit extends base method Executor.execute by creating and returning a Future that can be used to cancel execution and/or wait for completion. 


个人觉得cancel execution这个用途不大,很少有需要去取消执行的场景。



There is a difference when looking at exception handling. If your tasks throws an exception and if it was submitted with execute this exception will go to the uncaught exception handler (when you don't have provided one explicitly, the default one will just print the stack trace to System.err). If you submitted the task with submit any thrown exception, checked or not, is then part of the task's return status. For a task that was submitted with submit and that terminates with an exception, the Future.get will rethrow this exception, wrapped in an ExecutionException.

意思就是:如果你的task里会抛出checked或unchecked exception,而你又希望外面的调用者能够感知这些exception并做出及时的处理,那么就需要用到submit,并通过捕获Future.get抛出的ExecutionException来处理。

比如说,我有很多更新各种数据的task,我希望如果其中一个task失败,其它的task就不需要执行了。那我就需要catch Future.get抛出的异常,然后终止其它task的执行,代码如下:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Submits ten {@link Callable} tasks and prints their results in submission
 * order. As soon as one task is found to have failed, the remaining tasks are
 * cancelled with {@code shutdownNow()}.
 */
public class ExecutorServiceTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        List<Future<String>> resultList = new ArrayList<Future<String>>();

        // Create and start ten tasks, keeping each Future so its result can be read later.
        for (int i = 0; i < 10; i++) {
            Future<String> future = executorService.submit(new TaskWithResult(i));
            resultList.add(future);
        }
        executorService.shutdown();

        // Walk the results in submission order.
        for (Future<String> fs : resultList) {
            try {
                System.out.println(fs.get());
            } catch (InterruptedException e) {
                // The main thread was asked to stop: cancel outstanding work and
                // restore the interrupt flag instead of continuing to block.
                executorService.shutdownNow();
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return;
            } catch (ExecutionException e) {
                // One task failed: interrupt the tasks that are still running.
                executorService.shutdownNow();
                e.printStackTrace();
                return;
            }
        }
    }
}

/** Task that randomly either fails or returns a result after a busy loop. */
class TaskWithResult implements Callable<String> {

    /** The interrupt flag is polled once every {@code INTERRUPT_CHECK_MASK + 1} iterations. */
    private static final int INTERRUPT_CHECK_MASK = 0xFFFF;

    private final int id;

    public TaskWithResult(int id) {
        this.id = id;
    }

    /**
     * The task body; it runs on a pool thread once the task has been handed to
     * {@code ExecutorService.submit}.
     *
     * @return a message containing the task id and the executing thread's name
     * @throws TaskException        randomly, to simulate a failing task
     * @throws InterruptedException if the executing thread is interrupted during
     *                              the simulated work, e.g. by {@code shutdownNow()}
     */
    public String call() throws Exception {
        System.out.println("call()方法被自动调用,干活!!! " + Thread.currentThread().getName());
        if (new Random().nextBoolean()) {
            throw new TaskException("Meet error in task." + Thread.currentThread().getName());
        }
        // Simulated time-consuming work. shutdownNow() only interrupts the worker
        // thread, so the loop must poll the interrupt flag for cancellation to
        // take effect.
        for (int i = 999999999; i > 0; i--) {
            if ((i & INTERRUPT_CHECK_MASK) == 0 && Thread.interrupted()) {
                throw new InterruptedException("Task " + id + " was cancelled");
            }
        }
        return "call()方法被自动调用,任务的结果是:" + id + " " + Thread.currentThread().getName();
    }
}

/** Checked exception used to signal a simulated task failure. */
class TaskException extends Exception {
    public TaskException(String message) {
        super(message);
    }
}

call()方法被自动调用,干活!!! pool-1-thread-2
call()方法被自动调用,干活!!! pool-1-thread-3
call()方法被自动调用,干活!!! pool-1-thread-1
call()方法被自动调用,干活!!! pool-1-thread-6
call()方法被自动调用,干活!!! pool-1-thread-4
call()方法被自动调用,干活!!! pool-1-thread-5
call()方法被自动调用,干活!!! pool-1-thread-9
call()方法被自动调用,干活!!! pool-1-thread-10
call()方法被自动调用,干活!!! pool-1-thread-8
call()方法被自动调用,干活!!! pool-1-thread-7
java.util.concurrent.ExecutionException: threadTest.TaskException: Meet error in task.pool-1-thread-1
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at threadTest.ExecutorServiceTest.main(ExecutorServiceTest.java:29)
Caused by: threadTest.TaskException: Meet error in task.pool-1-thread-1
    at threadTest.TaskWithResult.call(ExecutorServiceTest.java:57)
    at threadTest.TaskWithResult.call(ExecutorServiceTest.java:41)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

