JAVA线程池的实现

2019/06/13 JAVA

JAVA中的线程池实现

在JDK1.5之后,JAVA增加了线程池的实现.这里简要描述一下相关的两个类Executors以及ThreadPoolExecutor,注意: 该部分内容是结合了互联网上的相关知识.

Java Executors

Java里面线程池的顶级接口是Executors,但是严格意义上讲Executors并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。

比较重要的几个类:

类名 描述
ExecutorService 真正的线程池接口
ScheduledExecutorService 能和Timer/TimerTask类似,解决那些需要任务重复执行的问题
ThreadPoolExecutor ExecutorService的默认实现
ScheduledThreadPoolExecutor 继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现

要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池。
这四种线程池,分别为:
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

示例代码如下:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {
    public static void main(String[] args) {
        // 注意当前创建根据不同的需要,我们后面针对不同的线程池实现做修改
        ExecutorService pool = Executors.newCachedThreadPool();
        // 创建线程
        Thread t1 = new MyThread();
        Thread t2 = new MyThread();
        Thread t3 = new MyThread();
        Thread t4 = new MyThread();
        Thread t5 = new MyThread();
        // 将线程放入池中进行执行
        pool.execute(t1);
        pool.execute(t2);
        pool.execute(t3);
        pool.execute(t4);
        pool.execute(t5);
        // 关闭线程池
        pool.shutdown();
    }
}

class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "     正在执行。。。");
    }
}

newCachedThreadPool

执行结果如下

pool-1-thread-1     正在执行。。。
pool-1-thread-2     正在执行。。。
pool-1-thread-2     正在执行。。。
pool-1-thread-3     正在执行。。。
pool-1-thread-1     正在执行。。。

Process finished with exit code 0
newFixedThreadPool
  1. 修改对应代码
    ExecutorService pool = Executors.newFixedThreadPool(5);
    

    此时对应的结果如下

pool-1-thread-2     正在执行。。。
pool-1-thread-1     正在执行。。。
pool-1-thread-3     正在执行。。。
pool-1-thread-4     正在执行。。。
pool-1-thread-5     正在执行。。。

Process finished with exit code 0
  1. 修改对应代码
    ExecutorService pool = Executors.newFixedThreadPool(2);
    

    此时执行的结果如下

pool-1-thread-1     正在执行。。。
pool-1-thread-2     正在执行。。。
pool-1-thread-1     正在执行。。。
pool-1-thread-1     正在执行。。。
pool-1-thread-2     正在执行。。。

Process finished with exit code 0
newSingleThreadExecutor

修改代码为

ExecutorService pool = Executors.newSingleThreadExecutor();

执行结果

pool-1-thread-1     正在执行。。。
pool-1-thread-1     正在执行。。。
pool-1-thread-1     正在执行。。。
pool-1-thread-1     正在执行。。。
pool-1-thread-1     正在执行。。。

Process finished with exit code 0
newScheduledThreadPool

修改代码为

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) {
        // 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        // 创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
        Thread t1 = new MyThread();
        Thread t2 = new MyThread();
        Thread t3 = new MyThread();
        // 将线程放入池中进行执行
        pool.execute(t1);
        // 使用延迟执行风格的方法
        pool.schedule(t2, 1000, TimeUnit.MILLISECONDS);
        pool.schedule(t3, 10, TimeUnit.MILLISECONDS);

        // 关闭线程池
        pool.shutdown();
    }
}

class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "     正在执行。。。");
    }
}

执行结果

pool-1-thread-1     正在执行。。。
pool-1-thread-2     正在执行。。。
pool-1-thread-1     正在执行。。。

Process finished with exit code 0

无论以上创建那种线程池 必须要调用ThreadPoolExecutor,以下附上对应的静态方法实现代码

public static ExecutorService newCachedThreadPool() {
	return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
	return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
         new LinkedBlockingQueue<Runnable>()));
}

// 该类随未直接new ThreadPoolExecutor类,但ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,new ScheduledThreadPoolExecutor实际上调用的即为ThreadPoolExecutor的构造器
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

但阿里的JAVA开发手册不建议直接使用Executors的四种静态方法

线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors各个方法的弊端: 1)newFixedThreadPool和newSingleThreadExecutor:   主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。 2)newCachedThreadPool和newScheduledThreadPool:   主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

那么我们来看一下ThreadPoolExecutor的实现

ThreadPoolExecutor

java.util.concurrent.ThreadPoolExecutor,常用构造方法为:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
long keepAliveTime, TimeUnit unit, 
BlockingQueue workQueue, 
RejectedExecutionHandler handler) 

corePoolSize: 线程池维护线程的最少数量
maximumPoolSize:线程池维护线程的最大数量
keepAliveTime: 线程池维护线程所允许的空闲时间
unit: 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使用的缓冲队列
handler: 线程池对拒绝任务的处理策略

一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。

当一个任务通过execute(Runnable)方法欲添加到线程池时:

如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。 如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。

也就是:处理任务的优先级为: 核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。

当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性: NANOSECONDSMICROSECONDSMILLISECONDSSECONDS

workQueue常用的是:java.util.concurrent.ArrayBlockingQueue
Java并发包中的阻塞队列一共7个,当然他们都是线程安全的。

名称 描述
ArrayBlockingQueue 一个由数组结构组成的有界阻塞队列
LinkedBlockingQueue 一个由链表结构组成的有界阻塞队列
PriorityBlockingQueue 一个支持优先级排序的无界阻塞队列
DealyQueue 一个使用优先级队列实现的无界阻塞队列
SynchronousQueue 一个不存储元素的阻塞队列
LinkedTransferQueue 一个由链表结构组成的无界阻塞队列
LinkedBlockingDeque 一个由链表结构组成的双向阻塞队列

handler有四个选择:

名称 描述
ThreadPoolExecutor.AbortPolicy() 丢弃任务并抛出RejectedExecutionException异常。 (默认)
ThreadPoolExecutor.CallerRunsPolicy() 重试添加当前的任务,他会自动重复调用execute()方法
ThreadPoolExecutor.DiscardOldestPolicy() 抛弃旧的任务,重新尝试执行
ThreadPoolExecutor.DiscardPolicy() 丢弃当前任务,但是不抛出异常。

Search

    欢迎关注我的微信公众号

    Yannis的程序人生

    Table of Contents