线程池、锁机制与并发容器实战
系统梳理 Java 并发编程知识体系:从 synchronized/volatile 底层实现到 JUC 工具类使用,重点讲解线程池参数调优、CompletableFuture 异步编排及 Fork/Join 框架。
并发编程的难点在于正确性、可见性和有序性。Java 内存模型(JMM)定义了线程和主内存之间的抽象关系,synchronized 和 volatile 是保障并发安全的两个基础关键字。
volatile 保证变量的可见性和有序性,但不保证原子性。适合作为状态标记或单次写多次读的场景:
public class VolatileExample {
private volatile boolean running = true;
public void shutdown() {
running = false; // 所有线程立即可见
}
public void doWork() {
while (running) { // 不会读取到本地缓存的旧值
// 执行任务
}
}
}
// volatile 的双重检查锁定(DCL)
public class Singleton {
private volatile static Singleton instance;
public static Singleton getInstance() {
if (instance == null) { // 第一次检查(无锁)
synchronized (Singleton.class) {
if (instance == null) { // 第二次检查(有锁)
instance = new Singleton(); // volatile 禁止指令重排序
}
}
}
return instance;
}
}
Java 6 之后,synchronized 不再是重量级锁的代名词。JVM 引入了锁升级机制:无锁 → 偏向锁 → 轻量级锁 → 重量级锁:
// 锁对象头的 Mark Word 结构(64 位 JVM)
// 无锁: [hashcode:25] | [age:4] | [biased_lock:0] | [lock:01]
// 偏向锁: [thread:54] | [epoch:2] | [age:4] | [biased_lock:1] | [lock:01]
// 轻量锁: [ptr_to_lock_record:62] | [lock:00]
// 重量锁: [ptr_to_monitor:62] | [lock:10]
// 生产环境中,可通过以下参数观察锁状态
-XX:+PrintBiasedLockingStatistics # 打印偏向锁统计
-XX:BiasedLockingStartupDelay=0 # 启动即开启偏向锁(默认 4 秒延迟)
相比 synchronized,ReentrantLock 提供了公平锁、可中断、超时获取等高级特性:
public class ReentrantLockDemo {
private final ReentrantLock lock = new ReentrantLock(true); // 公平锁
private final Condition condition = lock.newCondition();
private int count = 0;
public void increment() {
lock.lock();
try {
count++;
if (count >= 10) {
condition.signalAll(); // 唤醒等待的线程
}
} finally {
lock.unlock(); // 必须在 finally 中释放
}
}
public boolean tryIncrement(long timeout, TimeUnit unit) {
try {
if (lock.tryLock(timeout, unit)) { // 带超时的获取
try {
count++;
return true;
} finally {
lock.unlock();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return false;
}
}
线程池是管理线程生命周期的最佳实践。核心参数的选择直接影响系统稳定性:
// 线程池参数设计公式
// corePoolSize = CPU 核数 + 1(计算密集型)
// corePoolSize = CPU 核数 * 2(IO 密集型)
// maximumPoolSize = corePoolSize + (核心数 / (1 - 阻塞系数))
// workQueue = 有界队列(ArrayBlockingQueue),避免无限堆积
// keepAliveTime = 60s(非核心线程存活时间)
// rejectionPolicy = CallerRunsPolicy(让提交者自己执行,起到背压效果)
@Component
public class ThreadPoolConfig {
@Bean("ioExecutor")
public ThreadPoolExecutor ioExecutor() {
int core = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(
core * 2, // 核心线程数
core * 4, // 最大线程数
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000), // 有界队列
new ThreadFactoryBuilder()
.setNameFormat("io-pool-%d")
.build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
@Bean("computeExecutor")
public ThreadPoolExecutor computeExecutor() {
int core = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(
core + 1,
core + 1,
0L, TimeUnit.MILLISECONDS, // 计算密集型线程不回收
new LinkedBlockingQueue<>(100),
new ThreadFactoryBuilder()
.setNameFormat("compute-pool-%d")
.build(),
new ThreadPoolExecutor.AbortPolicy()
);
}
}
CompletableFuture 是 Java 8 引入的异步编程工具,支持链式调用、组合多个异步任务、异常处理等:
@Service
public class OrderService {
@Autowired
@Qualifier("ioExecutor")
private ExecutorService executor;
public CompletableFuture<OrderDetail> getOrderDetail(Long orderId) {
// 并行查询:订单、用户、商品、物流
CompletableFuture<Order> orderFuture = CompletableFuture
.supplyAsync(() -> orderMapper.selectById(orderId), executor);
CompletableFuture<User> userFuture = orderFuture
.thenCompose(order -> CompletableFuture
.supplyAsync(() -> userService.getById(order.getUserId()), executor));
CompletableFuture<List<Product>> productFuture = orderFuture
.thenCompose(order -> CompletableFuture
.supplyAsync(() -> productService.getByIds(order.getProductIds()), executor));
CompletableFuture<Logistics> logisticsFuture = orderFuture
.thenCompose(order -> CompletableFuture
.supplyAsync(() -> logisticsService.getByOrderId(orderId), executor)
.exceptionally(ex -> {
log.warn("Logistics query failed: {}", ex.getMessage());
return Logistics.EMPTY; // 降级处理
}));
// 等待所有任务完成并组装结果
return CompletableFuture.allOf(userFuture, productFuture, logisticsFuture)
.thenApply(v -> {
Order order = orderFuture.join();
return OrderDetail.builder()
.order(order)
.user(userFuture.join())
.products(productFuture.join())
.logistics(logisticsFuture.join())
.build();
});
}
}
JUC 包提供了多种线程安全的集合实现,选择合适的并发集合可以大幅提升性能:
// ConcurrentHashMap:分段锁 → CAS + synchronized(Java 8+)
ConcurrentHashMap<String, User> map = new ConcurrentHashMap<>();
// computeIfAbsent 是原子操作,无需额外同步
User user = map.computeIfAbsent(userId, id -> loadFromDatabase(id));
// CopyOnWriteArrayList:读多写少场景(如配置列表)
CopyOnWriteArrayList<String> configList = new CopyOnWriteArrayList<>();
// 读操作无锁,写操作复制整个数组(适合读多写少)
// LinkedBlockingQueue:生产者-消费者模式
LinkedBlockingQueue<Task> queue = new LinkedBlockingQueue<>(1000);
// 生产者
queue.put(task); // 队列满时阻塞
// 消费者
Task task = queue.take(); // 队列空时阻塞
// LongAdder:高并发计数(比 AtomicLong 性能更优)
LongAdder counter = new LongAdder();
counter.increment(); // 分散到多个 Cell 减少竞争
long sum = counter.sum(); // 读取时汇总
Fork/Join 框架适合可分解的递归任务,利用工作窃取算法平衡线程负载:
// 并行计算大数据数组的和
public class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10000;
private final int[] array;
private final int start;
private final int end;
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 直接计算
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
}
// 拆分为两个子任务
int mid = (start + end) / 2;
SumTask left = new SumTask(array, start, mid);
SumTask right = new SumTask(array, mid, end);
left.fork(); // 异步执行左任务
long rightResult = right.compute(); // 同步执行右任务
long leftResult = left.join(); // 等待左任务结果
return leftResult + rightResult;
}
}
// 使用方式
ForkJoinPool pool = new ForkJoinPool();
long result = pool.invoke(new SumTask(array, 0, array.length));