程序员人生 网站导航

线程池(二)

栏目:php教程时间:2016-07-01 08:58:45

线程池超负载了怎样办?都有哪些谢绝策略?

在ThreadPoolExecutor的构造方法里有1个这样的参数,RejectedExecutionHandler handler通过查看Jdk我们可以知道这是1个接口,而且jdk内置实现了4种谢绝策略,它们都是ThreadPoolExecutor的public static class。

CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被抛弃的任务,虽然这样其实不会真的抛弃任务,但是相应的被提交任务的线程性能肯定会急剧降落。
AbortPolicy策略:该策略会直接抛出异常,禁止系统正常工作。
DiscardPolicy策略:直接抛弃任务,不给予任何处理。
DiscardOldestPolicy策略:该策略会抛弃最古老的要求(也就是任务队列中最早进入的任务),行将被履行的策略,并尝试再次提交当前任务。
下面可以用1个小Demo来演示1下:
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ExecutorServiceDemo { static void log(String msg) { System.out.println(System.currentTimeMillis() + " -> " + msg); } public static void main(String[] args) throws Exception { ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1)); pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); for (int i = 0; i < 10; i++) { final int index = i; pool.submit(new Runnable() { public void run() { log("run task:" + index + " -> " + Thread.currentThread().getName()); try { Thread.sleep(1000L); } catch (Exception e) { e.printStackTrace(); } log("run over:" + index + " -> " + Thread.currentThread().getName()); } }); } log("before sleep"); Thread.sleep(4000L); log("before shutdown()"); pool.shutdown(); log("after shutdown(),pool.isTerminated=" + pool.isTerminated()); pool.awaitTermination(1, TimeUnit.SECONDS); log("now,pool.isTerminated=" + pool.isTerminated()); } }
程序设置的线程池只有1个线程,并且允许最大的线程数量也为1,同时任务队列的最大界限为1。
当采取DisCardOldestPolicy()时:

可以知道在workQueue中的task1-task81直被抛弃(由于任务队列只有1个容量,而线程池里的唯1线程处理的速度不是很快,而程序又不停的往线程池里提交任务)。直到最后1个task9任务才被履行。
当采取DisCardPolicy()时:

可以看见刚除开始线程履行的task0和任务队列里的task1,其余的都被默默地抛弃了。
当采取AbortPolicy()策略时:

系统直接抛出异常。。。
当采取CallerRunsPolicy()策略时就有点不同的了:

可以看见在main线程1直帮忙处理不能被线程池处理同时也不能进入任务队列的任务。
若以上的策略还是没法满足实际利用的需要。我们还可以自己扩大RejectedExecutionHandler {
void rejectedExecution(Runnable r,ThreadPoolExecutor executor);
}   
在ThreadPoolExecutor的Execute代码里可以看见当由于超越其界限而没有更多可用的线程或队列槽时,或关闭 Executor 时便可能产生这类情况
 final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
 }

自己定义线程创建:ThreadFactory

看到这里我们可能有疑问那就是线程池的线程是从哪里来的?答案就是:ThreadFactory
它是1个接口,只有1个方法
public Thread newThread(Runnable r);
自定义线程池可以更加自由的设置线程的状态,下面任性的设置线程全为后台线程:
import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 自定义线程池 :public interface ThreadFactory * 根据需要创建新线程的对象 * Thread newThread(Runnable r) * @author Administrator * */ public class MyThreadFactory { public static class MyTask implements Runnable { @Override public void run() { System.out.println( System.currentTimeMillis() + " Thread id:" + Thread.currentThread().getId()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService exec = new ThreadPoolExecutor(5,5,0L,TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory(){ @Override public Thread newThread(Runnable r) { Thread t = new Thread(r,"后台线程"); t.setDaemon(true); System.out.println("create " + t.getName()+ " " + t.getId()); return t; } }); for(int i = 0; i < 5; i++) { exec.submit(task); } TimeUnit.SECONDS.sleep(2); } }

对线程池的扩大

上面仅仅只是我们自定义了创建线程时的状态,但是有时候,我们需要对线程履行的任务进行监控,比如说任务的开始时间和结束时间。荣幸的是,ThreadPoolExecutor是1个可扩大的线程池。提供了3个方法:
protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { } protected void terminated() { }
同时再看ThreadPoolExecutor中有1个这样的类, private final class Worker extends AbstractQueuedSynchronizer implements Runnable它里面有1个方法runWorker
final void runWorker(Worker w) { Runnable task = w.firstTask; w.firstTask = null; boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); clearInterruptsForTaskRun(); try { beforeExecute(w.thread, task);<span style="white-space:pre"> </span>//运行前 Throwable thrown = null; try { task.run();<span style="white-space:pre"> </span>//运行任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown);<span style="white-space:pre"> </span>//运行结束后 } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
ThreadPoolExecutor中工作线程正是Worker的实例(它是把Runnable对象进行了包装),Worker.runWorker()会被线程池以多线程模式异步调用,即它会被多个线程访问,因此其beforeExecute()它们也会被多线程同时访问。
在默许的ThreadExecutor实现中,提供了beforeExecute(),afterExecute(),terminated()空的实现。在实际的利用中可以通过对其进行扩大实现多线程池运行状态的跟踪。
import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ExtThreadPool { public static class MyTask implements Runnable { private String name; public MyTask(String name) { this.name = name; } @Override public void run() { System.out.println("正在履行线程id:" + Thread.currentThread().getId() + " Task name " + name); try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { ExecutorService exec = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) //直接取得ThreadPoolExecutor的子类,并且重写protect钩子方法 { @Override protected synchronized void beforeExecute(Thread t, Runnable r) { System.out.print(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SS").format(new Date())); System.out.println("准备履行:" + ((MyTask)r).name); } @Override protected synchronized void afterExecute(Runnable r, Throwable t) { System.out.print(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SS").format(new Date())); System.out.println("履行完成:" + ((MyTask)r).name); } @Override protected synchronized void terminated() { System.out.print(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SS").format(new Date())); System.out.println("线程池退出..."); } }; for(int i = 0; i < 5; i++) { MyTask task = new MyTask("Task" + i); exec.execute(task); Thread.sleep(10); } exec.shutdown(); } }
这里需要注意打印时间这句会产生线程安全。用Synchronized监视器让线程在临界区进行互斥的履行。
运行结果:


这里可以清楚地看到线程池中履行任务的创建和结束信息。

在线程池中寻觅堆栈异常信息

import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 演示线程池中的异常 * @author Administrator * */ public class DivTask implements Runnable { int a,b; public DivTask(int a, int b) { super(); this.a = a; this.b = b; } @Override public void run() { double res = a/b; System.out.println(res); } public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService es = Executors.newCachedThreadPool(); for(int i = 0; i < 5; i++) { es.execute(new DivTask(100, i)); } es.shutdown(); } }
运行结果有异常抛出:
Exception in thread "pool⑴-thread⑴" java.lang.ArithmeticException: / by zero at ch3.DivTask.run(DivTask.java:25) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source) 50.0 33.0 25.0 100.0



但是上面的异常栈的信息还不够全,最少我们不知道是任务到底在哪里提交的?这时候候采取可以对线程池进行扩大。
import java.util.concurrent.BlockingQueue; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class TraceThreadPoolExecutor extends ThreadPoolExecutor { public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override public void execute(Runnable command) { super.execute(wrap(command,clientStack(),Thread.currentThread().getName())); } @Override public Future<?> submit(Runnable task) { return super.submit(wrap(task,clientStack(),Thread.currentThread().getName())); } public Exception clientStack() { return new Exception("Client stack trace"); } private Runnable wrap(final Runnable task,final Exception clientStack,String clientThreadName) { return new Runnable() { @Override public void run() { try { task.run(); } catch(Exception e) { clientStack.printStackTrace(); throw e; } } }; } }
使用这个扩大的线程池运行的结果:
100.0 25.0 33.0 50.0 java.lang.Exception: Client stack trace at DivTask$TraceThreadPoolExecutor.clientStack(DivTask.java:53) at DivTask$TraceThreadPoolExecutor.execute(DivTask.java:46) at DivTask.main(DivTask.java:75) Exception in thread "pool⑴-thread⑴" java.lang.ArithmeticException: / by zero at DivTask.run(DivTask.java:32) at DivTask$TraceThreadPoolExecutor$1.run(DivTask.java:61) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)


这样就能够看到完全的堆栈信息,对毛病的定位也就更准确了。
不过,有时候使用submit取得异常信息更加方便。jdk的定义:Callable 接口类似于Runnable,二者都是为那些其实例可能被另外一个线程履行的类设计的。但是Runnable 不会返回结果,并且没法抛出经过检查的异常。
这篇博客介绍关于submit和execute的区分,地址:http://blog.csdn.net/peachpi/article/details/6771946#

3个区分:

1、接收的参数不1样

2、submit有返回值,而execute没有

Method submit extends base method Executor.execute by creating and returning a Future that can be used to cancel execution and/or wait for completion. 

用到返回值的例子,比如说我有很多个做validation的task,我希望所有的task履行完,然后每一个task告知我它的履行结果,是成功还是失败,如果是失败,缘由是甚么。然后我就能够把所有失败的缘由综合起来发给调用者。

个人觉得cancel execution这个用途不大,很少有需要去取消履行的。

而最大的用途应当是第2点。

3、submit方便Exception处理

There is a difference when looking at exception handling. If your tasks throws an exception and if it was submitted with execute this exception will go to the uncaught exception handler (when you don't have provided one explicitly, the default one will just print the stack trace to System.err). If you submitted the task with submit any thrown exception, checked or not, is then part of the task's return status. For a task that was submitted with submit and that terminates with an exception, the Future.get will rethrow this exception, wrapped in an ExecutionException.

意思就是如果你在你的task里会抛出checked或unchecked exception,而你又希望外面的调用者能够感知这些exception并做出及时的处理,那末就需要用到submit,通过捕获Future.get抛出的异常。

比如说,我有很多更新各种数据的task,我希望如果其中1个task失败,其它的task就不需要履行了。那我就需要catch Future.get抛出的异常,然后终止其它task的履行,代码以下:

import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ExecutorServiceTest { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); List<Future<String>> resultList = new ArrayList<Future<String>>(); // 创建10个任务并履行 for (int i = 0; i < 10; i++) { // 使用ExecutorService履行Callable类型的任务,并将结果保存在future变量中 Future<String> future = executorService.submit(new TaskWithResult(i)); // 将任务履行结果存储到List中 resultList.add(future); } executorService.shutdown(); // 遍历任务的结果 for (Future<String> fs : resultList) { try { System.out.println(fs.get()); // 打印各个线程(任务)履行的结果 } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { executorService.shutdownNow(); e.printStackTrace(); return; } } } } class TaskWithResult implements Callable<String> { private int id; public TaskWithResult(int id) { this.id = id; } /** * 任务的具体进程,1旦任务传给ExecutorService的submit方法,则该方法自动在1个线程上履行。 * * @return * @throws Exception */ public String call() throws Exception { System.out.println("call()方法被自动调用,干活!!! " + Thread.currentThread().getName()); if (new Random().nextBoolean()) throw new TaskException("Meet error in task." + Thread.currentThread().getName()); // 1个摹拟耗时的操作 for (int i = 999999999; i > 0; i--) ; return "call()方法被自动调用,任务的结果是:" + id + " " + Thread.currentThread().getName(); } } class TaskException extends Exception { public TaskException(String message) { super(message); } }
上面的代码解释的也比较详细了。。。。运行的结果和我们在上面自己扩大的1样都是完全的栈信息。

call()方法被自动调用,干活!!! pool⑴-thread⑵ call()方法被自动调用,干活!!! pool⑴-thread⑶ call()方法被自动调用,干活!!! pool⑴-thread⑴ call()方法被自动调用,干活!!! pool⑴-thread⑹ call()方法被自动调用,干活!!! pool⑴-thread⑷ call()方法被自动调用,干活!!! pool⑴-thread⑸ call()方法被自动调用,干活!!! pool⑴-thread⑼ call()方法被自动调用,干活!!! pool⑴-thread⑴0 call()方法被自动调用,干活!!! pool⑴-thread⑻ call()方法被自动调用,干活!!! pool⑴-thread⑺ java.util.concurrent.ExecutionException: threadTest.TaskException: Meet error in task.pool⑴-thread⑴ at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at threadTest.ExecutorServiceTest.main(ExecutorServiceTest.java:29) Caused by: threadTest.TaskException: Meet error in task.pool⑴-thread⑴ at threadTest.TaskWithResult.call(ExecutorServiceTest.java:57) at threadTest.TaskWithResult.call(ExecutorServiceTest.java:41) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

------分隔线----------------------------
------分隔线----------------------------

最新技术推荐