首页 > Java > java教程 > 正文

Java ExecutorService:限制并发线程数量的实践指南

心靈之曲
发布: 2025-11-29 18:39:01
原创
381人浏览过

Java ExecutorService:限制并发线程数量的实践指南

本教程旨在详细阐述如何在java中利用`executorservice`框架,特别是`executors.newfixedthreadpool`方法,来精确控制并发执行的线程数量。文章将通过一个实际的文件序列化任务示例,指导读者如何定义可执行任务(`runnable`),配置固定大小的线程池,并实现任务的提交与服务的优雅关闭,确保多线程操作在预设的并发上限内高效、稳定地运行。

在现代Java应用开发中,多线程编程是提升程序性能和响应能力的关键技术。然而,不受控制的线程创建和执行可能导致系统资源耗尽、性能下降甚至程序崩溃。特别是在处理I/O密集型任务(如文件读写、网络请求)时,限制并发线程的数量至关重要。Java 5引入的java.util.concurrent包,尤其是Executors框架,为我们提供了强大而灵活的工具来管理线程池,从而实现对并发度的精确控制。

1. 理解并发控制的需求

假设我们有一个List<EventuelleDestination>对象列表,需要对列表中的每个元素执行一个序列化操作,将其写入到文件中。如果列表非常大,为每个元素都创建一个新线程会带来巨大的开销。更重要的是,过多的并发文件写入操作可能会导致磁盘I/O瓶颈或操作系统资源限制。因此,我们需要一种机制来限制同时运行的序列化线程数量,例如,只允许最多3个线程同时执行。

原始的序列化任务方法如下:

public void serializeDestinationEmploye(EventuelleDestination e) {
    Gson gson = new Gson();
    String filename = "/" + employeDao.getEmploye().getId() + "_" + entrepriseDao.retrouveEmplacementIdParDepartementId(e.getEventuelAcceuillant().getId()) + "_" + e.getEventuelAcceuillant().getId() + ".json";

    try (Writer writer = new FileWriter(dossierSoumissions.toString() + filename)) {
        gson.toJson(e, writer);
        System.out.println(e + " has been serialized...");
    } catch (IOException fileNotFoundException) {
        fileNotFoundException.printStackTrace();
    }
}
登录后复制

2. Java Executors 框架概览

Executors框架是Java中用于管理线程的强大工具集,它提供了一系列工厂方法来创建不同类型的ExecutorService。ExecutorService是管理和执行Runnable或Callable任务的核心接口。

立即学习Java免费学习笔记(深入)”;

要限制并发线程数量,最常用的方法是使用固定大小的线程池,这可以通过Executors.newFixedThreadPool(int nThreads)方法实现。该方法会创建一个线程池,其中包含固定数量的线程。当有新任务提交时,如果线程池中的所有线程都在忙碌,任务将被放入一个等待队列,直到有线程空闲。

3. 定义可执行任务:Runnable

在使用ExecutorService之前,我们需要将要执行的并发逻辑封装成一个任务。在Java中,任务通常通过实现Runnable接口或Callable接口来定义。对于不返回结果的简单任务,Runnable是更合适的选择。

神采PromeAI
神采PromeAI

将涂鸦和照片转化为插画,将线稿转化为完整的上色稿。

神采PromeAI 103
查看详情 神采PromeAI

我们将上述的serializeDestinationEmploye方法封装到一个实现Runnable接口的类中。为了演示,我们创建一个SerializationTask类:

package com.example.serialization;

import com.google.gson.Gson; // 假设您使用了Gson库
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.nio.file.Path;
import java.time.Instant;

public class SerializationTask implements Runnable {
    private EventuelleDestination destination;
    private Path outputDirectory;
    // 假设 employeDao 和 entrepriseDao 是可用的,或者通过构造函数注入
    // 简化处理,这里直接使用模拟数据或假设它们已初始化
    private String employeId = "emp123"; 
    private String emplacementId = "loc456";

    public SerializationTask(EventuelleDestination destination, Path outputDirectory) {
        this.destination = destination;
        this.outputDirectory = outputDirectory;
    }

    @Override
    public void run() {
        // 模拟原始的序列化逻辑
        Gson gson = new Gson();
        // 实际应用中,employeDao和entrepriseDao应通过依赖注入或传递获取
        String filename = "/" + employeId + "_" + emplacementId + "_" + destination.getId() + ".json";

        try (Writer writer = new FileWriter(outputDirectory.resolve(filename).toString())) {
            gson.toJson(destination, writer);
            System.out.println(Thread.currentThread().getName() + " reporting: " + destination + " has been serialized at " + Instant.now());
        } catch (IOException e) {
            System.err.println("Error serializing " + destination + ": " + e.getMessage());
            e.printStackTrace();
        }
    }

    // 假设 EventuelleDestination 是一个简单的POJO,这里仅为示例提供一个骨架
    public static class EventuelleDestination {
        private String id;
        private String name;

        public EventuelleDestination(String id, String name) {
            this.id = id;
            this.name = name;
        }

        public String getId() { return id; }
        public String getName() { return name; }

        @Override
        public String toString() {
            return "EventuelleDestination{" + "id='" + id + '\'' + ", name='" + name + '\'' + '}';
        }
    }
}
登录后复制

注意:

  • EventuelleDestination、employeDao和entrepriseDao在实际项目中需要有具体的实现。这里为了示例简化了部分依赖。
  • 在run()方法中,我们添加了Thread.currentThread().getName()和Instant.now(),这有助于在日志中追踪哪个线程在何时执行了哪个任务,尤其是在调试并发问题时非常有用。

4. 使用 ExecutorService 实现并发限制

现在,我们来创建并管理ExecutorService,以限制并发线程数量为3。

package com.example.serialization;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class SerializationApp {

    public static void main(String[] args) {
        SerializationApp app = new SerializationApp();
        app.demoSerialization();
    }

    private void demoSerialization() {
        // 1. 准备数据和输出目录
        Path outputDir = Paths.get("serialized_data");
        try {
            Files.createDirectories(outputDir); // 确保输出目录存在
        } catch (IOException e) {
            System.err.println("Failed to create output directory: " + e.getMessage());
            return;
        }

        List<SerializationTask.EventuelleDestination> destinations = 
                IntStream.range(1, 20) // 假设有20个待序列化的目的地
                        .mapToObj(i -> new SerializationTask.EventuelleDestination("dest" + i, "Location " + i))
                        .toList();

        // 2. 创建任务列表
        List<Runnable> tasks = new ArrayList<>();
        for (SerializationTask.EventuelleDestination dest : destinations) {
            tasks.add(new SerializationTask(dest, outputDir));
        }

        // 3. 创建固定大小的线程池,限制并发数为3
        // Executors.newFixedThreadPool(3) 将创建一个维护3个线程的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(3); 
        System.out.println("ExecutorService created with 3 fixed threads.");

        // 4. 提交所有任务到线程池
        tasks.forEach(executorService::submit);
        System.out.println("All tasks submitted. Waiting for termination...");

        // 5. 优雅地关闭 ExecutorService
        shutdownAndAwaitTermination(executorService);
        System.out.println("All tasks completed and ExecutorService shut down.");
    }

    /**
     * 优雅地关闭 ExecutorService,等待所有任务完成。
     * 这是一个来自 Javadoc 的标准模板,略有修改。
     *
     * @param executorService 要关闭的 ExecutorService
     */
    void shutdownAndAwaitTermination(ExecutorService executorService) {
        executorService.shutdown(); // 禁用新任务的提交
        try {
            // 等待现有任务在指定时间内终止
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow(); // 取消当前正在执行的任务
                // 再次等待任务响应取消
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Executor service did not terminate. " + Instant.now());
                }
            }
        } catch (InterruptedException ex) {
            // (重新)取消,如果当前线程也被中断
            executorService.shutdownNow();
            // 保留中断状态
            Thread.currentThread().interrupt();
        }
    }
}
登录后复制

5. 运行结果示例

当运行上述SerializationApp时,你会看到类似以下的输出(具体顺序和时间戳会因系统而异):

ExecutorService created with 3 fixed threads.
All tasks submitted. Waiting for termination...
pool-1-thread-1 reporting: EventuelleDestination{id='dest1', name='Location 1'} has been serialized at 2023-10-27T08:00:01.123Z
pool-1-thread-2 reporting: EventuelleDestination{id='dest2', name='Location 2'} has been serialized at 2023-10-27T08:00:01.125Z
pool-1-thread-3 reporting: EventuelleDestination{id='dest3', name='Location 3'} has been serialized at 2023-10-27T08:00:01.128Z
pool-1-thread-1 reporting: EventuelleDestination{id='dest4', name='Location 4'} has been serialized at 2023-10-27T08:00:01.150Z
pool-1-thread-2 reporting: EventuelleDestination{id='dest5', name='Location 5'} has been serialized at 2023-10-27T08:00:01.152Z
pool-1-thread-3 reporting: EventuelleDestination{id='dest6', name='Location 6'} has been serialized at 2023-10-27T08:00:01.155Z
... (输出会继续,但你会发现同时执行的线程名只有 pool-1-thread-1, -2, -3)
All tasks completed and ExecutorService shut down.
登录后复制

从输出中可以清楚地看到,尽管我们提交了20个任务,但实际执行任务的线程始终是pool-1-thread-1、pool-1-thread-2和pool-1-thread-3这三个线程,这正是Executors.newFixedThreadPool(3)所实现的效果。

6. 注意事项

  • 输出顺序与执行顺序:System.out.println的输出并不总是按照任务完成的严格时间顺序出现。这是因为不同的线程可能会在不同的时间点将内容写入标准输出流,并且操作系统的调度和缓冲区机制会影响最终在控制台上的显示顺序。如果需要精确的时间戳,务必在日志中包含Instant.now()或其他时间信息。
  • 优雅关闭:shutdownAndAwaitTermination方法是关闭ExecutorService的最佳实践。shutdown()会阻止新任务的提交,但允许已提交的任务继续执行。awaitTermination()则会阻塞当前线程,直到所有任务完成或超时。如果超时,shutdownNow()会尝试中断所有正在执行的任务。正确地关闭线程池可以防止资源泄露和程序挂起。
  • Runnable vs. Callable:如果你的任务需要返回一个结果或者抛出检查异常,那么应该使用Callable接口而不是Runnable。Callable配合Future可以获取任务的执行结果或捕获异常。
  • 异常处理:在Runnable的run()方法中,任何未捕获的运行时异常都将导致执行该任务的线程终止。为了确保程序的健壮性,务必在任务内部捕获并处理可能发生的异常。

总结

通过Java Executors框架,特别是Executors.newFixedThreadPool()方法,我们可以轻松地实现对并发线程数量的精确控制。这不仅简化了多线程编程的复杂性,还有助于优化资源利用,提高应用程序的稳定性和性能。理解如何定义任务、创建线程池以及优雅地关闭服务,是每个Java开发者掌握并发编程的关键技能。

以上就是Java ExecutorService:限制并发线程数量的实践指南的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号