什么是线程池

官方文档描述

1
2
An {@link ExecutorService} that executes each submitted task using one of possibly 
several pooled threads, normally configured using {@link Executors} factory methods.

大体意思是ExecutorService借助池中线程来处理每一个提交的任务,通常使用Executors的工厂方法来配置线程池。

为什么要用线程池

线程池解决了两个问题:

  • 执行大量的异步任务时,使用线程池减少每个任务的调用开销,提高性能

执行一个线程任务的流程:创建线程、执行线程中的任务、销毁线程。当执行大量的异步任务时,频繁的创建和销毁线程的时间占据大部分比例,使用线程池技术维护核心和最大的线程池数来避免频繁的创建和销毁线程所占据的时间,提高性能。

  • 执行任务集合时,线程池提供一种限制和管理资源的方案

执行任务集合时,由于处理器单元是有限的,线程池技术提供核心线程池、最大线程池限制线程可同时运行的线程个数,减少处理器单元的闲置时间,增加处理器单元的吞吐能力。

ThreadPoolExecutor、AbstractExecutorService、ExecutorService、Executor之间的关系

通过下面代码可以很清楚了解到他们之间的关系,图我就不花了。。。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ThreadPoolExecutor extends AbstractExecutorService 
public abstract class AbstractExecutorService implements ExecutorService
public interface ExecutorService extends Executor {
void shutdown(); // 关闭所有任务
List<Runnable> shutdownNow(); // 立即关闭所有任务
boolean isShutdown(); // 是否被关闭
boolean isTerminated(); // 是否被终止运行
Future<?> submit(Runnable task); // 提交任务到线程池中运行,最后还是交于execute方法执行
....
}
public interface Executor {
void execute(Runnable command); // 执行任务
}

ThreadPoolExecutor类构造函数参数分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
....
}
  • corePoolSize:保持在线程池中的线程线程数,除非调用allowCoreThreadTimeOut方法,否则该线程闲置也不会被销毁
  • maximumPoolSize:线程池中允许创建的最大线程数(包含核心线程池)
  • keepAliveTime:当线程池中线程创建数大于核心线程数,没有新任务可以被处理而闲置keepAliveTime时间时自动销毁该类型线程
  • unit:keepAliveTime的时间单位
  • workQueue:工作队列,这个队列将维持被执行的任务
  • threadFactory:线程工程,用于创建新的线程
  • handler:当线程任务被拒时的处理者

线程池使用demo

下面例子是线程池的简单用法,不具备代表性。但是,确是比较有意思的一个例子,Callable和Future搭配可以实现等待多个任务完成后统一处理。有一些很鸡肋的场景,例如通过多个接口获取主页不同类型数据后刷新界面,哎,主要是服务端写成了多个接口,我也是很无奈╮(╯▽╰)╭。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "callable sleep 1s";
}
};
Callable<String> callable1 = new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return "callable sleep 2s";
}
};
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("I'm runnable. I'm finished.");
}
});
Future<String> submit = executorService.submit(callable);
Future<String> submit1 = executorService.submit(callable1);
String s = submit.get();
String s1 = submit1.get();
System.out.println("callable resule: s=>" + s + " s1=>" + s1);
}
}

输出:
I'm runnable. I'm finished.
callable resule: s=>callable sleep 1s s1=>callable sleep 2s

线程池钩子方法

1
2
3
4
5
6
7
8
9
10
/**
* <dt>Hook methods</dt>
*
* <dd>This class provides {@code protected} overridable
* {@link #beforeExecute(Thread, Runnable)} and
* {@link #afterExecute(Runnable, Throwable)} methods that are called
* before and after execution of each task. These can be used to
* manipulate the execution environment; for example, reinitializing
* ThreadLocals, gathering statistics, or adding log entries.
*/

主要的意思是系统提供两个钩子方法beforeExecuteafterExecute方法,他们分别是在任务执行之前和执行之后调用。用途,比如重新初始化线程变量收集静态数据添加日志实体等。下面是官方的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class PausableThreadPoolExecutor extends ThreadPoolExecutor {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();

public PausableThreadPoolExecutor(...) { super(...); }

protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
pauseLock.lock();
try {
while (isPaused) unpaused.await();
} catch (InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
}

public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}

public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}

本文对于线程池的讲解比较浅显,只要是讲解线程池的基础使用



# Java,线程池  

tocToc: