Java Executor源碼解析(7)—Executors線程池工廠以及四大內置線程池

語言: CN / TW / HK

持續創作,加速成長!這是我參與「掘金日新計劃 · 6 月更文挑戰」的第9天,點擊查看活動詳情

詳細介紹了Executors線程池工具類的使用,以及四大內置線程池。

系列文章:

  1. Java Executor源碼解析(1)—Executor執行框架的概述
  2. Java Executor源碼解析(2)—ThreadPoolExecutor線程池的介紹和基本屬性【一萬字】
  3. Java Executor源碼解析(3)—ThreadPoolExecutor線程池execute核心方法源碼【一萬字】
  4. Java Executor源碼解析(4)—ThreadPoolExecutor線程池submit方法以及FutureTask源碼【一萬字】
  5. Java Executor源碼解析(5)—ThreadPoolExecutor線程池其他方法的源碼
  6. Java Executor源碼解析(6)—ScheduledThreadPoolExecutor調度線程池源碼解析【一萬字】
  7. Java Executor源碼解析(7)—Executors線程池工廠以及四大內置線程池

Executors可以看作一個工具類,裏面提供了好多靜態方法,這些方法根據用户選擇返回不同的內置線程池實例,或者返回線程工廠,或者將runnable轉換為callable。

《阿里巴巴java開發手冊》中不推薦使用Executors創建線程池,有這樣的強制編程規約:線程池不允許使用Executors去創建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。

Executors 返回的線程池對象的弊端如下:

  1. FixedThreadPool 和 SignalThreadPool : 允許的請求隊列長度為Integer.MAX_VALUE,可能會堆積大量的請求,從而導致OOM。
  2. CachedThreadPool 和 ScheduledThreadPool : 允許的創建線程數量為Integer.MAX_VALUE,可能會創建大量的線程,從而導致OOM。

1 內置線程池

Executors提供了四種默認線程池實現!

1.1 newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads)

創建一個可重用固定線程數的線程池,以共享的無界隊列方式來運行這些線程。

java public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }

corePoolSize與maximumPoolSize相等,即最大線程數就是核心線程數;keepAliveTime = 0 該參數默認對核心線程無效,因此不會超時,在線程池被關閉之前,池中被創建的線程將一直存在。

workQueue為LinkedBlockingQueue,是一個無界阻塞隊列,隊列容量為Integer.MAX_VALUE。如果任務提交速度持續大餘任務處理速度,會造成隊列堆積大量任務,很有可能在執行拒絕策略之前就造成內存溢出。

適用於持續不斷地提交任務的場景,並且要求任務提交速度不得超過線程處理速度。

1.2 newCachedThreadPool

public static ExecutorService newCachedThreadPool()

創建對所有線程都帶有超時時間的線程池。對於執行很多短期異步任務的程序而言,這個線程池通常可提高程序性能。

調用execute等方法將重用以前構造的線程(如果線程可用)。如果現有線程沒有可用的(均在工作中),則創建一個新線程並添加到池中。將會終止並移除那些已有60秒鐘未工作的線程。因此,長時間保持空閒的線程池不會使用任何資源。線程池會根據執行的情況,在程序運行時自動調整線程數量,這裏就是可變線程數量的線程池的特點。

java public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }

corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即沒有核心線程,線程數量最大為Integer. MAX_VALUE;keepAliveTime = 60s,對所有的線程空閒60s後清理。

workQueue 為 SynchronousQueue 阻塞同步隊列,該隊列沒有容量,因此如果有新的任務進來並且目前的線程都在工作中,那麼會立即創建新線程執行任務;

適用於快速處理大量耗時較短的任務,如果任務耗時較長,極端情況下CachedThreadPool會因為創建過多線程而耗盡CPU和內存資源。

1.3 newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

創建一個線程池,它可安排在給定延遲後運行命令或者定期地執行。內部就是一個ScheduledThreadPoolExecutor實例!

java public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }

1.4 newSignalThreadExecutor

public static ExecutorService newSingleThreadExecutor()

返回只有固定一個線程的線程池!

這裏多了一層FinalizableDelegatedExecutorService包裝,這一層有什麼用呢?它和newFixedThreadPool“固定容量”的線程池有什麼區別呢?是否可以輕易的改變線程數量呢?寫個demo來測試一下:

```java /* * @author lx / public class FixedThreadPoolTest { public static void main(String[] args) { //固定容量的線程池,一個線程 ExecutorService fixedExecutorService = Executors.newFixedThreadPool(1); //強轉 ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) fixedExecutorService; System.out.println(threadPoolExecutor.getCorePoolSize()); //重設核心線程 threadPoolExecutor.setCorePoolSize(8); System.out.println(threadPoolExecutor.getCorePoolSize()); //可以看到,上面的所謂"固定容量"的線程池,雖然只有一個容量,但是可以輕易修改容量只需要強制轉型;

    //下面的單個線程的線程池,由於外層包裝了FinalizableDelegatedExecutorService
    //它是Executors的內部類,不能強轉為ThreadPoolExecutor,因此不能改變大小1,做到真正的Single.
    ExecutorService singleExecutorService = Executors.newSingleThreadExecutor();
    //下面報運行時異常 java.lang.ClassCastException
    ThreadPoolExecutor threadPoolExecutor2 = (ThreadPoolExecutor) singleExecutorService;
}

} ```

為什麼不能強轉?原理很簡單,FinalizableDelegatedExecutorService並沒有和ThreadPoolExecutor產生繼承關係。

```java /* * FinalizableDelegatedExecutorService作為DelegatedExecutorService的子類 / static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); }

protected void finalize() {
    super.shutdown();
}
//......

}

/* * DelegatedExecutorService作為AbstractExecutorService的子類 * 可以看到這兩個類和ThreadPoolExecutor沒有關係,內部也並不是ThreadPoolExecutor實現的 * 而setCorePoolSize的方法只有ThreadPoolExecutor類存在,因此如果強轉會拋出ClassCastException / static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e;

DelegatedExecutorService(ExecutorService executor) {
    e = executor;
}
//......

} ```

2 默認線程工廠

public static ThreadFactory defaultThreadFactory()

Executors提供了一個默認的線程工廠實現!線程池中線程的默認名字的由來以及線程所屬線程組等屬性都是通過線程工廠設置的!

```java public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); } /* * DefaultThreadFactory 是默認的線程工廠,使用Executors.defaultThreadFactory()獲取 / static class DefaultThreadFactory implements ThreadFactory { //靜態的原子變量, 用來統計線程工廠的個數。是static 的原子變量,用來記錄當前線程池的編號,它是應用級別的,所有線程池共用一個,比如創建第一個線程池時線程池編號為l ,創建第二個線程池時線程池的編號為2,所以pool-1-thread-1 裏面的pool -1 中的l 就是這個值。 private static final AtomicInteger poolNumber = new AtomicInteger(1); //線程組 private final ThreadGroup group; //threadNumber 用來記錄每個線程工廠創建了多少線程, 這兩個值也作為線程池和線程的名稱的一部分。線程池級別的,每個線程池使用該變量來記錄該線程池中線程的編號, 所以pool- 1-thread- l 裏面的thread-I 中的l 就是這個值。 private final AtomicInteger threadNumber = new AtomicInteger(1); //線程名字的前綴,線程池中線程名稱的前綴,默認固定為pool o private final String namePrefix;

DefaultThreadFactory() {
    SecurityManager s = System.getSecurityManager();
    group = (s != null) ? s.getThreadGroup() :
            Thread.currentThread().getThreadGroup();
    namePrefix = "pool-" +
            poolNumber.getAndIncrement() +
            "-thread-";
}

/**
 * newThread 方法是對線程的一個修飾
 * @param r
 * @return
 */
public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r,
            //線程名字由來
            namePrefix + threadNumber.getAndIncrement(),
            0);
    if (t.isDaemon())
        t.setDaemon(false);
    if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
    return t;
}

}

```

3 Runnable轉換為Callable

public static < T > Callable< T > callable(Runnable task, T result)

返回 Callable 對象,調用它時可運行給定的任務並返回給定的結果。這在把需要 Callable 的方法應用到其他無結果的操作時很有用。

public static Callable< Object > callable(Runnable task)

返回 Callable 對象,調用它時可運行給定的任務並返回 null。

原理很簡單,就是適配器模式的應用,返回的是一個RunnableAdapter適配器類的實例。

```java /* * 返回 Callable 對象,調用它時可運行給定的任務並返回 null。 * * @param task 被適配的Runnable任務 * @return 一個Callable任務 * @throws NullPointerException 如果task為inull / public static Callable callable(Runnable task) { if (task == null) throw new NullPointerException(); return new RunnableAdapter(task, null); }

/* * 返回 Callable 對象,調用它時可運行給定的任務並返回給定的結果。 * 這在把需要 Callable 的方法應用到其他無結果的操作時很有用。 * * @param task 被適配的Runnable任務 * @param result 指定返回值 * @return 一個Callable任務 * @throws NullPointerException 如果task為inull / public static Callable callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter(task, result); }

/* * 該類是Executors的內部類,實現了callable接口 * 作為一個適配器類,用於將runnable任務和result結果適配成Callable對象 / static final class RunnableAdapter implements Callable { final Runnable task; final T result;

RunnableAdapter(Runnable task, T result) {
    this.task = task;
    this.result = result;
}

/**
 * 重寫了call方法
 * 適配原理很簡單,調用task.run(),並且返回result
 *
 * @return result
 */
public T call() {
    task.run();
    return result;
}

} ```

4 總結

本系列文章我們學習了Executor執行框架的大概架構,以及ThreadPoolExecutor、FutureTask、ScheduledThreadPoolExecutor 、Executors等核心類的原理。閲讀線程池的實現源碼可以讓我們不僅知道某個方法是什麼,還能知道為什麼,以及怎麼實現的,在這過程中我們還能接觸到比如適配器模式,策略模式、Leader-Follower線程模型等一些偏理論的東西的具體應用。

如果你讀完本系列文章,你應該可以知道一些關於線程池的不常見的知識,比如:

  1. 線程池的最大線程數量一定會小於等於maximumPoolSize嗎?為什麼?
  2. 核心線程可以應用keepAliveTime設置的超時時間嗎?有例外嗎?
  3. 線程池有用到鎖嗎?有幾種鎖?有什麼用?
  4. 如果一個工作線程在執行任務過程中拋出了異常,那麼這個線程會怎樣呢?
  5. 延遲/週期任務的原理是什麼?scheduleWithFixedDelay和scheduleWithFixedDelay 方法的區別是什麼?
  6. 正常線程池中設置的延遲任務一定會在到達你設置的延遲時間之時運行嗎?
  7. FutureTask的原理?

Executor執行框架類容很豐富,功能很多,本次僅僅講解了一部分,還有一些包括ThreadPoolExecutor、ScheduledThreadPoolExecutor的某些方法也沒有講解,使用時建議查看相關api文檔做更全面的瞭解!另外JDK1.7的時候線程池新增了ForkJoinPool分治框架,這是對線程池的增強,後面的文章我們會講解ForkJoinPool的源碼!

相關文章:

  1. AQS:JUC—五萬字的AbstractQueuedSynchronizer(AQS)源碼深度解析與應用案例
  2. interrupt:Java線程中斷與停止線程詳解以及案例演示
  3. ReentrantLock:JUC—ReentrantLock源碼深度解析
  4. PriorityBlockingQueue:JUC—兩萬字的PriorityBlockingQueue源碼深度解析
  5. DelayQueue:JUC—DelayQueue源碼深度解析
  6. LockSupport:JUC—LockSupport以及park、unpark方法底層源碼深度解析

其他JUC相關文章比如阻塞隊列、鎖等等,在我的專欄中都有!

如有需要交流,或者文章有誤,請直接留言。另外希望點贊、收藏、關注,我將不間斷更新各種Java學習博客!