0

0

Java NIO非阻塞读写操作优化与常见陷阱

花韻仙語

花韻仙語

发布时间:2025-11-30 18:07:02

|

594人浏览过

|

来源于php中文网

原创

Java NIO非阻塞读写操作优化与常见陷阱

本文深入探讨java nio非阻塞读写操作中常见的“写操作阻塞”问题,分析了不当的`selectionkey`管理(如错误地使用`key.cancel()`和持续注册`op_write`)如何导致服务器在重复连接时陷入僵局。文章提供了优化后的代码示例,强调了动态调整`selectionkey`兴趣集的重要性,并强烈建议在生产环境中使用netty等成熟的nio框架以规避原生nio的复杂性。

Java NIO非阻塞I/O操作的挑战与优化

Java NIO(New I/O)提供了一种基于事件驱动、非阻塞I/O模型,它通过选择器(Selector)和通道(Channel)实现高效的网络通信。然而,原生NIO的编程模型较为复杂,尤其是在处理连接状态、读写事件以及SelectionKey的生命周期管理时,极易引入难以发现的错误,导致服务器在特定场景下表现异常,例如本例中服务器在处理第二个客户端连接时卡死在写操作环节。

问题分析:NIO服务器在写操作中卡死的根源

原始NNIO服务器代码在处理客户端连接时,在isAcceptable()事件中将新接受的SocketChannel注册到选择器,并同时关注SelectionKey.OP_READ和SelectionKey.OP_WRITE事件:

socketChannel.register(selector, SelectionKey.OP_READ + SelectionKey.OP_WRITE);

这种做法本身就存在潜在问题。OP_WRITE事件表示通道何时可以写入数据,只要发送缓冲区有空间,OP_WRITE事件就会持续触发。如果服务器没有数据需要写入,但OP_WRITE一直被关注,那么选择器将不断报告该事件,导致CPU空转或陷入写事件的无限循环。

更严重的问题出现在isWritable()的处理逻辑中:

立即学习Java免费学习笔记(深入)”;

if (key.isWritable()) {
    // ...
    Runnable h = new MyAsyncWriteThread(task);
    pool.execute(h);
    key.cancel(); // 致命错误
}

在完成写操作(或计划执行写操作)后,代码直接调用了key.cancel()。key.cancel()的作用是将该SelectionKey从其关联的选择器中移除,这意味着该通道将不再接收任何事件通知(包括后续的读事件),并且该通道实际上被“遗弃”了。当第一个客户端连接完成后,其SelectionKey被取消;当第二个客户端尝试连接时,其SocketChannel可能因为前一个被取消的键导致的状态混乱,或者其自身的键在完成写操作后也被取消,从而无法继续处理读写事件,最终表现为服务器卡死。

Joker AIx
Joker AIx

一站式AI创意生产平台,覆盖图像、视频、音频、文案全品类创作

下载

此外,使用Map<Integer, States> socketStates来管理每个SocketChannel的内部状态(Idle, Read, Write)虽然是一种尝试,但在与SelectionKey的事件驱动模型结合时,如果状态转换和SelectionKey的兴趣集(interest set)管理不匹配,同样容易导致逻辑错误和状态混乱。

优化方案:正确的SelectionKey管理与事件流控制

解决上述问题的关键在于精确地管理SelectionKey的兴趣集,确保只关注当前需要处理的事件,并在事件处理完成后,根据业务逻辑更新兴趣集。同时,key.cancel()应仅在通道即将关闭时使用。

以下是基于原始问题和答案进行优化的NIO服务器代码示例,它演示了如何正确地管理SelectionKey的兴趣集:

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; // 引入TimeUnit

public class MyAsyncProcessor {

    // 枚举表示连接的内部状态,用于更复杂的业务逻辑
    enum States {
        Idle, // 初始或等待读
        Reading, // 正在读
        ReadComplete, // 读完成,准备写
        Writing, // 正在写
        WriteComplete // 写完成
    }

    // 假设MyTask包含业务逻辑,并在线程池中执行
    public static class MyTask implements Runnable {
        private int secondsToRead;
        private int secondsToWrite;
        private SocketChannel clientChannel; // 添加通道引用以便回调

        public MyTask(SocketChannel channel) {
            this.clientChannel = channel;
        }

        public void setTimeToRead(int secondsToRead) {
            this.secondsToRead = secondsToRead;
        }

        public void setTimeToWrite(int secondsToWrite) {
            this.secondsToWrite = secondsToWrite;
        }

        @Override
        public void run() {
            System.out.println("Executing task for channel: " + clientChannel.hashCode() +
                    ", read delay: " + secondsToRead + "s, write delay: " + secondsToWrite + "s");
            try {
                // 模拟读操作耗时
                TimeUnit.SECONDS.sleep(secondsToRead);
                System.out.println("Read task completed for channel: " + clientChannel.hashCode());

                // 模拟写操作耗时
                TimeUnit.SECONDS.sleep(secondsToWrite);
                System.out.println("Write task completed for channel: " + clientChannel.hashCode());

                // 任务完成后,可以考虑将结果写入通道,或者重新注册OP_READ等待下一个请求
                // 这里为了演示,假设任务执行完毕后,可以通知选择器重新关注写事件
                // 注意:在实际NIO中,任务完成后的回调需要线程安全地操作Selector
                // 简单的做法是,任务完成后,将结果放入一个队列,主线程在select循环中检查队列并写入
                // 或者,如本例,直接在任务中写入并重新注册OP_READ (需要确保线程安全)
                // 考虑到NIO的单线程模型,通常不建议在工作线程直接操作SelectionKey
                // 这里仅作示例,实际应通过队列或wakeup()机制通知主线程

                // 示例:任务完成,准备写入响应(如果需要)
                // 实际中,这里应该将数据准备好,然后由主线程在下一次select循环中处理OP_WRITE
                // 为简化示例,这里不直接写入,而是假设任务完成,可以触发后续写操作
                // 或者,如果任务执行结果需要立即发送,可以:
                // clientChannel.write(ByteBuffer.wrap("Task finished.".getBytes(StandardCharsets.UTF_8)));
                // 之后,如果客户端需要继续发送数据,可以重新注册OP_READ
                // if (clientChannel.isOpen()) {
                //    clientChannel.register(clientChannel.selector(), SelectionKey.OP_READ);
                // }

            } catch (InterruptedException | IOException e) {
                System.err.println("Task execution error for channel " + clientChannel.hashCode() + ": " + e.getMessage());
                try {
                    clientChannel.close();
                } catch (IOException ioException) {
                    // ignore
                }
            }
        }
    }

    private ExecutorService pool;
    // 使用Map来存储每个SocketChannel的当前状态,以及其对应的业务数据(如MyTask)
    private Map<SocketChannel, ConnectionState> connectionStates = new HashMap<>();

    // 内部类,封装连接的状态和相关数据
    static class ConnectionState {
        States currentState = States.Idle;
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        MyTask task; // 存储与此连接相关的任务数据

        public ConnectionState(SocketChannel channel) {
            this.task = new MyTask(channel); // 每个连接有自己的任务实例
        }
    }

    public MyAsyncProcessor() {
    }

    public static void main(String[] args) throws IOException {
        new MyAsyncProcessor().process();
    }

    public void process() throws IOException {
        // 线程池用于执行耗时业务逻辑,避免阻塞NIO主线程
        pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

        InetAddress host = InetAddress.getByName("localhost");
        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(host, 9876));

        // 注册ServerSocketChannel,只关注OP_ACCEPT事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        System.out.println("Server started on port 9876.");

        while (true) {
            // 阻塞等待事件发生
            if (selector.select() > 0) {
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> i = selectedKeys.iterator();

                while (i.hasNext()) {
                    SelectionKey key = i.next();
                    i.remove(); // 每次处理完一个事件后,必须将其从selectedKeys中移除

                    // 检查键是否仍然有效,防止处理已取消或已关闭的通道
                    if (!key.isValid()) {
                        continue;
                    }

                    try {
                        if (key.isAcceptable()) {
                            ServerSocketChannel server = (ServerSocketChannel) key.channel();
                            SocketChannel socketChannel = server.accept();
                            socketChannel.configureBlocking(false);
                            System.out.println("Connection accepted from: " + socketChannel.getRemoteAddress());

                            // 新连接注册到选择器,只关注OP_READ事件
                            socketChannel.register(selector, SelectionKey.OP_READ);
                            connectionStates.put(socketChannel, new ConnectionState(socketChannel));

                        } else if (key.isReadable()) {
                            SocketChannel socketChannel = (SocketChannel) key.channel();
                            ConnectionState state = connectionStates.get(socketChannel);
                            if (state == null) { // 可能连接已关闭,但事件仍在队列中
                                socketChannel.close();
                                key.cancel();
                                continue;
                            }

                            state.readBuffer.clear(); // 清空缓冲区以备读取
                            int bytesRead = socketChannel.read(state.readBuffer);

                            if (bytesRead > 0) {
                                state.readBuffer.flip(); // 切换到读模式
                                String clientMessage = StandardCharsets.UTF_8.decode(state.readBuffer).toString().trim();
                                System.out.println("Received from " + socketChannel.getRemoteAddress() + ": " + clientMessage);

                                // 解析消息,更新任务数据
                                String[] words = clientMessage.split(" ");
                                if (words.length >= 2) {
                                    int secondsToRead = Integer.parseInt(words[words.length - 2]);
                                    int secondsToWrite = Integer.parseInt(words[words.length - 1]);
                                    state.task.setTimeToRead(secondsToRead);
                                    state.task.setTimeToWrite(secondsToWrite);
                                } else {
                                    System.out.println("Invalid message format, using default task times.");
                                    state.task.setTimeToRead(1);
                                    state.task.setTimeToWrite(1);
                                }

                                // 将耗时任务提交到线程池
                                pool.execute(state.task);
                                state.currentState = States.Reading; // 标记为正在处理任务

                                // 读完数据后,取消OP_READ,注册OP_WRITE,准备发送响应
                                // 注意:这里假设业务逻辑处理完成后需要立即发送响应
                                key.interestOps(SelectionKey.OP_WRITE); // 仅关注写事件
                                state.currentState = States.ReadComplete; // 状态更新

                            } else if (bytesRead == -1) {
                                // 客户端关闭连接
                                System.out.println("Client " + socketChannel.getRemoteAddress() + " disconnected.");
                                closeConnection(socketChannel, key);
                            } else {
                                // bytesRead == 0, 暂时没有数据可读,等待下次事件
                                System.out.println("No data to read from " + socketChannel.getRemoteAddress());
                            }

                        } else if (key.isWritable()) {
                            SocketChannel socketChannel = (SocketChannel) key.channel();
                            ConnectionState state = connectionStates.get(socketChannel);
                            if (state == null) {
                                socketChannel.close();
                                key.cancel();
                                continue;
                            }

                            // 只有当业务逻辑处理完成,并且有数据需要写入时才真正执行写入
                            if (state.currentState == States.ReadComplete) { // 确认数据已准备好
                                String response = "Server received and processed: " + state.task.secondsToRead + " " + state.task.secondsToWrite + "\n";
                                ByteBuffer writeBuffer = ByteBuffer.wrap(response.getBytes(StandardCharsets.UTF_8));
                                int bytesWritten = socketChannel.write(writeBuffer);

                                if (!writeBuffer.hasRemaining()) { // 所有数据都已写入
                                    System.out.println("Sent response to " + socketChannel.getRemoteAddress() + ": " + response.trim());
                                    // 写完后,如果期望客户端继续发送数据,则重新注册OP_READ
                                    key.interestOps(SelectionKey.OP_READ);
                                    state.currentState = States.WriteComplete; // 状态更新
                                } else {
                                    // 部分数据已写入,等待下一次OP_WRITE事件继续写入
                                    System.out.println("Partial write, " + writeBuffer.remaining() + " bytes remaining.");
                                }
                            } else {
                                // 业务逻辑未完成,或者没有数据需要写入,取消OP_WRITE兴趣
                                // 避免在没有数据可写时持续触发OP_WRITE
                                key.interestOps(SelectionKey.OP_READ); // 假设此时应该等待客户端的下一个请求
                            }
                        }
                    } catch (IOException e) {
                        System.err.println("Error processing channel " + key.channel() + ": " + e.getMessage());
                        closeConnection((SocketChannel) key.channel(), key);
                    }
                }
            }
        }
    }

    private void closeConnection(SocketChannel channel, SelectionKey key) {
        try {
            channel.close();
            key.cancel();
            connectionStates.remove(channel);
            System.out.println("Connection closed for " + channel.getRemoteAddress());
        } catch (IOException e) {
            System.err.println("Error closing channel " + channel.getRemoteAddress() + ": " + e.getMessage());
        }
    }
}

关键改进点:

  1. 动态调整兴趣集:
    • 新接受的SocketChannel只注册SelectionKey.OP_READ。服务器只有在需要从客户端读取数据时才关注读事件。
    • 当成功读取客户端数据并提交到业务线程池处理后,将SelectionKey的兴趣集从OP_READ切换到OP_WRITE (key.interestOps(SelectionKey.OP_WRITE))。这表示服务器现在准备向客户端发送响应。
    • 当数据成功写入客户端后,如果期望客户端继续发送数据,则将兴趣集切换回OP_READ。如果会话结束,则可以关闭连接。
  2. 避免key.cancel()滥用: key.cancel()只在通道关闭时调用,确保通道的生命周期管理正确。
  3. 连接状态管理: 使用Map<SocketChannel, ConnectionState>来存储每个连接的更详细状态(如States枚举),以及与该连接相关的ByteBuffer和MyTask实例。这使得每个连接的上下文独立且易于管理。
  4. 业务逻辑异步化: 将耗时的业务逻辑(如MyTask的run()方法)提交到独立的线程池中执行,避免阻塞NIO主线程,从而确保选择器能够持续处理其他连接的I/O事件。
  5. 完整的读写处理: 确保ByteBuffer在读写操作之间正确地flip()和clear()。处理read()返回-1(客户端关闭)的情况。
  6. key.isValid()检查: 在处理任何事件之前,检查SelectionKey是否仍然有效,以避免对已取消的键进行操作。

客户端代码(无需修改,但理解其行为):

客户端代码保持不变,它发送一个包含读写时间的消息到服务器。

import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Random;

public class MyClient {

    public static void main(String [] args) {

        Random rand = new Random();
        int secondsToRead = rand.nextInt(5) + 1; // 1-5秒
        int secondsToWrite = rand.nextInt(5) + 1; // 1-5秒
        String message = "Seconds for the task to be read and written: " + secondsToRead + " " + secondsToWrite;
        System.out.println("Client sending message: " + message);
        Socket socket = null;
        try {
            socket = new Socket("127.0.0.1", 9876);
            PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);
            printWriter.println(message);
            System.out.println("Message sent. Waiting for response...");

            // 读取服务器响应
            java.io.BufferedReader reader = new java.io.BufferedReader(new java.io.InputStreamReader(socket.getInputStream()));
            String serverResponse = reader.readLine();
            System.out.println("Server response: " + serverResponse);

        } catch (IOException e) {
            System.out.println("Error in Socket: " + e.getMessage());
            System.exit(-1);
        } finally {
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    // ignore
                }
            }
        }
    }
}

客户端代码补充说明: 为了更好地演示服务器的响应,客户端代码增加了读取服务器响应的逻辑,使其能接收到服务器发送的“Server received and processed...”消息。

注意事项与总结

  1. 原生NIO的复杂性: Java原生NIO虽然提供了高性能的非阻塞I/O能力,但其API设计较为底层,要求开发者对I/O事件循环、SelectionKey管理、ByteBuffer操作以及线程模型有深入理解。任何一个环节的疏忽都可能导致性能问题、死锁或连接异常。
  2. OP_WRITE的特性: OP_WRITE事件只要通道的发送缓冲区有空间就会一直触发。因此,只有在确实有数据需要写入时才应该关注OP_WRITE。一旦数据写入完成,应立即取消OP_WRITE的关注,或将其切换为其他事件(如OP_READ),以避免不必要的事件触发和CPU开销。
  3. 推荐使用NIO框架: 对于生产

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

765

2023.08.10

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

765

2023.08.10

golang map内存释放
golang map内存释放

本专题整合了golang map内存相关教程,阅读专题下面的文章了解更多相关内容。

77

2025.09.05

golang map相关教程
golang map相关教程

本专题整合了golang map相关教程,阅读专题下面的文章了解更多详细内容。

40

2025.11.16

golang map原理
golang map原理

本专题整合了golang map相关内容,阅读专题下面的文章了解更多详细内容。

67

2025.11.17

java判断map相关教程
java判断map相关教程

本专题整合了java判断map相关教程,阅读专题下面的文章了解更多详细内容。

47

2025.11.27

Golang channel原理
Golang channel原理

本专题整合了Golang channel通信相关介绍,阅读专题下面的文章了解更多详细内容。

261

2025.11.14

golang channel相关教程
golang channel相关教程

本专题整合了golang处理channel相关教程,阅读专题下面的文章了解更多详细内容。

351

2025.11.17

C# ASP.NET Core微服务架构与API网关实践
C# ASP.NET Core微服务架构与API网关实践

本专题围绕 C# 在现代后端架构中的微服务实践展开,系统讲解基于 ASP.NET Core 构建可扩展服务体系的核心方法。内容涵盖服务拆分策略、RESTful API 设计、服务间通信、API 网关统一入口管理以及服务治理机制。通过真实项目案例,帮助开发者掌握构建高可用微服务系统的关键技术,提高系统的可扩展性与维护效率。

76

2026.03.11

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 4.4万人学习

C# 教程
C# 教程

共94课时 | 11.2万人学习

Java 教程
Java 教程

共578课时 | 81.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号