并发编程

Java 并发编程核心模式

线程池、锁机制与并发容器实战

高级 40 分钟 2026-05-21

涉及技术

Java 17 JUC CompletableFuture Fork/Join

文档简介

系统梳理 Java 并发编程知识体系:从 synchronized/volatile 底层实现到 JUC 工具类使用,重点讲解线程池参数调优、CompletableFuture 异步编排及 Fork/Join 框架。

正文内容

并发编程的核心挑战

并发编程的难点在于正确性、可见性和有序性。Java 内存模型(JMM)定义了线程和主内存之间的抽象关系,synchronized 和 volatile 是保障并发安全的两个基础关键字。

volatile:轻量级同步

volatile 保证变量的可见性和有序性,但不保证原子性。适合作为状态标记或单次写多次读的场景:

public class VolatileExample {
    private volatile boolean running = true;

    public void shutdown() {
        running = false;  // 所有线程立即可见
    }

    public void doWork() {
        while (running) {  // 不会读取到本地缓存的旧值
            // 执行任务
        }
    }
}

// volatile 的双重检查锁定(DCL)
public class Singleton {
    private volatile static Singleton instance;

    public static Singleton getInstance() {
        if (instance == null) {              // 第一次检查(无锁)
            synchronized (Singleton.class) {
                if (instance == null) {        // 第二次检查(有锁)
                    instance = new Singleton(); // volatile 禁止指令重排序
                }
            }
        }
        return instance;
    }
}

synchronized 与锁升级

Java 6 之后,synchronized 不再是重量级锁的代名词。JVM 引入了锁升级机制:无锁 → 偏向锁 → 轻量级锁 → 重量级锁:

// 锁对象头的 Mark Word 结构(64 位 JVM)
// 无锁:    [hashcode:25] | [age:4] | [biased_lock:0] | [lock:01]
// 偏向锁:  [thread:54]   | [epoch:2] | [age:4] | [biased_lock:1] | [lock:01]
// 轻量锁:  [ptr_to_lock_record:62] | [lock:00]
// 重量锁:  [ptr_to_monitor:62]     | [lock:10]

// 生产环境中,可通过以下参数观察锁状态
-XX:+PrintBiasedLockingStatistics  # 打印偏向锁统计
-XX:BiasedLockingStartupDelay=0   # 启动即开启偏向锁(默认 4 秒延迟)

ReentrantLock:更灵活的锁

相比 synchronized,ReentrantLock 提供了公平锁、可中断、超时获取等高级特性:

public class ReentrantLockDemo {
    private final ReentrantLock lock = new ReentrantLock(true);  // 公平锁
    private final Condition condition = lock.newCondition();
    private int count = 0;

    public void increment() {
        lock.lock();
        try {
            count++;
            if (count >= 10) {
                condition.signalAll();  // 唤醒等待的线程
            }
        } finally {
            lock.unlock();  // 必须在 finally 中释放
        }
    }

    public boolean tryIncrement(long timeout, TimeUnit unit) {
        try {
            if (lock.tryLock(timeout, unit)) {  // 带超时的获取
                try {
                    count++;
                    return true;
                } finally {
                    lock.unlock();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return false;
    }
}

线程池:并发的基础设施

线程池是管理线程生命周期的最佳实践。核心参数的选择直接影响系统稳定性:

// 线程池参数设计公式
// corePoolSize = CPU 核数 + 1(计算密集型)
// corePoolSize = CPU 核数 * 2(IO 密集型)
// maximumPoolSize = corePoolSize + (核心数 / (1 - 阻塞系数))
// workQueue = 有界队列(ArrayBlockingQueue),避免无限堆积
// keepAliveTime = 60s(非核心线程存活时间)
// rejectionPolicy = CallerRunsPolicy(让提交者自己执行,起到背压效果)

@Component
public class ThreadPoolConfig {

    @Bean("ioExecutor")
    public ThreadPoolExecutor ioExecutor() {
        int core = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
            core * 2,                       // 核心线程数
            core * 4,                       // 最大线程数
            60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1000), // 有界队列
            new ThreadFactoryBuilder()
                .setNameFormat("io-pool-%d")
                .build(),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }

    @Bean("computeExecutor")
    public ThreadPoolExecutor computeExecutor() {
        int core = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
            core + 1,
            core + 1,
            0L, TimeUnit.MILLISECONDS,       // 计算密集型线程不回收
            new LinkedBlockingQueue<>(100),
            new ThreadFactoryBuilder()
                .setNameFormat("compute-pool-%d")
                .build(),
            new ThreadPoolExecutor.AbortPolicy()
        );
    }
}

CompletableFuture:异步编排

CompletableFuture 是 Java 8 引入的异步编程工具,支持链式调用、组合多个异步任务、异常处理等:

@Service
public class OrderService {

    @Autowired
    @Qualifier("ioExecutor")
    private ExecutorService executor;

    public CompletableFuture<OrderDetail> getOrderDetail(Long orderId) {
        // 并行查询:订单、用户、商品、物流
        CompletableFuture<Order> orderFuture = CompletableFuture
            .supplyAsync(() -> orderMapper.selectById(orderId), executor);

        CompletableFuture<User> userFuture = orderFuture
            .thenCompose(order -> CompletableFuture
                .supplyAsync(() -> userService.getById(order.getUserId()), executor));

        CompletableFuture<List<Product>> productFuture = orderFuture
            .thenCompose(order -> CompletableFuture
                .supplyAsync(() -> productService.getByIds(order.getProductIds()), executor));

        CompletableFuture<Logistics> logisticsFuture = orderFuture
            .thenCompose(order -> CompletableFuture
                .supplyAsync(() -> logisticsService.getByOrderId(orderId), executor)
                .exceptionally(ex -> {
                    log.warn("Logistics query failed: {}", ex.getMessage());
                    return Logistics.EMPTY;  // 降级处理
                }));

        // 等待所有任务完成并组装结果
        return CompletableFuture.allOf(userFuture, productFuture, logisticsFuture)
            .thenApply(v -> {
                Order order = orderFuture.join();
                return OrderDetail.builder()
                    .order(order)
                    .user(userFuture.join())
                    .products(productFuture.join())
                    .logistics(logisticsFuture.join())
                    .build();
            });
    }
}

并发集合:线程安全的数据结构

JUC 包提供了多种线程安全的集合实现,选择合适的并发集合可以大幅提升性能:

// ConcurrentHashMap:分段锁 → CAS + synchronized(Java 8+)
ConcurrentHashMap<String, User> map = new ConcurrentHashMap<>();
// computeIfAbsent 是原子操作,无需额外同步
User user = map.computeIfAbsent(userId, id -> loadFromDatabase(id));

// CopyOnWriteArrayList:读多写少场景(如配置列表)
CopyOnWriteArrayList<String> configList = new CopyOnWriteArrayList<>();
// 读操作无锁,写操作复制整个数组(适合读多写少)

// LinkedBlockingQueue:生产者-消费者模式
LinkedBlockingQueue<Task> queue = new LinkedBlockingQueue<>(1000);
// 生产者
queue.put(task);  // 队列满时阻塞
// 消费者
Task task = queue.take();  // 队列空时阻塞

// LongAdder:高并发计数(比 AtomicLong 性能更优)
LongAdder counter = new LongAdder();
counter.increment();  // 分散到多个 Cell 减少竞争
long sum = counter.sum();  // 读取时汇总

Fork/Join 框架:分治并行

Fork/Join 框架适合可分解的递归任务,利用工作窃取算法平衡线程负载:

// 并行计算大数据数组的和
public class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10000;
    private final int[] array;
    private final int start;
    private final int end;

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // 直接计算
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        }

        // 拆分为两个子任务
        int mid = (start + end) / 2;
        SumTask left = new SumTask(array, start, mid);
        SumTask right = new SumTask(array, mid, end);

        left.fork();   // 异步执行左任务
        long rightResult = right.compute();  // 同步执行右任务
        long leftResult = left.join();  // 等待左任务结果

        return leftResult + rightResult;
    }
}

// 使用方式
ForkJoinPool pool = new ForkJoinPool();
long result = pool.invoke(new SumTask(array, 0, array.length));