如何优雅的使用线程池

如何优雅的使用线程池

Posted by 不学无数 on December 17, 2019

线程池不仅在项目中是非常常用的一项技术而且在面试中基本上也是必问的知识点,接下来跟着我一起来巩固一下线程池的相关知识。在了解线程池之前我们先了解一下什么是进程什么是线程

进程

  • 程序:一般是一组CPU指令的集合构成的文件,静态存储在诸如硬盘之类的存储设备上
  • 进程:当一个程序要被计算机运行时,就是在内存中产生该程序的一个运行时实例,我们就把这个实例叫做进程

用户下达运行程序的命令以后,就会产生一个进程,同一个程序可以产生多个进程(一对多的关系),以允许同时有多个用户运行同一个程序,却不会相冲突。

进程需要一些资源才能工作,如CPU的使用时间、存储器、文件、以及I/O设备,且为依序逐一执行,也就是每个CPU核心任何时间内仅能运行一项进程。但是在一个应用程序中一般不会是只有一个任务单条线执行下去,肯定会有多个任务,而创建进程又是耗费时间和资源的,称之为重量级操作。

  1. 创建进程占用资源太多
  2. 进程之间的通信需要数据在不同的内存空间传来传去,所以进程间通信会更加耗费时间和资源

线程

线程是操作系统能够进行运算调度的最小单位,大部分情况下它被包含在进程之中,是进程中实际的运作单位。一个进程可以并发多个线程,每个线程执行不同的任务。同一个进程中的多条线程共享该进程中的全部虚拟资源,例如虚拟地址空间、文件描述符、信号处理等等。但是同一个进程中的多个线程各自有各自的调用栈。

一个进程可以有很多线程,每条线程并行执行不同的任务。

线程中的数据

  1. 线程栈上的本地数据:比如函数执行过程的局部变量,我们知道在Java中线程模型是使用栈的模型。每个线程都有自己的栈空间。
  2. 在整个进程里共享的全局数据:我们知道在Java程序中,Java就是一个进程,我们可以通过ps -ef | grep java可以看到在程序中运行了多少个Java进程,例如我们Java中的全局变量,在不同进程之间是隔离的,但是在线程之间是共享的。
  3. 线程的私有数据:在Java中我们可以通过ThreadLocal来创建线程间私有的数据变量。

线程栈上的本地数据只能在本方法内有效,而线程的私有数据是在线程间多个函数共享的。

CPU密集型和IO密集型

理解是服务器是CPU密集型还是IO密集型能够帮助我们更好的设置线程池中的参数。具体如何设置我们在后面讲到线程池的时候再分析,这里大家先知道这两个概念。

  • IO密集型:大部分时间CPU闲着,在等待磁盘的IO操作
  • CPU(计算)密集型:大部分时间磁盘IO闲着,等着CPU的计算操作

线程池

线程池其实是池化技术的应用一种,常见的池化技术还有很多,例如数据库的连接池、Java中的内存池、常量池等等。而为什么会有池化技术呢?程序的运行本质,就是通过使用系统资源(CPU、内存、网络、磁盘等等)来完成信息的处理,比如在JVM中创建一个对象实例需要消耗CPU的和内存资源,如果你的程序需要频繁创建大量的对象,并且这些对象的存活时间短就意味着需要进行频繁销毁,那么很有可能这段代码就成为了性能的瓶颈。总结下来其实就以下几点。

  • 复用相同的资源,减少浪费,减少新建和销毁的成本;
  • 减少单独管理的成本,统一交由”池”;
  • 集中管理,减少”碎片”;
  • 提高系统响应速度,因为池中有现成的资源,不用重新去创建;

所以池化技术就是为了解决我们这些问题的,简单来说,线程池就是将用过的对象保存起来,等下一次需要这个对象的时候,直接从对象池中拿出来重复使用,避免频繁的创建和销毁。在Java中万物皆对象,那么线程也是一个对象,Java线程是对于操作系统线程的封装,创建Java线程也需要消耗操作系统的资源,因此就有了线程池。但是我们该如何创建呢?

Java提供的四种线程池

Java为我们提供了四种创建线程池的方法。

  • Executors.newCachedThreadPool:创建可缓存无限制数量的线程池,如果线程中没有空闲线程池的话此时再来任务会新建线程,如果超过60秒此线程无用,那么就会将此线程销毁。简单来说就是忙不来的时候无限制创建临时线程,闲下来再回收

      public static ExecutorService newCachedThreadPool() {
          return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                        60L, TimeUnit.SECONDS,
                                        new SynchronousQueue<Runnable>());
      }
    	
    
  • Executors.newFixedThreadPool:创建固定大小的线程池,可控制线程最大的并发数,超出的线程会在队列中等待。简单来说就是忙不来的时候会将任务放到无限长度的队列里。

     	public static ExecutorService newFixedThreadPool(int nThreads) {
          return new ThreadPoolExecutor(nThreads, nThreads,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>());
      }
    	
    
  • Executors.newSingleThreadExecutor:创建线程池中线程数量为1的线程池,用唯一线程来执行任务,保证任务是按照指定顺序执行

      public static ExecutorService newSingleThreadExecutor() {
          return new FinalizableDelegatedExecutorService
              (new ThreadPoolExecutor(1, 1,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>()));
      }
    	
    
  • Executors.newScheduledThreadPool:创建固定大小的线程池,支持定时及周期性的任务执行

      public ScheduledThreadPoolExecutor(int corePoolSize) {
          super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                new DelayedWorkQueue());
      }
    	
    

线程池的创建原理

我们点击去这四种实现方式的源码中我们可以看到其实它们的底层创建原理都是一样的,只不过是所传的参数不同组成的四个不同类型的线程池。都是使用了ThreadPoolExecutor来创建的。我们可以看一下ThreadPoolExecutor 创建所传的参数。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

那么这些参数都具体代表什么意思呢?

  • corePoolSize :线程池中核心线程数的数量
  • maximumPoolSize :在线程池中允许存在的最大线程数
  • keepAliveTime :当存在的线程数大于corePoolSize ,那么会找到空闲线程去销毁,此参数是设置空闲多久的线程才被销毁。
  • unit :时间单位
  • workQueue :工作队列,线程池中的当前线程数大于核心线程的话,那么接下来的任务会放入到队列中
  • threadFactory :在创建线程的时候,通过工厂模式来生产线程。这个参数就是设置我们自定义的线程创建工厂。
  • handler :如果超过了最大线程数,那么就会执行我们设置的拒绝策略

接下来我们将这些参数合起来看一下他们的处理逻辑是什么。

  1. corePoolSize 个任务时,来一个任务就创建一个线程
  2. 如果当前线程池的线程数大于了corePoolSize 那么接下来再来的任务就会放入到我们上面设置的workQueue 队列中
  3. 如果此时workQueue 也满了,那么再来任务时,就会新建临时线程,那么此时如果我们设置了keepAliveTime 或者设置了allowCoreThreadTimeOut ,那么系统就会进行线程的活性检查,一旦超时便销毁线程
  4. 如果此时线程池中的当前线程大于了maximumPoolSize 最大线程数,那么就会执行我们刚才设置的handler 拒绝策略

为什么建议不用Java提供的线程池创建方法

理解了上面设置的几个参数以后,我们再来看一下为什么在《阿里巴巴Java手册》中有一条这样规定。

相信大家看到上面提供四种创建线程池的实现原理,应该知道为什么阿里巴巴会有这么规定了。

  • FixedThreadPoolSingleThreadExecutor:这两个线程池的实现方式,我们可以看到它设置的工作队列都是LinkedBlockingQueue,我们知道此队列是一个链表形式的队列,此队列是没有长度限制的,是一个无界队列,那么此时如果有大量请求,就有可能造成OOM
  • CachedThreadPoolScheduledThreadPool:这两个线程池的实现方式,我们可以看到它设置的最大线程数都是Integer.MAX_VALUE,那么就相当于允许创建的线程数量为Integer.MAX_VALUE。此时如果有大量请求来的时候也有可能造成OOM

如何设置参数

所以我们在项目中如果要使用线程池的话,那么就推荐根据自己项目和机器的情况进行个性化创建线程池。那么这些参数如何设置呢?为了正确的定制线程池的长度,需要理解你的计算机配置、所需资源的情况以及任务的特性。比如部署的计算机安装了多少个CPU?多少的内存?任务主要执行是IO密集型还是CPU密集型?所执行任务是否需要数据库连接这样的稀缺资源?

如果你有多个不同类别的任务,它们的行为有很大的差别,那么应该考虑使用多个线程池。这样也能根据每个任务不同定制不同的线程池,也不至于因为一种类型的任务失败而托垮另一个任务。

  • CPU密集型任务:说明包含了大量的运算操作,比如有N个CPU,那么就配置线程池的容量大小为N+1,这样能获得最优的利用率。因为CPU密集型的线程恰好在某时因为发生一个页错误或者因为其他的原因而暂停,刚好有一个额外的线程,可以确保在这种情况下CPU周期不会中断工作。
  • IO密集任务:说明CPU大部分时间都是在等待IO的阻塞操作,那么此时就可以将线程池的容量大小配置的大一些。此时可以根据一些参数进行计算大概你的线程池的数量多少合适。

    • N:CPU的数量
    • U:目标CPU的使用率,0<=U<=1
    • W/C:等待时间与计算时间的比率
    • 那么最优的池的大小就是N*U*(1+W/C)

页缺失(英语:Page fault,又名硬错误、硬中断、分页错误、寻页缺失、缺页中断、页故障等)指的是当软件试图访问已映射在虚拟地址空间中,但是当前并未被加载在物理内存中的一个分页时,由中央处理器的内存管理单元所发出的中断

其实线程池大小的设置还是要根据自己业务类型来设置,比如当前任务需要池化的资源的时候,比如数据库的连接池,俺么线程池的长度和资源池的长度会相互的影响。如果每一个任务都需要一个数据库连接,那么连接池的大小就会限制了线程池的有效大小,类似的,当线程池中的任务是连接池的唯一消费者时,那么线程池的大小反而又会限制了连接池的有效大小。

线程池中的线程销毁

线程池的核心线程数(corePoolSize)、最大线程数(maximumPoolSize)、线程的存活时间(keepAliveTime)共同管理的线程的创建与销毁。接下来我们再复习一下线程池是如何创建和销毁线程的

  • 当前线程数 < 核心线程数:来一个任务创建一个线程
  • 当前线程数 = 核心线程数:来一个任务就会将其加入到队列中
  • 当前线程数 > 核心线程数:此时有一个前提条件就是队列已满,才会新建线程,此时就会开启线程的活性检查,对于设置为keepAliveTime 时间没有活动的线程将会被回收

那么这里可能有人会想到将corePoolSize 核心线程数设置为0(如果大家还记得上面讲的CachedThreadPool 的话应该还会记得它的核心线程数就是0),因为这样设置的话线程就会动态的进行创建了,闲的时候没有线程,忙的时候再在线程池中创建线程。这样想法固然是好,但是如果我们自定义参数设置了此参数为0,而正好又设置了等待队列不是SynchronousQueue,那么其实就会有问题,因为只有在队列满的情况下才会新建线程。下面代码我使用了无界队列LinkedBlockingQueue ,其实大家看一下输出

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0,Integer.MAX_VALUE,1, TimeUnit.SECONDS,new LinkedBlockingQueue<>());
for (int i = 0; i < 10; i++) {
    threadPoolExecutor.execute(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.printf("1");
        }
    });
}

大家可以看一下演示的效果,其实1是每隔一秒打印一次,其实这就和我们使用线程池初衷相悖了,因为我们这个相当于是单线程在运行。

但是如果我们将工作队列换成SynchronousQueue 呢,我们发现这些1是一块输出出来的。

SynchronousQueue 并不是一个真正的队列,而是一种管理直接在线程间移交信息的机制,这里可以简单将其想象成一个生产者生产消息交给SynchronousQueue ,而消费者这边如果有线程来接收,那么此消息就会直接交给消费者,反之会阻塞。

所以我们在设置线程池中某些参数的时候应该想想其创建和销毁线程流程,不然我们自定义的线程池还不如使用Java提供的四种线程池了。

线程池中的拒绝策略

ThreadPoolExecutor为我们提供了四种拒绝策略,我们可以看下Java提供的四种线程池创建所提供的拒绝策略都是其定义的默认的拒绝策略。那么除了这个拒绝策略其他的拒绝策略都是什么呢?

private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();

我们可以到拒绝策略是一个接口RejectedExecutionHandler ,这也就意味我着我们可以自己订自己的拒绝策略,我们先看一下Java提供四种拒绝策略是什么。

public interface RejectedExecutionHandler {

    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     *
     * <p>In the absence of other alternatives, the method may throw
     * an unchecked {@link RejectedExecutionException}, which will be
     * propagated to the caller of {@code execute}.
     *
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

AbortPolicy

这个拒绝策略就是Java提供的四种线程池创建方法提供的默认拒绝策略。我们可以看下它的实现。

public static class AbortPolicy implements RejectedExecutionHandler {
 
    public AbortPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

所以此拒绝策略就是抛RejectedExecutionException异常

CallerRunsPolicy

此拒绝策略简单来说就是将此任务交给调用者直接执行。

public static class CallerRunsPolicy implements RejectedExecutionHandler {

    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

这里为什么是交给了调用者来执行呢?我们可以看到它是调用了run()方法,而不是start()方法。

DiscardOldestPolicy

从源码中应该能看出来,此拒绝策略是丢弃队列中最老的任务,然后再执行。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {

        public DiscardOldestPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

DiscardPolicy

从源码中应该能看出来,此拒绝策略是对于当前任务不做任何操作,简单来说就是直接丢弃了当前任务不执行。

public static class DiscardPolicy implements RejectedExecutionHandler {

    public DiscardPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

线程池的拒绝策略给我们默认提供了这四种的实现方式,当然我们也能够自定义拒绝策略使线程池更加符合我们当前的业务,在后面讲解Tomcat自定义自己的线程池时也会讲解它自己实现的拒绝策略。

线程饥饿死锁

线程池为“死锁”这一概念带来了一种新的可能:线程饥饿死锁。在线程池中,如果一个任务另一个任务提交到同一个Executor,那么通常会引发死锁。第二个线程停留在工作队列中等待第一个提交的任务执行完成,但是第一个任务又无法执行完成,因为它在等待第二个任务执行完成。用图表示如下

用代码表示的话如下,这里注意我们这里定义的线程池是SingleThreadExecutor,线程池中只有一个线程,这样好模拟出这样的情况,如果在更大的线程池中,如果所有线程都在等待其他仍处于工作队列的任务而阻塞,那么这种情况被称为线程饥饿死锁。所以尽量避免在同一个线程池中处理两种不同类型的任务。

public class AboutThread {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    public static void main(String[] args) {
        AboutThread aboutThread = new AboutThread();
        aboutThread.threadDeadLock();
    }

    public void threadDeadLock(){
        Future<String> taskOne  = executorService.submit(new TaskOne());
        try {
            System.out.printf(taskOne.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    public class TaskOne implements Callable{

        @Override
        public Object call() throws Exception {
            Future<String> taskTow = executorService.submit(new TaskTwo());
            return "TaskOne" + taskTow.get();
        }
    }

    public class TaskTwo implements Callable{

        @Override
        public Object call() throws Exception {
            return "TaskTwo";
        }
    }
}

拓展ThreadPoolExecutor

如果我们想要对线程池进行一些扩展,那么可以使用ThreadPoolExecutor 给我预留的一些接口可以使我们进行更深层次话的定制线程池。

线程工厂

如果我们想要给我们的线程池中的每个线程自定义一些名称,那么我们就可以使用线程工厂来实现一些自定义化的一些操作。只要我们将我们自定义的工厂传给ThreadPoolExecutor,那么无论何时线程池需要创建一个线程,都要通过我们定义的工厂来进行创建。接下来我们看一下接口ThreadFactory,只要我们实现了此接口就能自定义自己线程独有的信息。

public interface ThreadFactory {

    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}

接下来我们可以看我们自己写的线程池工厂类

class CustomerThreadFactory implements ThreadFactory{

    private String name;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    CustomerThreadFactory(String name){
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r,name+threadNumber.getAndIncrement());
        return thread;
    }
}


只需要在进行线程池实例化的时候将此工厂类加上去即可

   public static void customerThread(){
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0,Integer.MAX_VALUE,1, TimeUnit.SECONDS,new SynchronousQueue<>(),
                new CustomerThreadFactory("customerThread"));

        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.printf(Thread.currentThread().getName());
                    System.out.printf("\n");
                }
            });
        }
    }

接下来我们执行此语句,发现每个线程的名字已经变了

customerThread1
customerThread10
customerThread9
customerThread8
customerThread7
customerThread6
customerThread5
customerThread4
customerThread3
customerThread2

通过继承ThreadPoolExecutor扩展

我们查看ThreadPoolExecutor 源码可以发现源码中有三个方法都是protected

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }

被protected修饰的成员对于本包和其子类可见

我们可以通过继承来覆写这些方法,那么就可以进行我们独有的扩展了。执行任务的线程会调用beforeExecute afterExecute 方法,可以通过它们添加日志、时序、监视器或者同级信息收集的功能。无论任务是正常从run中返回,还是抛出一个异常,afterExecute 都会被调用(如果任务完成后抛出一个Error,则afterExecute 不会被调用)。如果beforeExecute 抛出一个RuntimeException,任务将不会被执行,afterExecute 也不会被调用。

在线程池完成关闭时调用terminated,也就是在所有任务都已经完成并且所有工作者线程也已经关闭后,terminated可以用来释放Executor在其生命周期里分配的各种资源,此外还可以执行发送通知、记录日志或者手机finalize统计等操作。

本篇文章代码地址

有感兴趣的可以关注一下我新建的公众号,搜索[程序猿的百宝袋]。或者直接扫下面的码也行。

参考