JUC并发编程第六章(共享模型之工具)


JUC并发编程 共享模型之工具

1. 线程池

1.1 ThreadPoolExecutor

线程池类图

1.1.1 线程池状态

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量;也就是一个整数表达了2层含义。

状态名 高3位 接收新任务 处理阻塞队列任务 说明
RUNNING 111 Y Y
SHUTDOWN 000 N Y 不会接收新任务,但会处理阻塞队列剩余任务
STOP 001 N N 会中断正在执行的任务,并抛弃阻塞队列任务
TIDYING 010 - - 任务全执行完毕,活动线程为 0 即将进入终结(中间状态,表示线程池正在关闭中,但还没有完全终止)
TERMINATED 011 - - 终止状态

从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING;因为最高位是符号位(0正1负);

这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值:

// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));

// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }

1.1.2 构造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

这里选取了参数最多的最全的一个

ThreadPoolExecutor 构造函数的各个参数用于配置和创建线程池,以下是每个参数的详细解释:

  1. corePoolSize(核心线程数):

    • 含义:这是线程池中保持活动的最小线程数,即使这些线程是空闲的。
    • 作用:核心线程数控制着线程池的基本大小,确保线程池至少具有指定数量的线程可用来处理任务,即使没有任务需要执行
  2. maximumPoolSize(最大线程数):

    • 含义:这是线程池中允许的最大线程数,包括核心线程和非核心线程,非核心线程用于任务量特别大,阻塞队列都存不下了,就会创建非核心线程。
    • 作用:最大线程数限制了线程池的最大并发度,当任务提交多于核心线程数的任务时,线程池会创建额外的非核心线程来处理任务,但数量不会超过最大线程数。

    核心线程和非核心线程的区别:非核心线程处理完任务之后会被销毁,相当于零时工,而核心线程不会

  3. keepAliveTime(线程空闲时间):

    • 含义:这是非核心线程在空闲状态下的最大等待时间,超过这个时间会被终止并从线程池中移除。
    • 作用:通过设置空闲线程的最大等待时间,可以限制线程池中线程的数量,以节省资源。
  4. unit(时间单位):

    • 含义:用于指定 keepAliveTime 参数的时间单位,通常是 TimeUnit.SECONDSTimeUnit.MILLISECONDS 等。

    keepAliveTime、unit用来针对非核心线程的

  5. workQueue(工作队列):

    • 含义:用于存储等待执行的任务的队列,可以是各种不同类型的队列,如 LinkedBlockingQueueArrayBlockingQueuePriorityBlockingQueue 等。
    • 作用:工作队列用于缓冲任务,当线程池中的线程都在忙于执行任务时,新任务会被放入工作队列中等待执行。
  6. threadFactory(线程工厂):

    • 含义:用于创建线程的工厂,允许你自定义线程的创建过程,例如设置线程的名称、优先级等。
    • 作用:通过自定义线程工厂,可以为线程池中的每个线程指定特定的属性,以便更好地进行线程池管理和监控。
  7. handler(拒绝策略):

    • 含义:当线程池已经达到最大线程数并且工作队列也已满时,新任务无法被执行时的处理策略。
    • 作用:拒绝策略定义了当无法接受新任务时应该采取的行动,可以选择默认的策略(抛出异常、丢弃任务、阻塞等),也可以自定义拒绝策略以满足特定需求。

工作方式:

graph LR

subgraph a[阻塞队列]
size(size=2)
t3(任务3)
t4(任务4)
end

subgraph b[线程池c=2,m=3]
ct1(核心线程1)
ct2(核心线程2)
mt1(非核心线程1)
ct1-->t1(任务1)
ct2-->t2(任务2)
mt1-->t5(任务5)
end


style ct1 fill:#bbf,stroke:#f66,stroke-width:2px
style ct2 fill:#bbf,stroke:#f66,stroke-width:2px
style mt1 fill:#bbf,stroke:#f66,stroke-width:2px

执行流程:

现在我们创建了一个线程池,其中核心线程数为2,最大线程数为3,那么非核心线程数就为1;阻塞队列空间为2。

当任务1、任务2过来的时候由于核心线程数充足,则直接交给核心线程执行;

当任务3、任务4过来的时候,由于该线程池只有两个核心线程,无法处理多余的任务,那么就会将任务3、4放入阻塞队列中进行等待,但是由于突然的任务特别多,这时候又来了个任务5,阻塞队列也无法存放未执行的任务,这时候,就会创建一个非核心线程来处理任务5,如果非核心线程处理完毕任务5之后,在最大存活时间内如果没有新的任务给到非核心线程,那么非核心线程就会被销毁,而核心现在会一直线程池中不会被销毁

当任务太多,非核心线程无法处理时,线程池的行为取决于其配置以及所使用的拒绝策略(由 RejectedExecutionHandler 参数指定)。以下是一些可能的情况:

  1. 任务队列满了:如果工作队列已满,而非核心线程数已达到最大线程数(即已创建了所有允许的线程),那么新任务可能会触发拒绝策略。

  2. 使用默认拒绝策略:如果你没有为 ThreadPoolExecutor 显式指定拒绝策略,那么默认的拒绝策略是抛出 RejectedExecutionException 异常。这意味着线程池不会接受新任务,并且会抛出异常,通知你无法处理更多任务。

    拒绝策略 jdk 提供了 4 种实现:

    • AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是
    • CallerRunsPolicy 让调用者运行任务
    • DiscardPolicy 放弃本次任务
    • DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
  3. 自定义拒绝策略:你可以通过传递一个自定义的拒绝策略给 RejectedExecutionHandler 参数来定义自己的处理方式。这个自定义策略可以根据你的需求执行不同的操作,例如将任务丢弃、将任务放入队列以等待重新执行、阻塞任务的提交者等。

  4. 任务提交者被阻塞:如果你使用了某些阻塞的拒绝策略,例如 BlockingQueue,当工作队列已满时,任务提交者可能会被阻塞,直到队列中有空间为止。这可以帮助控制任务的提交速率,以防止资源过度消耗。

注:

  1. 上图中的c,和m:corePoolSize=2,maximumPoolSize=3,非核心线程=最大线程数(m)-核心线程数(c)
  2. 非核心线程必须配合有界队列来使用,如果没有配合有界队列,那么所谓的非核心线程也不会被创建

JDK4种拒绝策略

根据这个构造方法,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池,如下

1.1.3 newFixedThreadPool——固定大小线程池

Executors.newFixedThreadPool(int nThreads) 是一个工厂方法,用于创建一个固定大小的线程池。这个线程池中始终保持指定数量的核心线程,而且不会增加或减少,除非线程池被显式地关闭。下面是关于 newFixedThreadPool 工厂方法的详细解释:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
  1. **参数 nThreads**:

    • 含义:这是线程池中核心线程的数量
    • 作用:线程池会始终保持这么多核心线程处于活动状态,即使它们没有正在执行的任务。这意味着你可以同时运行 nThreads 个线程来处理任务。

    可见,该方法将核心线程数和最大线程数设置为一致,即最大线程数=核心线程数,意味着没有非核心线程。

  2. 工作队列

    • 类型:newFixedThreadPool 使用了一个无界队列LinkedBlockingQueue)作为工作队列。
    • 作用:无界队列意味着线程池可以接受任意数量的任务,而不会抛出拒绝异常,因此工作队列大小没有限制。
  3. 最大线程数

    • newFixedThreadPool 的线程池的最大线程数始终等于核心线程数,因为它不会创建额外的非核心线程。
  4. 线程生命周期

    • 核心线程不会在空闲时被回收,因此线程池中的线程数量固定不变。**(因为没有非核心线程,自然也不需要超时时间)**
    • 线程在处理完一个任务后会立即寻找下一个任务执行,或者等待队列中有任务可供执行。
  5. 适用场景

    • 适合于需要固定数量线程的场景,例如需要限制并发度的情况。
    • 如果任务数量超过核心线程数,超过部分会被放入无界队列中等待执行。

    即适用于任务量已知,相对耗时的任务。

  6. 关闭线程池

    • 通常情况下,你需要手动调用线程池的 shutdown() 方法来关闭线程池。这会导致线程池进入 SHUTDOWN 状态,不再接受新任务,但会执行等待队列中的任务。
    • 如果希望立即停止线程池的运行并中断所有正在执行的任务,可以使用 shutdownNow() 方法。

    如果没有手动关闭此线程池,会导致程序一直阻塞着没有结束。

使用示例:

ExecutorService executor = Executors.newFixedThreadPool(5); // 创建一个固定大小为5的线程池
executor.execute(new MyTask()); // 提交任务给线程池执行
// ...
executor.shutdown(); // 关闭线程池

总之,newFixedThreadPool 工厂方法创建了一个具有固定数量核心线程的线程池,适用于需要控制并发度的场景,但需要注意线程池是无界的,因此可能在极端情况下导致内存耗尽。要合理选择线程池大小,以满足应用程序的需求。

1.1.4 newCachedThreadPool——带缓冲线程池

Executors.newCachedThreadPool() 是一个工厂方法,用于创建一个具有可变大小的线程池。这个线程池可以根据需要动态创建新线程,并在线程空闲一段时间后将其回收。以下是关于 newCachedThreadPool 工厂方法的详细解释:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
  1. 工作线程数量

    • newCachedThreadPool 创建的线程池不限制线程数量,可以根据需要动态创建线程。
    • 当有新任务需要执行时,如果当前没有空闲线程,线程池会创建一个新线程来执行任务。

    可见核心线程数为0,最大线程数为Integer的最大值,全部是非核心线程。

  2. 工作队列

    • newCachedThreadPool 不使用工作队列,因为它不限制线程数量,任务不需要排队等待执行。
    • 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取工作任务是放不进去的(一手交钱、一手交货),后面示例解释。
  3. 线程生命周期

    • 线程在执行完一个任务后,如果在一定时间内没有新任务需要执行,那么这个线程会被终止并从线程池中移除。
    • 这意味着 newCachedThreadPool 中的线程数量会根据任务的负载动态调整,适应不同工作负载。

    可见非核心线程的生命周期只有60s,如果在这段时间内一直处于空闲的,那么60s过后就会被回收。

  4. 适用场景

    • 适用于任务数量不确定或波动较大的情况,可以自动根据需求创建线程,避免了维护固定数量的线程的开销。
    • 适合短时间的任务,因为线程池会及时回收不再需要的线程。
  5. 关闭线程池

    • 通常情况下,你需要手动调用线程池的 shutdown() 方法来关闭线程池。这会导致线程池进入 SHUTDOWN 状态,不再接受新任务,但会执行已经提交的任务。
    • 如果希望立即停止线程池的运行并中断所有正在执行的任务,可以使用 shutdownNow() 方法。

使用示例:

ExecutorService executor = Executors.newCachedThreadPool(); // 创建一个可变大小的线程池
executor.execute(new MyTask()); // 提交任务给线程池执行
// ...
executor.shutdown(); // 关闭线程池

总之,newCachedThreadPool 工厂方法创建了一个具有可变大小的线程池,适用于需要根据需求自动调整线程数量的场景,尤其适合于短时间的任务。要注意,由于线程数量不受限制,如果任务数量过多,可能会导致线程数大幅增加,因此需要谨慎使用,以避免资源消耗过多。

SynchronousQueue示例:

@Slf4j
public class ThreadPollTets {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Integer> integers = new SynchronousQueue<>();
        new Thread(() -> {
            try {
                log.debug("putting {} ", 1);
                integers.put(1);
                log.debug("{} putted...", 1);
                log.debug("putting...{} ", 2);
                integers.put(2);
                log.debug("{} putted...", 2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t1").start();
        Thread.sleep(1000);
        new Thread(() -> {
            try {
                log.debug("taking {}", 1);
                integers.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t2").start();
        Thread.sleep(1000);
        new Thread(() -> {
            try {
                log.debug("taking {}", 2);
                integers.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t3").start();
    }
}

执行结果:

17:40:01.331 [t1] DEBUG com.example.javatest.ThreadPollTets - putting 1 
17:40:02.336 [t2] DEBUG com.example.javatest.ThreadPollTets - taking 1
17:40:02.336 [t1] DEBUG com.example.javatest.ThreadPollTets - 1 putted...
17:40:02.336 [t1] DEBUG com.example.javatest.ThreadPollTets - putting...2 
17:40:03.348 [t3] DEBUG com.example.javatest.ThreadPollTets - taking 2
17:40:03.348 [t1] DEBUG com.example.javatest.ThreadPollTets - 2 putted...

可见,当线程线程t1执行put的时候被阻塞住了,因为当前还没有线程来取,等了1秒之后,线程t2要来取了,线程t1才put进去。

这就是SynchronousQueue所展示出来的一手交钱,一手交货。

1.1.5 newSingleThreadExecutor——单线程线程池

Executors.newSingleThreadExecutor() 是一个工厂方法,用于创建一个单线程的线程池。这个线程池中只包含一个核心线程,它用于顺序执行提交的任务。以下是关于 newSingleThreadExecutor 工厂方法的详细解释:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  1. 工作线程数量

    • newSingleThreadExecutor 创建的线程池只包含一个核心线程。
    • 这意味着它会顺序执行提交的任务,一个接一个地执行,不会并发执行多个任务。
  2. 工作队列

    • newSingleThreadExecutor 不使用工作队列,因为它只有一个线程,不需要排队等待执行的任务。
  3. 线程生命周期

    • 核心线程在执行完一个任务后会保持活动状态,不会被回收,除非线程池被显式地关闭。
    • 线程池中的线程数量始终保持为1。
  4. 适用场景

    • 适用于需要保持任务执行顺序、依赖顺序执行的场景,或者需要一个单线程的执行环境,以避免并发问题
    • 可以用于定时执行任务,确保任务按照特定的顺序和频率执行。
  5. 关闭线程池

    • 通常情况下,你需要手动调用线程池的 shutdown() 方法来关闭线程池。这会导致线程池进入 SHUTDOWN 状态,不再接受新任务,但会执行已经提交的任务。
    • 如果希望立即停止线程池的运行并中断所有正在执行的任务,可以使用 shutdownNow() 方法。

使用示例:

ExecutorService executor = Executors.newSingleThreadExecutor(); // 创建一个单线程的线程池
executor.execute(new MyTask()); // 提交任务给线程池执行
// ...
executor.shutdown(); // 关闭线程池

总之,newSingleThreadExecutor 工厂方法创建了一个单线程的线程池,适用于需要顺序执行任务或避免并发问题的场景。它保证了任务按照提交的顺序执行,并且不会有多个线程并发执行任务。

使用该线程和自己创建一个线程的区别:

  • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作

    @Slf4j
    public class ThreadPollTets {
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.execute(()->{
                log.debug("1");
                int i = 1/0;
            });
            executor.execute(()->{
                log.debug("2");
            });
            executor.execute(()->{
                log.debug("3");
            });
        }
    }
    

    执行结果:

    17:58:19.772 [pool-1-thread-1] DEBUG com.example.javatest.ThreadPollTets - 1
    17:58:19.773 [pool-1-thread-2] DEBUG com.example.javatest.ThreadPollTets - 2
    17:58:19.773 [pool-1-thread-2] DEBUG com.example.javatest.ThreadPollTets - 3
    Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
        at com.example.javatest.ThreadPollTets.lambda$main$0(ThreadPollTets.java:21)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    

    可见,当执行到1/0的时候,线程会出现异常然后挂掉,如果是自己使用单线程来做,那么你还要重新创建一个,但是如果你是使用的单线程池,线程池会帮我们创建好线程,始终保证池种有一个线程在。

  • Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改

    • FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法。

      ThreadPoolExecutor是ExecutorService的的实现类。

  • Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改

    • 对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改

其他区别:

  1. 任务队列
    • 单线程池会维护一个任务队列,可以将多个任务提交到队列中,由单个线程逐个执行。
    • 自己创建一个线程执行任务时,你需要自己实现任务的排队和调度逻辑,这可能涉及到手动创建队列和线程间的通信。
  2. 线程生命周期管理
    • 单线程池会自动管理线程的生命周期,包括创建、启动、停止、回收线程。
    • 自己创建线程时,你需要手动管理线程的生命周期,包括创建、启动、停止和资源释放,这可能涉及到更多的细节和复杂性。
  3. 线程复用
    • 单线程池中的线程是可复用的,一旦一个任务执行完成,线程会继续执行队列中的下一个任务,而不会被销毁。
    • 自己创建的线程在任务执行完后如果不需要继续使用,需要手动销毁,否则会浪费资源。
  4. 错误处理
    • 单线程池内部会处理线程的异常,确保线程不会因为未捕获的异常而终止。
    • 自己创建的线程需要自行处理异常,否则未捕获的异常可能会导致线程崩溃。
  5. 可控性
    • 单线程池提供了更高的任务执行控制,例如可以通过关闭线程池来停止任务的执行。
    • 自己创建的线程可能难以在需要时进行有效的控制和管理。

1.1.6 newScheduledThreadPool——任务调度线程池

『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。

例如如下例子:

@Slf4j
public class TimerTest {
    public static void main(String[] args) {
        Timer timer = new Timer();
        TimerTask task1 = new TimerTask() {
            @Override
            public void run() {
                log.debug("task1");
                try {
                    Thread.sleep(1000);//休眠1S
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        TimerTask task2 = new TimerTask() {
            @Override
            public void run() {
                log.debug("task2");
            }
        };
        timer.schedule(task1,1000);
        timer.schedule(task2,1000);
    }
}
11:02:25.095 [Timer-0] DEBUG com.example.javatest.TimerTest - task1
11:02:26.106 [Timer-0] DEBUG com.example.javatest.TimerTest - task2

可见最开始两个线程都是延时1s执行,当task1休眠1s之后,task2也会被迫等待,因为是串行执行,所以必须要等task1执行完毕之后,task2才会执行。

不仅如此,如果task1出现异常了,那么task2就不会被执行,修改task1:

TimerTask task1 = new TimerTask() {
    @Override
    public void run() {
        log.debug("task1");
        int i = 1/0; //手动制造异常
    }
};
11:04:03.166 [Timer-0] DEBUG com.example.javatest.TimerTest - task1
Exception in thread "Timer-0" java.lang.ArithmeticException: / by zero
    at com.example.javatest.TimerTest$1.run(TimerTest.java:22)
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)

Executors.newScheduledThreadPool(int corePoolSize) 是一个工厂方法,用于创建一个具有固定核心线程数的调度线程池。这个线程池可以用于执行定时任务或周期性任务,允许你在未来的某个时间点执行任务或定期执行任务。以下是关于 newScheduledThreadPool 工厂方法的详细解释:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
  1. **参数 corePoolSize**:

    • 含义:这是线程池中的核心线程数,表示线程池中同时允许执行的任务数量。
    • 作用:核心线程用于执行任务,调度线程池会保持这么多线程一直处于活动状态,即使它们没有正在执行的任务。
  2. 工作队列

    • newScheduledThreadPool 不使用工作队列,因为它主要用于执行定时任务,而不是在任务之间维护队列。
  3. 线程生命周期

    • 核心线程在执行完一个任务后会保持活动状态,不会被回收,以便随时执行下一个任务。
    • 线程池中的线程数量始终保持为核心线程数。
  4. 任务调度

    • newScheduledThreadPool 提供了用于调度任务的方法,例如 schedule 用于执行一次性任务,scheduleAtFixedRatescheduleWithFixedDelay 用于执行周期性任务。
    • 你可以指定任务的延迟时间或周期,线程池会在指定的时间点或间隔后执行任务。
  5. 适用场景

    • 适用于需要执行定时任务或周期性任务的场景,例如定时备份、定时清理等。
    • 也可以用于需要延迟执行任务的场景,例如在一段时间后执行某项操作。
  6. 关闭线程池

    • 通常情况下,你需要手动调用线程池的 shutdown() 方法来关闭线程池。这会导致线程池进入 SHUTDOWN 状态,不再接受新任务,但会执行已经提交的任务。
    • 如果希望立即停止线程池的运行并中断所有正在执行的任务,可以使用 shutdownNow() 方法。

使用示例:

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2); // 创建一个具有两个核心线程的调度线程池
scheduler.schedule(new MyTask(), 1, TimeUnit.SECONDS); // 一秒后执行任务(延时执行)
scheduler.scheduleAtFixedRate(new MyTask(), 0, 5, TimeUnit.SECONDS); // 每5秒执行一次任务(定时执行)
// ...
scheduler.shutdown(); // 关闭线程池

延时示例:

public static void main(String[] args) {
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    log.debug("start....");
    pool.schedule(()->log.debug("延时1s之后开始执行"),1, TimeUnit.SECONDS);
}
11:24:18.947 [main] DEBUG com.example.javatest.TimerTest - start....
11:24:19.975 [pool-1-thread-1] DEBUG com.example.javatest.TimerTest - 延时1s之后开始执行

可见在主线程启动了1s之后,延时任务才执行

定时示例:

public static void main(String[] args) {
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    log.debug("start....");
    pool.scheduleAtFixedRate(()->log.debug("延时1s之后开始执行"),1, 1,TimeUnit.SECONDS);
}

参数解释:

  • 参数一:表示要执行的任务
  • 参数二:表示初始的延时时间
  • 参数三:每隔多久执行一次任务
  • 参数四:时间单位
11:25:35.842 [main] DEBUG com.example.javatest.TimerTest - start....
11:25:36.877 [pool-1-thread-1] DEBUG com.example.javatest.TimerTest - 延时1s之后开始执行
11:25:37.863 [pool-1-thread-1] DEBUG com.example.javatest.TimerTest - 延时1s之后开始执行
11:25:38.862 [pool-1-thread-1] DEBUG com.example.javatest.TimerTest - 延时1s之后开始执行
11:25:39.865 [pool-1-thread-1] DEBUG com.example.javatest.TimerTest - 延时1s之后开始执行
....

可见当start执行了之后,延时任务并没有立即执行,这是因为参数2的作用,要延时1s之后再执行。之后可见,每个1s就执行任务,这是参数3的作用。

现在又一个问题,如果任务执行的时间要比执行的频率(参数3)的时间要长怎么办?

public static void main(String[] args) {
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    log.debug("start....");
    pool.scheduleAtFixedRate(()->{
        log.debug("延时1s之后开始执行");
        try {
            Thread.sleep(2000);//模拟任务要执行2s
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    },1, 1,TimeUnit.SECONDS);
}
11:29:04.540 [main] DEBUG com.example.javatest.TimerTest - start....
11:29:05.567 [pool-1-thread-1] DEBUG com.example.javatest.TimerTest - 延时1s之后开始执行
11:29:07.582 [pool-1-thread-1] DEBUG com.example.javatest.TimerTest - 延时1s之后开始执行
11:29:09.596 [pool-1-thread-1] DEBUG com.example.javatest.TimerTest - 延时1s之后开始执行
...

可见如果任务的时间大于间隔时间,并且是每2s执行一次任务,这并不符合我们的预期,我们是想要的是每1s执行一次。

这时候就要使用scheduleWithFixedDelay方法了:

public static void main(String[] args) {
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    log.debug("start....");
    pool.scheduleWithFixedDelay(()->{
        log.debug("延时1s之后开始执行");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    },1, 1,TimeUnit.SECONDS);
}
11:32:13.550 [main] DEBUG com.example.javatest.TimerTest - start....
11:32:14.574 [pool-1-thread-1] DEBUG com.example.javatest.TimerTest - 延时1s之后开始执行
11:32:17.587 [pool-1-thread-1] DEBUG com.example.javatest.TimerTest - 延时1s之后开始执行
11:32:20.613 [pool-1-thread-1] DEBUG com.example.javatest.TimerTest - 延时1s之后开始执行
...

可见,基本上是每隔3s执行一次,这又是怎么回事?其实你看scheduleWithFixedDelayscheduleAtFixedRate之间的区别就可以了。

scheduleAtFixedRatescheduleWithFixedDelay 是 Java 中用于调度周期性任务的两个不同方法,它们有一些重要的区别

  1. 执行时间的控制

    • scheduleAtFixedRate:它会按照固定的速率执行任务。即使前一个任务的执行时间超过了指定的间隔时间,它仍然会在预定的时间间隔后开始下一个任务。这意味着任务的执行可能会重叠
    • scheduleWithFixedDelay:它会等待前一个任务执行完成后,再等待指定的延迟时间,然后开始下一个任务。这样,每个任务之间的间隔是固定的,不会发生任务重叠。
  2. 延迟时间的含义

    • scheduleAtFixedRate:第三个参数表示任务的执行间隔时间,即任务开始执行到下一个任务开始执行的时间间隔。
    • scheduleWithFixedDelay:第三个参数表示任务的延迟时间,即任务执行完成后到下一个任务开始执行的时间间隔。
  3. 适用场景

    • scheduleAtFixedRate 适用于需要按照固定速率执行任务的情况,例如定时心跳任务或需要定时采样数据的任务。
    • scheduleWithFixedDelay 适用于需要确保任务执行完成后等待一段固定时间间隔后再次执行的情况,例如需要在每次任务执行后执行清理或者需要等待外部资源准备好的任务。
  4. 容错性

    • scheduleAtFixedRate 在任务执行时间超过间隔时间时,会快速连续执行多次任务,可能导致任务堆积,应用程序性能下降。
    • scheduleWithFixedDelay 在任务执行时间超过间隔时间时,会等待前一个任务执行完成后再执行下一个任务,可以避免任务堆积。

使用任务调度线程池来解决Timer流下的问题:

public static void main(String[] args) {
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    pool.schedule(()->log.debug("task1"),1, TimeUnit.SECONDS);
    pool.schedule(()->log.debug("task2"),1, TimeUnit.SECONDS);
}
11:11:11.970 [pool-1-thread-1] DEBUG com.example.javatest.TimerTest - task1
11:11:11.971 [pool-1-thread-1] DEBUG com.example.javatest.TimerTest - task2

可见由于线程池中的核心线程数只有1个,所以任务还是串行执行的,那么如果我将线程数改为2呢?

public static void main(String[] args) {
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
    pool.schedule(()->log.debug("task1"),1, TimeUnit.SECONDS);
    pool.schedule(()->log.debug("task2"),1, TimeUnit.SECONDS);
}
11:12:34.479 [pool-1-thread-2] DEBUG com.example.javatest.TimerTest - task2
11:12:34.479 [pool-1-thread-1] DEBUG com.example.javatest.TimerTest - task1

可见两个线程是同时执行的了。

因此我们可以改变线程池中的核心线程数来解决Timer遗留下来的串行执行的问题。

下面再来看看异常的问题:

public static void main(String[] args) {
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    pool.schedule(() -> {
        log.debug("task1");
        int i = 1 / 0;
    }, 1, TimeUnit.SECONDS);
    pool.schedule(() -> log.debug("task2"), 1, TimeUnit.SECONDS);
}
11:14:44.103 [pool-1-thread-1] DEBUG com.example.javatest.TimerTest - task1
11:14:44.104 [pool-1-thread-1] DEBUG com.example.javatest.TimerTest - task2

可见task1出现了异常,但是并没有导致task2的不可执行。

并且task1出现了异常之后,没有打印出异常信息,那么这个该怎么解决呢?查看本章的1.6节

1.1.7 newWorkStealingPool——窃取线程池

Executors.newWorkStealingPool(int parallelism) 是一个工厂方法,用于创建一个工作窃取线程池。工作窃取线程池是 Java 7 引入的一种线程池,它可以根据任务的负载自动平衡工作线程,以充分利用多核处理器的并行性。以下是关于 newWorkStealingPool 工厂方法的详细解释:

public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool
        (parallelism,
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
    this(checkParallelism(parallelism),
         checkFactory(factory),
         handler,
         asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
         "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}
  1. **参数 parallelism**:

    • 含义:这是线程池中并行工作线程的数量
    • 作用:工作窃取线程池会创建指定数量的工作线程,每个线程都可以独立执行任务。
  2. 工作队列

    • 工作窃取线程池中的每个工作线程都有自己的工作队列,用于存储待执行的任务
    • 如果一个线程完成了自己的任务,它可以从其他线程的队列中“窃取”任务来执行,以保持工作线程的高效利用。
  3. 线程生命周期

    • 工作窃取线程池中的线程会一直运行,除非线程池被显式关闭。
    • 当线程完成自己的任务并尝试窃取任务时,如果没有可用任务,线程可能会进入等待状态,等待有任务可供窃取。
  4. 任务调度

    • 你可以将任务提交给工作窃取线程池,线程池会自动分配任务给空闲的工作线程或窃取任务执行。
    • 任务执行的顺序和分配是自动管理的,你无需手动指定线程执行哪个任务。
  5. 适用场景

    • 适用于需要高并发处理任务的场景,特别是在多核处理器上,因为它可以充分利用多核并行性。
    • 适用于任务负载不均衡的情况,因为工作线程可以相互窃取任务来平衡负载。
    • 适用于任务之间存在依赖关系的场景,因为线程池会自动管理任务的执行顺序
  6. 关闭线程池

    • 通常情况下,你需要手动调用线程池的 shutdown() 方法来关闭线程池。这会导致线程池进入 SHUTDOWN 状态,不再接受新任务,但会执行已经提交的任务。
    • 如果希望立即停止线程池的运行并中断所有正在执行的任务,可以使用 shutdownNow() 方法。

使用示例:

ExecutorService executor = Executors.newWorkStealingPool(); // 创建一个工作窃取线程池
executor.execute(new MyTask()); // 提交任务给线程池执行
// ...
executor.shutdown(); // 关闭线程池

总之,newWorkStealingPool 工厂方法创建了一个工作窃取线程池,适用于需要高并发和自动任务负载平衡的场景。工作窃取线程池可以根据任务负载自动平衡工作线程,从而提高了多核处理器的利用率。这是一个非常强大的线程池类型,适用于各种并行和多线程编程的需求。

1.2 提交任务

1.2.1 execute

execute() 方法是线程池接口 Executor 的一个核心方法,用于提交一个任务给线程池执行。不同的线程池实现(如 ThreadPoolExecutorScheduledExecutorService 等)都实现了这个方法,因此你可以在不同的线程池中使用它。以下是对 execute() 方法的详细解释:

  1. 方法签名

    void execute(Runnable command);
    
  2. **参数 command**:

    • command 是一个实现了 Runnable 接口的任务对象。Runnable 接口是一个具有 run() 方法的函数式接口,用于表示一个可以在单独线程中执行的任务。
  3. 作用

    • execute() 方法用于将一个任务提交给线程池执行。一旦任务被提交,线程池会负责分配一个空闲线程来执行任务,或者将任务放入工作队列等待执行,具体行为取决于线程池的实现和配置。
  4. 任务执行流程

    • 当你调用 execute() 提交任务时,线程池会按照其配置的规则来决定任务的执行方式,可能会有以下几种情况:
      • 如果核心线程数未达到上限,线程池会创建一个新的核心线程来执行任务。
      • 如果核心线程已经达到上限,任务会被放入工作队列等待执行。
      • 如果工作队列已满(对于有界队列),线程池可能会根据拒绝策略来处理任务。
  5. 线程池管理

    • 线程池负责管理线程的生命周期,包括创建、启动、停止和回收线程。这意味着你不需要手动创建和管理线程,线程池会自动处理这些细节。
    • 线程池会确保任务按照一定的规则和策略执行,例如可以控制线程的数量、任务的排队和调度等。
  6. 返回值

    • execute() 方法没有返回值,它是一个异步操作。如果需要等待任务执行完成并获取任务执行的结果,可以考虑使用 submit() 方法,该方法会返回一个 Future 对象,通过该对象可以获取任务的执行结果。

使用示例:

ExecutorService executor = Executors.newFixedThreadPool(5); // 创建一个固定大小为5的线程池
executor.execute(new MyTask()); // 提交任务给线程池执行
executor.execute(new AnotherTask()); // 提交另一个任务
// ...
executor.shutdown(); // 关闭线程池

总之,execute() 方法用于将任务提交给线程池执行,由线程池负责管理任务的执行。这使得多线程编程更加方便,可以避免手动管理线程的复杂性,同时充分利用计算资源,提高程序的性能。


1.2.2 submit

submit() 方法是线程池接口 ExecutorService 提供的一个用于提交任务的方法,它与 execute() 方法类似,但具有一些额外的功能。submit() 方法允许你提交一个任务给线程池,并可以获取任务执行的结果。以下是对 submit() 方法的详细解释:

  1. 方法签名

    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    
    • submit(Callable<T> task):用于提交一个返回结果的任务,该任务由 Callable 接口表示,Callable 接口的 call() 方法可以返回一个结果。
    • submit(Runnable task, T result):用于提交一个无返回值的任务,但允许传递一个结果值,任务执行完成后可以通过 Future 获取传递的结果值。
    • submit(Runnable task):用于提交一个无返回值的任务,不传递结果值。
  2. 参数

    • task:一个实现了 Callable 接口或 Runnable 接口的任务对象,可以是一个带返回值的任务或无返回值的任务。
    • result:只用于第二个方法,表示任务的结果值。
  3. 返回值

    • submit() 方法返回一个 Future 对象,Future 是 Java 提供的一种用于异步获取任务执行结果的机制。通过 Future 对象,你可以:
      • 检查任务是否完成。
      • 获取任务执行的结果。
      • 取消任务的执行。
      • 等待任务完成并获取结果(使用 get() 方法时可能会阻塞当前线程)。
  4. 任务执行流程

    • submit() 方法将任务提交给线程池执行,线程池会按照其配置的规则执行任务。
    • 对于带返回值的任务(使用 Callable 接口),你可以通过 Future 对象获取任务执行的结果。
    • 对于无返回值的任务(使用 Runnable 接口),submit() 方法也会返回一个 Future,但 Futureget() 方法将始终返回 null
  5. 异常处理

    • 如果任务执行过程中发生异常,异常会被封装到 Future 对象中,你可以通过 Futureget() 方法来获取异常,或使用 try-catch 块处理异常。

使用示例:

@Slf4j
public class ThreadPollTets {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(5); // 创建一个固定大小为5的线程池

        Future<String> future = executor.submit(() -> {
            log.debug("1111");
            Thread.sleep(1000);
            return "ok";
        });
        try {
            log.debug("方法的返回结果为:{}",future.get());
        } catch (ExecutionException e) {
            //获取结果(get())可能会出现异常,这里手动处理一下
            e.printStackTrace();
        }
        executor.shutdown(); // 关闭线程池
    }
}

执行结果:

09:38:06.515 [pool-1-thread-1] DEBUG com.example.javatest.ThreadPollTets - 1111
09:38:07.518 [main] DEBUG com.example.javatest.ThreadPollTets - 方法的返回结果为:ok

可见,当future.get()没有获取到结果的时候,该线程会阻塞住,一直到获取到接过来才继续往下执行。


1.2.3 invokeAll

invokeAll() 方法是线程池接口 ExecutorService 提供的一个方法,它用于提交一组任务(通常表示为 Callable 对象的集合),并等待所有任务执行完成invokeAll() 方法返回一个 List<Future<T>>,其中包含每个任务的执行结果。以下是对 invokeAll() 方法的详细解释:

  1. 方法签名

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                      long timeout, TimeUnit unit)throws InterruptedException;
    
    • invokeAll() 方法接受一个 Collection,其中包含了一组 Callable 对象,每个 Callable 对象表示一个待执行的任务。
    • 方法声明可能会抛出 InterruptedException 异常,因为在等待所有任务完成时,线程可能被中断。
  2. **参数tasks**:

    • tasks 是一个包含多个 Callable 任务的集合。这些任务会由线程池并行执行。
    • 有可能有超时时间,如果这段时间内都没有执行完,那么后续所有的任务都会被终止。
  3. 返回值

    • invokeAll() 方法返回一个 List<Future<T>>,其中每个 Future 对象表示一个任务的执行结果。你可以通过 Future 获取任务的结果,或者处理任务的异常。
    • invokeAll() 方法的返回值列表的顺序与传入任务的顺序相同,这意味着你可以通过索引来访问特定任务的结果。
  4. 等待任务完成

    • invokeAll() 方法会阻塞当前线程,直到所有任务都完成或者当前线程被中断。
    • 如果所有任务都成功执行完成,它会返回包含每个任务执行结果的 Future 列表。
    • 如果在等待期间线程被中断,invokeAll() 方法会抛出 InterruptedException 异常,并且可能会取消未完成的任务(取决于线程池的实现和配置)。
  5. 异常处理

    • 如果其中一个任务抛出了异常,invokeAll() 方法仍会等待其他任务完成,然后抛出一个 ExecutionException,其中包含第一个抛出异常的任务的异常信息。
    • 你可以通过遍历 Future 列表来检查每个任务的执行结果,如果发现异常,可以使用 Futureget() 方法获取异常。

使用示例:

@Slf4j
public class ThreadPollTets {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2); // 创建一个固定大小为5的线程池
        //创建3个Callable任务
        List<Future<Integer>> futureList = executor.invokeAll(Arrays.asList(
                () -> {
                    log.debug("begin_1");
                    Thread.sleep(1000);
                    return 1;
                },
                () -> {
                    log.debug("begin_2");
                    Thread.sleep(500);
                    return 2;
                },
                () -> {
                    log.debug("begin_3");
                    Thread.sleep(2000);
                    return 3;
                }
        ));
        //获取所有任务结果
        futureList.forEach(f->{
            try {
                System.out.println(f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
        executor.shutdown(); // 关闭线程池
    }
}

执行:

09:52:16.079 [pool-1-thread-2] DEBUG com.example.javatest.ThreadPollTets - begin_2
09:52:16.079 [pool-1-thread-1] DEBUG com.example.javatest.ThreadPollTets - begin_1
09:52:16.592 [pool-1-thread-2] DEBUG com.example.javatest.ThreadPollTets - begin_3
1
2
3

可见,有核心线程只有2个,所以begin_2、begin_1同时执行,begin_3就会被放到任务队列,当begin_2执行完毕的时候,就取出begin_3执行,而主线程中想要获取执行的结果,必须等待所有的线程执行完毕之后,才能拿到结果。


1.2.4 invokeAny

invokeAny() 方法是线程池接口 ExecutorService 提供的一个方法,它用于提交一组任务(通常表示为 Callable 对象的集合),并等待其中任何一个任务成功执行完成(即第一个任务完成或者返回结果)。invokeAny() 方法返回一个与成功完成的任务关联的结果,并且不等待其他任务的完成。以下是对 invokeAny() 方法的详细解释:

也就是说:提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消

  1. 方法签名

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
         throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    
    • invokeAny() 方法接受一个 Collection,其中包含了一组 Callable 任务,每个 Callable 对象表示一个待执行的任务。
    • 方法声明可能会抛出 InterruptedExceptionExecutionException 异常。InterruptedException 表示等待任务完成的过程中线程被中断,ExecutionException 表示任务执行过程中抛出了异常。
  2. **参数 tasks**:

    • tasks 是一个包含多个 Callable 任务的集合。这些任务会由线程池并行执行。
    • 有可能有超时时间
  3. 返回值

    • invokeAny() 方法返回成功完成的任务的结果(任务返回的值),具体类型取决于任务的 Callable 类型。
    • 如果其中一个任务成功完成(即返回结果或不抛出异常),则 invokeAny() 方法会立即返回,并且不等待其他任务的完成。
    • 如果所有任务都抛出异常或被取消,invokeAny() 方法会抛出一个 ExecutionException 异常,其中包含导致第一个异常的任务的异常信息。
  4. 等待任务完成

    • invokeAny() 方法会阻塞当前线程,直到其中任何一个任务成功执行完成或者当前线程被中断。
    • 当某个任务成功完成后,invokeAny() 方法会立即返回,不等待其他任务的完成。
    • 如果在等待期间线程被中断,invokeAny() 方法会抛出 InterruptedException 异常,并且可能会取消未完成的任务(取决于线程池的实现和配置)。
  5. 异常处理

    • 如果其中一个任务抛出异常,invokeAny() 方法会立即返回,并且抛出一个 ExecutionException 异常,其中包含导致第一个异常的任务的异常信息。
    • 如果所有任务都抛出异常或被取消,invokeAny() 方法会抛出一个 ExecutionException 异常,其中包含导致第一个异常的任务的异常信息。

使用示例:

@Slf4j
public class ThreadPollTets {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(2); // 创建一个固定大小为5的线程池

        int result = executor.invokeAny(Arrays.asList(
                () -> {
                    log.debug("begin_1");
                    Thread.sleep(1000);
                    return 1;
                },
                () -> {
                    log.debug("begin_2");
                    Thread.sleep(500);
                    return 2;
                },
                () -> {
                    log.debug("begin_3");
                    Thread.sleep(2000);
                    return 3;
                }
        ));
        System.out.println(result);
        executor.shutdown(); // 关闭线程池
    }
}

执行结果:

10:04:11.403 [pool-1-thread-2] DEBUG com.example.javatest.ThreadPollTets - begin_2
10:04:11.403 [pool-1-thread-1] DEBUG com.example.javatest.ThreadPollTets - begin_1
2

可见,我只有2个和核心线程,两个任务同时启动,但是任务2先执行完毕,程序就结束了。


1.3 关闭线程池

关闭线程池是在使用完线程池后,确保线程池中的所有线程都被正确终止和释放资源的重要操作。Java 中的线程池通常使用 ExecutorService 接口来操作,以下是关闭线程池的几种常见方法:

  1. shutdown() 方法

    shutdown() 方法用于优雅地关闭线程池,它会等待所有已提交的任务执行完成,然后关闭线程池。线程池不再接受新的任务,但会继续执行已提交的任务。

    ExecutorService executor = Executors.newFixedThreadPool(5);
    // 提交任务
    executor.submit(() -> {
        // 任务执行代码
    });
    // 关闭线程池
    executor.shutdown();
    

    使用 shutdown() 方法后,线程池会进入 SHUTDOWN 状态,不再接受新任务,但会执行已提交的任务。

  2. shutdownNow() 方法

    shutdownNow() 方法用于立即关闭线程池,它会尝试停止所有正在执行的任务,并返回等待执行的任务列表。这个方法可能会中断正在执行的任务,因此需要谨慎使用。

    ExecutorService executor = Executors.newFixedThreadPool(5);
    // 提交任务
    executor.submit(() -> {
        // 任务执行代码
    });
    // 立即关闭线程池
    List<Runnable> waitingTasks = executor.shutdownNow(); //这里其实就是返回任务队列中的任务
    

    使用 shutdownNow() 方法后,线程池会进入 STOP 状态,不再接受新任务,尝试中断正在执行的任务,并返回等待执行的任务列表。

  3. awaitTermination() 方法

    awaitTermination(long timeout, TimeUnit unit) 方法用于等待线程池中的任务执行完成,直到达到指定的超时时间为止。如果在超时时间内所有任务都执行完成,返回 true,否则返回 false通常与 shutdown() 方法结合使用。

    ExecutorService executor = Executors.newFixedThreadPool(5);
    // 提交任务
    executor.submit(() -> {
        // 任务执行代码
    });
    // 关闭线程池并等待任务执行完成,最多等待10秒
    executor.shutdown();
    if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
        // 如果在超时时间内线程池中的任务还未执行完成,可以根据需要采取其他操作
    }
    

    awaitTermination() 方法可以确保线程池中的任务在指定的时间内完成执行。

  4. 使用 try-with-resources 关闭线程池

    如果你使用了 Java 7 或更高版本,可以使用 try-with-resources 结构来自动关闭线程池。这样可以确保在代码块执行完毕后自动关闭线程池,无需显式调用 shutdown()

    try (ExecutorService executor = Executors.newFixedThreadPool(5)) {
        // 提交任务
        executor.submit(() -> {
            // 任务执行代码
        });
    } // 在此处自动关闭线程池
    

    这种方式非常便捷,可以避免忘记手动关闭线程池而导致资源泄漏的问题。

1.4 异步模式之工作线程

1.4.1 定义

有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式。

例如,海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那么成本就太高了(对比另一种多线程设计模式:Thread-Per-Message)

注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率

例如,如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成服务员(线程池A)与厨师(线程池B)更为合理,当然你能想到更细致的分工。

1.4.2 饥饿

这里所说的饥饿与之前哲学家就餐问题所说的饥饿不是一回事哟。

这里的饥饿指的是线程不足。

固定大小线程池会有饥饿现象,用缓冲线程池则不会

  • 两个工人是同一个线程池中的两个线程,他们都是全能的,既可以做菜,也可以点餐
  • 他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作
    • 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
    • 后厨做菜:没啥说的,做就是了
  • 比如工人A 处理了点餐任务,接下来它要等着 工人B 把菜做好,然后上菜,他俩也配合的蛮好
  • 但现在同时来了两个客人,这个时候工人A 和工人B 都去处理点餐了,这时没人做饭了,饥饿
@Slf4j
public class ThreadPollTets {
    //菜单
    static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
    //创建随机数
    static Random RANDOM = new Random();
    //随机点餐,返回的是0-MENU.size()之间的数
    static String cooking() {
        return MENU.get(RANDOM.nextInt(MENU.size()));
    }
    public static void main(String[] args) {
        //有两个工人
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        //第一个工人执行execute里面的方法(点餐)
        //第二个工人执行submit里面的方法(做菜)
        executorService.execute(() -> {
            log.debug("处理点餐...");
            Future<String> f = executorService.submit(() -> {
                log.debug("做菜");
                return cooking();
            });
            try {
                log.debug("上菜: {}", f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    }
}

执行结果:

10:33:41.690 [pool-1-thread-1] DEBUG com.example.javatest.ThreadPollTets - 处理点餐...
10:33:41.692 [pool-1-thread-2] DEBUG com.example.javatest.ThreadPollTets - 做菜
10:33:41.692 [pool-1-thread-1] DEBUG com.example.javatest.ThreadPollTets - 上菜: 辣子鸡丁

可见,当只有一个客人来的时候,两个工人是完全能够处理的,一个点餐、一个做菜;

但是如果是来的两个客人:

@Slf4j
public class ThreadPollTets {
    //菜单
    static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
    //创建随机数
    static Random RANDOM = new Random();
    //随机点餐,返回的是0-MENU.size()之间的数
    static String cooking() {
        return MENU.get(RANDOM.nextInt(MENU.size()));
    }
    public static void main(String[] args) {
        //有两个工人
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        //第一个工人执行execute里面的方法(点餐)
        //第二个工人执行submit里面的方法(做菜)
        executorService.execute(() -> {
            log.debug("处理点餐...");
            Future<String> f = executorService.submit(() -> {
                log.debug("做菜");
                return cooking();
            });
            try {
                log.debug("上菜: {}", f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
        //第二位客人
        executorService.execute(() -> {
            log.debug("处理点餐...");
            Future<String> f = executorService.submit(() -> {
                log.debug("做菜");
                return cooking();
            });
            try {
                log.debug("上菜: {}", f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    }
}

执行结果:

10:35:02.610 [pool-1-thread-2] DEBUG com.example.javatest.ThreadPollTets - 处理点餐...
10:35:02.610 [pool-1-thread-1] DEBUG com.example.javatest.ThreadPollTets - 处理点餐...

可见,当来了两个客人的时候,两个工人都去处理点餐了,就没人做菜了,这里就是我所说的饥饿。

注意,这里并不是死锁,死锁是两个线程互相等待对方的锁。

1.4.3 饥饿解决

这里你可以使用缓冲线程池,但是我这里就要用固定线程池,那应该怎么解决?

其实很简单,你让不同任务类型应该使用不同的线程池,这样就能够解决了。

public static void main(String[] args) {
    //有两个工人,分别属于不同线程池
    ExecutorService waiterPool = Executors.newFixedThreadPool(1);
    ExecutorService cookPool = Executors.newFixedThreadPool(1);
    //第一个工人执行execute里面的方法(点餐)
    //第二个工人执行submit里面的方法(做菜)
    waiterPool.execute(() -> {
        log.debug("处理点餐...");
        Future<String> f = cookPool.submit(() -> {
            log.debug("做菜");
            return cooking();
        });
        try {
            log.debug("上菜: {}", f.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    });
    waiterPool.execute(() -> {
        log.debug("处理点餐...");
        Future<String> f = cookPool.submit(() -> {
            log.debug("做菜");
            return cooking();
        });
        try {
            log.debug("上菜: {}", f.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    });
}

执行结果:

10:41:01.360 [pool-1-thread-1] DEBUG com.example.javatest.ThreadPollTets - 处理点餐...
10:41:01.361 [pool-2-thread-1] DEBUG com.example.javatest.ThreadPollTets - 做菜
10:41:01.362 [pool-1-thread-1] DEBUG com.example.javatest.ThreadPollTets - 上菜: 宫保鸡丁
10:41:01.362 [pool-1-thread-1] DEBUG com.example.javatest.ThreadPollTets - 处理点餐...
10:41:01.362 [pool-2-thread-1] DEBUG com.example.javatest.ThreadPollTets - 做菜
10:41:01.362 [pool-1-thread-1] DEBUG com.example.javatest.ThreadPollTets - 上菜: 地三鲜

可见,解决了之前的饥饿现象。

1.5 创建多少线程池合适

  • 过小会导致程序不能充分地利用系统资源、容易导致饥饿
  • 过大会导致更多的线程上下文切换,占用更多内存

1.5.1 CPU 密集型运算

通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费.

1.5.2 I/O 密集型运算

CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。

经验公式如下

线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间

例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式

4 * 100% * 100% / 50% = 8

例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,套用公式

4 * 100% * 100% / 10% = 40

1.6 正确处理线程池异常

使用execute

public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(1);
    pool.execute(() -> {
        log.debug("制造异常");
        int i = 1 / 0;
    });
    pool.shutdown();
}

执行结果:

14:52:17.167 [pool-1-thread-1] DEBUG com.example.javatest.TimerTest - 制造异常
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
    at com.example.javatest.TimerTest.lambda$main$0(TimerTest.java:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

使用submit

public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.submit(() -> { //使用submit
            log.debug("制造异常");
            int i = 1 / 0;
        });
        pool.shutdown();
    }

执行结果:

14:52:48.407 [pool-1-thread-1] DEBUG com.example.javatest.TimerTest - 制造异常

Process finished with exit code 0

可见为什么使用execute出现了异常可以打印出来,而使用submit却无法打印异常呢?

这是因为 execute 方法执行的任务是一个 Runnable,而 Runnable 任务中抛出的异常会被传播到调用线程中。

而使用 submit 方法提交任务时,如果任务是一个 Runnable,则异常默认会被捕获并存储在 Future 对象中,你需要显式调用 Future.get() 方法来获取异常,否则异常不会被显示。

也就是使用submit返回的是一个Future,而异常被保存到了Future中去。

那么如何正确的来处理这些异常呢?

  1. 主动捉异常:也就是我们所说的try-catch块来处理

    public static void main(String[] args){
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.submit(() -> { //使用submit
            log.debug("制造异常");
            try {
                int i = 1 / 0; //主动处理
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        pool.shutdown();
    }
    

    执行结果:

    15:02:07.738 [pool-1-thread-1] DEBUG com.example.javatest.TimerTest - 制造异常
    java.lang.ArithmeticException: / by zero
        at com.example.javatest.TimerTest.lambda$main$0(TimerTest.java:22)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    

    可见使用try-catch的方式能够很好的处理。

  2. 使用上面所说的Future:当你调用Future.get()方法来获取结果的时候,如果中途出现了异常,也会打印出来的。

    public static void main(String[] args){
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Future<Boolean> future = pool.submit(() -> { //使用submit
            log.debug("制造异常");
            int i = 1 / 0;
            return true;
        });
        try {
            log.debug("结果是:{}",future.get()); //这里get的时候有可能会出现异常,所以处理一下
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        pool.shutdown();
    }
    

    执行结果:

    15:00:59.264 [pool-1-thread-1] DEBUG com.example.javatest.TimerTest - 制造异常
    java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at com.example.javatest.TimerTest.main(TimerTest.java:25)
    Caused by: java.lang.ArithmeticException: / by zero
        at com.example.javatest.TimerTest.lambda$main$0(TimerTest.java:21)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    

    可见这下异常能够打印出来了。

1.7 Fork/Join

Fork/Join 是一种并行计算的编程模型,用于解决大规模问题的分治并行处理。它是Java并发包中的一部分,通过 ForkJoinPool 类来实现。Fork/Join模型的核心思想是将一个大任务拆分成若干个小任务,并利用多线程并行执行这些小任务,最后将它们的结果合并起来。以下是Fork/Join模型的关键概念和使用方法:

  1. 任务拆分(Fork):在Fork/Join模型中,大任务被递归地分解成更小的子任务,直到子任务足够小而可以直接求解。这个过程称为任务拆分(Forking)。

  2. 任务合并(Join):每个子任务执行完后,它们的结果被合并成一个总的结果。这个过程称为任务合并(Joining)。

  3. 工作窃取(Work Stealing):Fork/Join框架引入了工作窃取算法,它允许空闲的线程从其他线程的任务队列中偷取任务来执行,以提高并行性和任务的负载均衡。

  4. ForkJoinPoolForkJoinPool 是Fork/Join模型的核心类,它负责管理任务的调度和线程的执行。你可以创建一个ForkJoinPool实例,并提交任务给它来执行。

Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算。

Fork/Join 默认会创建与 cpu 核心数大小相同的线程池

使用Fork/Join框架的一般步骤如下:

  1. 创建一个继承自 RecursiveTask(用于有返回值的任务)或 RecursiveAction(用于无返回值的任务)的任务类,重写 compute 方法,该方法定义了任务如何拆分和合并。

  2. 创建一个 ForkJoinPool 实例,通常可以使用 ForkJoinPool.commonPool() 获取一个默认的公共线程池,或者根据需要创建自定义的线程池。

  3. 将任务提交给 ForkJoinPool 执行,通常使用 invoke() 方法提交任务,或者使用 fork()join() 方法来手动拆分合并任务。

  4. 获取任务的结果(如果有返回值的话),并进行必要的处理。

下面是一个简单的示例,计算一个大数组中所有元素的总和:

package com.example.javatest;

import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;

// 创建一个计算的求和的任务,因为要求和,就表示要有结果,所以使用 RecursiveTask
// 由于求和的结果为整数,所以泛型这里写 Integer
public class SumTask extends RecursiveTask<Integer> {
    // 阈值,用于判断是否继续拆分任务
    //每10个数之间做计算,将100/10=10,也就是划分为10组来计算
    private static final int THRESHOLD = 10; 
    private int[] array; // 存储待求和的数组
    private int start;   // 数组的起始索引
    private int end;     // 数组的结束索引

    public SumTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        // 如果待求和的数组长度小于等于阈值,表示当前数组元素中不足10个
        if (end - start <= THRESHOLD) { 
            int sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i]; // 求和计算
            }
            return sum; // 返回计算结果
        } else { 
            // 如果待求和的数组长度大于阈值,需要拆分任务
            int middle = (start + end) / 2; // 计算数组中点

            // 创建左半部分任务,对左半部分数组求和
            SumTask leftTask = new SumTask(array, start, middle);
            // 创建右半部分任务,对右半部分数组求和
            SumTask rightTask = new SumTask(array, middle, end);

            //开启一个新线程来执行leftTask的任务
            leftTask.fork();
            // 同步执行右半部分任务(当前线程执行)
            int rightResult = rightTask.compute();
            // 等待左半部分任务的结果(合并)
            int leftResult = leftTask.join();

            // 合并左右两部分的结果并返回总和
            return leftResult + rightResult;
        }
    }

    public static void main(String[] args) {
        //计算0~99之和
        int[] array = new int[100];
        // 初始化数组
        for (int i = 0; i < array.length; i++) {
            array[i] = i;
        }
        ForkJoinPool pool = ForkJoinPool.commonPool(); // 创建一个ForkJoinPool实例
        SumTask task = new SumTask(array, 0, array.length); // 创建一个总求和任务
        int result = pool.invoke(task); // 提交任务并获取结果

        System.out.println("Sum: " + result); // 打印最终的求和结果
    }
}

执行结果:

Sum: 4950
Process finished with exit code 0

其实对于开发者来说,Fork/Join还是有点困难的,所以再JDK8出来之后,我们经常使用Stream流,Stream流内部其实就是用了Fork/Join


文章作者: 念心卓
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 念心卓 !
  目录