本文共 2564 字,大约阅读时间需要 8 分钟。
在处理大量异步任务时,传统的Phaser类虽然功能强大,但存在总数量受限(65535)的问题,这在面对不定任务量时往往显得力不从心。为了解决这一痛点,我决定设计并实现一个更具灵活性的自定义同步类——FunPhaser。
FunPhaser类的设计主要基于以下几个核心思想:
线程安全计数:通过AtomicInteger维护任务完成数量和未完成任务数量,确保操作的原子性。
灵活的注册和完成机制:允许程序动态地注册和完成任务,避免了Phaser类固定同步数量的限制。
丰富的状态管理:提供多种方法来查询和控制任务状态,满足不同场景的需求。
FunPhaser类由以下核心成员变量和方法组成:
AtomicInteger index:记录任务总数索引,用于标记任务完成状态。AtomicInteger taskNum:记录实际完成的任务数量。register()方法:用于注册新任务,每次调用会递增index值。done()方法:用于标记任务完成,递减index并递增taskNum。await()方法:等待所有任务完成,直到index变为0。queryTaskNum()方法:查询已完成的任务总数。以下是FunPhaser类的简要代码示例:
package com.funtester.frame;import java.util.concurrent.atomic.AtomicInteger;/**自定义同步类,避免Phaser的不足,总数量受限于65535 * 用于多线程任务同步,任务完成后,调用done()方法,任务总数减少, * 当任务总数为0时,调用await()方法,等待所有任务完成 */class FunPhaser { private final AtomicInteger index = new AtomicInteger(); private final AtomicInteger taskNum = new AtomicInteger(); FunPhaser() { this.index = new AtomicInteger(); this.taskNum = new AtomicInteger(); } void register() { index.getAndIncrement(); } void done() { index.getAndDecrement(); taskNum.getAndIncrement(); } void await() { while (index.get() != 0) { // 等待其他任务完成 } } int queryTaskNum() { return taskNum.get(); }} 以下是一个使用FunPhaser类的示例代码:
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class FunPhaserDemo { public static void main(String[] args) throws InterruptedException { // 创建FunPhaser实例 FunPhaser phaser = new FunPhaser(); // 创建固定大小的线程池 ExecutorService executorService = Executors.newFixedThreadPool(5); // 注册10个任务 for (int i = 0; i < 10; i++) { executorService.submit(new Runnable() { @Override public void run() { // 模拟耗时操作 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } // 标记任务完成 phaser.done(); } }); } // 等待所有任务完成 phaser.await(); // 输出已完成的任务数量 System.out.println("已完成的任务数量: " + phaser.queryTaskNum()); // 关闭线程池 executorService.shutdown(); }} 与传统的Phaser实现相比,FunPhaser具有以下优势:
| 指标 | 旧实现(Phaser) | 新实现(FunPhaser) |
|---|---|---|
| 使用到的类 | Phaser | FunPhaser |
| 总数量是否受限 | 是 | 否 |
| 代码简洁程度 | 较好 | 更好 |
通过自定义FunPhaser类,我们成功突破了Phaser类的数量限制,并通过简洁的API提供了更灵活的任务同步控制。这一实现不仅提升了代码的可读性和维护性,还为处理大量异步任务提供了更高效的解决方案。
转载地址:http://kqvfk.baihongyu.com/