BIO/NIO聊天室
Socket demo
- BIO
- Client端
- 客户端主入口
- UDPClient
- serverinfo bean
- TCPClient
- Server端
- server 入口函数
- UDPServer
- TCPServer
- ClientHandler
- CloseUtil
- NIO 聊天室基础版
- code逻辑
- 原本的聊天室 server端业务逻辑:
- NIO改造之后的逻辑
- TCP Server:
- ClientListener
- ClientHandler
- 4 进一步的线程优化
- 类库
- IoSelectorProvider
BIO
涉及到的 知识点:
一个 demo,实现的功能是 客户端 发广播,搜索目标服务器的 ip,port, 得到之后建立 TCP连接。 实现 客户端 服务端 互相收发数据, 两者都是 既可以 收 也可以主动的 发。
Client端
客户端主入口
- 进行 UDP 搜索,拿到 对应的 server 回传的 服务端信息, 然后建立tcp连接;
- 所以 服务端 要 先 打开 TCP的监听线程;若是先打开了udp线程,那么有可能客户端 已经根据 信息发送过来了tcp连接,而服务端 tcp线程还没打开的情况。(为什么不用CountDownLatch? 这个是在同一台机器上运行的线程 等待其他线程执行完用的。 而此时客户端 和 服务端的线程 不在 一台机器, 客户端的tcp连接线程 不能 用CountDownLatch 等待 服务端的tcp监听启动。所以对于 服务端 入口, 一定要 确保 tcp监听已经打开,才能 启动 udp线程)
package client;import client.bean.ServerInfo;import java.io.IOException;public class Client {public static void main(String[] args) {ServerInfo info = UDPSearcher.searchServer(10000);System.out.println("Server:" + info);if (info != null) {try {TCPClient.linkWith(info);} catch (IOException e) {e.printStackTrace();}}}
}
UDPClient
进行UDP搜索,在局域网内 发 广播, 接收 服务器回送的 服务器信息。所以 服务器信息 要封装成一个 bean。
代码思路:
首先要有一个监听进程,负责监听指定的回送端口,要先等这个线程起来,所以用一个latch,在监听起来之后 countdown;
之后就是 构造数据发送广播
然后还有收到回送,所以这里也用一个latch,若超时则结束的。 在监听中若收到了回送,这个latch countsown
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;public class UDPSearcher {private static final int LISTEN_PORT = UDPConstants.PORT_CLIENT_RESPONSE;public static ServerInfo searchServer(int timeout) {System.out.println("UDPSearcher Started.");// 成功收到回送的栅栏CountDownLatch receiveLatch = new CountDownLatch(1);Listener listener = null;try {listener = listen(receiveLatch);sendBroadcast();receiveLatch.await(timeout, TimeUnit.MILLISECONDS);} catch (Exception e) {e.printStackTrace();}// 完成System.out.println("UDPSearcher Finished.");if (listener == null) {return null;}List<ServerInfo> devices = listener.getServerAndClose();if (devices.size() > 0) {return devices.get(0);}return null;}private static Listener listen(CountDownLatch receiveLatch) throws InterruptedException {System.out.println("UDPSearcher start listen.");CountDownLatch startDownLatch = new CountDownLatch(1);Listener listener = new Listener(LISTEN_PORT, startDownLatch, receiveLatch);listener.start();startDownLatch.await();return listener;}private static void sendBroadcast() throws IOException {System.out.println("UDPSearcher sendBroadcast started.");// 作为搜索方,让系统自动分配端口DatagramSocket ds = new DatagramSocket();// 构建一份请求数据ByteBuffer byteBuffer = ByteBuffer.allocate(128);// 头部byteBuffer.put(UDPConstants.HEADER);// CMD命名byteBuffer.putShort((short) 1);// 回送端口信息byteBuffer.putInt(LISTEN_PORT);// 直接构建packetDatagramPacket requestPacket = new DatagramPacket(byteBuffer.array(),byteBuffer.position() + 1);// 广播地址requestPacket.setAddress(InetAddress.getByName("255.255.255.255"));// 设置服务器端口requestPacket.setPort(UDPConstants.PORT_SERVER);// 发送ds.send(requestPacket);ds.close();// 完成System.out.println("UDPSearcher sendBroadcast finished.");}private static class Listener extends Thread {private final int listenPort;private final CountDownLatch startDownLatch;private final CountDownLatch receiveDownLatch;private final List<ServerInfo> serverInfoList = new ArrayList<>();private final byte[] buffer = new byte[128];private final int minLen = UDPConstants.HEADER.length + 2 + 4;private boolean done = false;private DatagramSocket ds = null;private Listener(int listenPort, CountDownLatch startDownLatch, CountDownLatch receiveDownLatch) {super();this.listenPort = listenPort;this.startDownLatch = startDownLatch;this.receiveDownLatch = receiveDownLatch;}@Overridepublic void run() {super.run();// 通知已启动startDownLatch.countDown();try {// 监听回送端口ds = new DatagramSocket(listenPort);// 构建接收实体DatagramPacket receivePack = new DatagramPacket(buffer, buffer.length);while (!done) {// 接收ds.receive(receivePack);// 打印接收到的信息与发送者的信息// 发送者的IP地址String ip = receivePack.getAddress().getHostAddress();int port = receivePack.getPort();int dataLen = receivePack.getLength();byte[] data = receivePack.getData();boolean isValid = dataLen >= minLen&& ByteUtils.startsWith(data, UDPConstants.HEADER);System.out.println("UDPSearcher receive form ip:" + ip+ "\tport:" + port + "\tdataValid:" + isValid);if (!isValid) {// 无效继续continue;}ByteBuffer byteBuffer = ByteBuffer.wrap(buffer, UDPConstants.HEADER.length, dataLen);final short cmd = byteBuffer.getShort();final int serverPort = byteBuffer.getInt();if (cmd != 2 || serverPort <= 0) {System.out.println("UDPSearcher receive cmd:" + cmd + "\tserverPort:" + serverPort);continue;}String sn = new String(buffer, minLen, dataLen - minLen);ServerInfo info = new ServerInfo(serverPort, ip, sn);serverInfoList.add(info);// 成功接收到一份receiveDownLatch.countDown();}} catch (Exception ignored) {} finally {close();}System.out.println("UDPSearcher listener finished.");}private void close() {if (ds != null) {ds.close();ds = null;}}List<ServerInfo> getServerAndClose() {done = true;close();return serverInfoList;}}
}
serverinfo bean
public class ServerInfo {private String sn;private int port;private String address;public ServerInfo(int port, String ip, String sn) {this.port = port;this.address = ip;this.sn = sn;}public String getSn() {return sn;}public void setSn(String sn) {this.sn = sn;}public int getPort() {return port;}public void setPort(int port) {this.port = port;}public String getAddress() {return address;}public void setAddress(String address) {this.address = address;}@Overridepublic String toString() {return "ServerInfo{" +"sn='" + sn + '\'' +", port=" + port +", address='" + address + '\'' +'}';}
}
TCPClient
主动发起( connect) TCP连接,读 写 数据
package client;import client.bean.ServerInfo;
import clink.net.qiujuer.clink.utils.CloseUtils;import java.io.*;
import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;public class TCPClient {public static void linkWith(ServerInfo info) throws IOException {Socket socket = new Socket();// 超时时间socket.setSoTimeout(3000);// 连接本地,端口2000;超时时间3000mssocket.connect(new InetSocketAddress(Inet4Address.getByName(info.getAddress()), info.getPort()), 3000);System.out.println("已发起服务器连接,并进入后续流程~");System.out.println("客户端信息:" + socket.getLocalAddress() + " P:" + socket.getLocalPort());System.out.println("服务器信息:" + socket.getInetAddress() + " P:" + socket.getPort());try {ReadHandler readHandler = new ReadHandler(socket.getInputStream());readHandler.start();// 发送接收数据write(socket);// 退出操作readHandler.exit();} catch (Exception e) {System.out.println("异常关闭");}// 释放资源socket.close();System.out.println("客户端已退出~");}private static void write(Socket client) throws IOException {// 构建键盘输入流InputStream in = System.in;BufferedReader input = new BufferedReader(new InputStreamReader(in));// 得到Socket输出流,并转换为打印流OutputStream outputStream = client.getOutputStream();PrintStream socketPrintStream = new PrintStream(outputStream);do {// 键盘读取一行String str = input.readLine();// 发送到服务器socketPrintStream.println(str);if ("00bye00".equalsIgnoreCase(str)) {break;}} while (true);// 资源释放socketPrintStream.close();}static class ReadHandler extends Thread {private boolean done = false;private final InputStream inputStream;ReadHandler(InputStream inputStream) {this.inputStream = inputStream;}@Overridepublic void run() {super.run();try {// 得到输入流,用于接收数据BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));do {String str;try {// 客户端拿到一条数据str = socketInput.readLine();} catch (SocketTimeoutException e) {continue;}if (str == null) {System.out.println("连接已关闭,无法读取数据!");break;}// 打印到屏幕System.out.println(str);} while (!done);} catch (Exception e) {if (!done) {System.out.println("连接异常断开:" + e.getMessage());}} finally {// 连接关闭CloseUtils.close(inputStream);}}void exit() {done = true;CloseUtils.close(inputStream);}}
}
Server端
server 入口函数
import constants.TCPConstants;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;public class Server {public static void main(String[] args) throws IOException {TCPServer tcpServer = new TCPServer(TCPConstants.PORT_SERVER);boolean isSucceed = tcpServer.start();if (!isSucceed) {System.out.println("Start TCP server failed!");return;}UDPProvider.start(TCPConstants.PORT_SERVER);BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));String str;do {str = bufferedReader.readLine();tcpServer.broadcast(str);} while (!"00bye00".equalsIgnoreCase(str));UDPProvider.stop();tcpServer.stop();}
}
UDPServer
服务端处理 UDP 请求的 类
package server;import clink.net.qiujuer.clink.utils.ByteUtils;
import constants.UDPConstants;import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.nio.ByteBuffer;
import java.util.UUID;class UDPProvider {private static Provider PROVIDER_INSTANCE;static void start(int port) {//因为provider是个单例模式,所以开始前先判断如果当前有连接,需要先停止stop();String sn = UUID.randomUUID().toString();Provider provider = new Provider(sn, port);provider.start();PROVIDER_INSTANCE = provider;}static void stop() {if (PROVIDER_INSTANCE != null) {PROVIDER_INSTANCE.exit();PROVIDER_INSTANCE = null;}}private static class Provider extends Thread {//服务器端的udp主要就是接收到广播后,匹配了就传回去自己的信息,然后再建立TCP连接private final byte[] sn;private final int port;private boolean done = false;//服务器端的socketprivate DatagramSocket ds = null;// 存储消息的Bufferfinal byte[] buffer = new byte[128];Provider(String sn, int port) {super();this.sn = sn.getBytes();this.port = port;}@Overridepublic void run() {super.run();System.out.println("UDPProvider Started.");try {// 监听20000 端口ds = new DatagramSocket(UDPConstants.PORT_SERVER);// 接收消息的PacketDatagramPacket receivePack = new DatagramPacket(buffer, buffer.length);while (!done) {// 接收ds.receive(receivePack);// 打印接收到的信息与发送者的信息// 发送者的IP地址String clientIp = receivePack.getAddress().getHostAddress();int clientPort = receivePack.getPort();int clientDataLen = receivePack.getLength();byte[] clientData = receivePack.getData();//验证合法性boolean isValid = clientDataLen >= (UDPConstants.HEADER.length + 2 + 4)&& ByteUtils.startsWith(clientData, UDPConstants.HEADER);System.out.println("UDPProvider receive form ip:" + clientIp+ "\tport:" + clientPort + "\tdataValid:" + isValid);if (!isValid) {// 无效继续continue;}// 解析命令与回送端口int index = UDPConstants.HEADER.length;short cmd = (short) ((clientData[index++] << 8) | (clientData[index++] & 0xff));int responsePort = (((clientData[index++]) << 24) |((clientData[index++] & 0xff) << 16) |((clientData[index++] & 0xff) << 8) |((clientData[index] & 0xff)));// 判断合法性if (cmd == 1 && responsePort > 0) {// 构建一份回送数据//回送的是服务器信息ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);byteBuffer.put(UDPConstants.HEADER);byteBuffer.putShort((short) 2);byteBuffer.putInt(port);byteBuffer.put(sn);int len = byteBuffer.position();// 直接根据发送者构建一份回送信息DatagramPacket responsePacket = new DatagramPacket(buffer,len,receivePack.getAddress(),//发送者的ipresponsePort);ds.send(responsePacket);System.out.println("UDPProvider response to:" + clientIp + "\tport:" + responsePort + "\tdataLen:" + len);} else {System.out.println("UDPProvider receive cmd nonsupport; cmd:" + cmd + "\tport:" + port);}}} catch (Exception ignored) {} finally {close();}// 完成System.out.println("UDPProvider Finished.");}private void close() {if (ds != null) {ds.close();ds = null;}}/*** 提供结束*/void exit() {done = true;close();}}
}
TCPServer
处理tcp连接的类,这个类主要负责 监听 指定 端口, 接受连接(accept),拿到对应的socket。 数据的 读和写 由handler类来单独处理。
package server;import server.handle.ClientHandler;import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;public class TCPServer {private final int port;private ClientListener mListener;private List<ClientHandler> clientHandlerList = new ArrayList<>();public TCPServer(int port) {this.port = port;}public boolean start() {try {ClientListener listener = new ClientListener(port);mListener = listener;listener.start();} catch (IOException e) {e.printStackTrace();return false;}return true;}public void stop() {if (mListener != null) {mListener.exit();}for (ClientHandler clientHandler : clientHandlerList) {clientHandler.exit();}clientHandlerList.clear();}public void broadcast(String str) {for (ClientHandler clientHandler : clientHandlerList) {clientHandler.send(str);}}private class ClientListener extends Thread {private ServerSocket server;private boolean done = false;private ClientListener(int port) throws IOException {server = new ServerSocket(port);System.out.println("服务器信息:" + server.getInetAddress() + " P:" + server.getLocalPort());}@Overridepublic void run() {super.run();System.out.println("服务器准备就绪~");// 等待客户端连接do {// 得到客户端Socket client;try {client = server.accept();} catch (IOException e) {continue;}try {// 客户端构建异步线程ClientHandler clientHandler = new ClientHandler(client,handler -> clientHandlerList.remove(handler));// 读取数据并打印clientHandler.readToPrint();clientHandlerList.add(clientHandler);} catch (IOException e) {e.printStackTrace();System.out.println("客户端连接异常:" + e.getMessage());}} while (!done);System.out.println("服务器已关闭!");}void exit() {done = true;try {server.close();} catch (IOException e) {e.printStackTrace();}}}
}
ClientHandler
处理 TCP连接建立之后的 数据 读写的类。
package server.handle;import clink.net.qiujuer.clink.utils.CloseUtils;import java.io.*;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class ClientHandler {private final Socket socket;private final ClientReadHandler readHandler;private final ClientWriteHandler writeHandler;private final CloseNotify closeNotify;public ClientHandler(Socket socket, CloseNotify closeNotify) throws IOException {this.socket = socket;this.readHandler = new ClientReadHandler(socket.getInputStream());this.writeHandler = new ClientWriteHandler(socket.getOutputStream());this.closeNotify = closeNotify;System.out.println("新客户端连接:" + socket.getInetAddress() +" P:" + socket.getPort());}public void exit() {readHandler.exit();writeHandler.exit();CloseUtils.close(socket);System.out.println("客户端已退出:" + socket.getInetAddress() +" P:" + socket.getPort());}public void send(String str) {writeHandler.send(str);}public void readToPrint() {readHandler.start();}private void exitBySelf() {exit();closeNotify.onSelfClosed(this);}public interface CloseNotify {void onSelfClosed(ClientHandler handler);}class ClientReadHandler extends Thread {private boolean done = false;private final InputStream inputStream;ClientReadHandler(InputStream inputStream) {this.inputStream = inputStream;}@Overridepublic void run() {super.run();try {// 得到输入流,用于接收数据BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));do {// 客户端拿到一条数据String str = socketInput.readLine();if (str == null) {System.out.println("客户端已无法读取数据!");// 退出当前客户端ClientHandler.this.exitBySelf();break;}// 打印到屏幕System.out.println(str);} while (!done);} catch (Exception e) {if (!done) {System.out.println("连接异常断开");ClientHandler.this.exitBySelf();}} finally {// 连接关闭CloseUtils.close(inputStream);}}void exit() {done = true;CloseUtils.close(inputStream);}}/*** 这里为什么用单线程池,而不是使用一个线程呢?* 因为涉及到 可能需要等 其他的线程把数据传来,然后才能发送, 线程之间 由 合作的关系,单个线程不方便管理,所以用线程池来协调*/class ClientWriteHandler {private boolean done = false;private final PrintStream printStream;private final ExecutorService executorService;ClientWriteHandler(OutputStream outputStream) {this.printStream = new PrintStream(outputStream);this.executorService = Executors.newSingleThreadExecutor();}void exit() {done = true;CloseUtils.close(printStream);executorService.shutdownNow();}void send(String str) {executorService.execute(new WriteRunnable(str));}class WriteRunnable implements Runnable {private final String msg;WriteRunnable(String msg) {this.msg = msg;}@Overridepublic void run() {if (ClientWriteHandler.this.done) {return;}try {ClientWriteHandler.this.printStream.println(msg);} catch (Exception e) {e.printStackTrace();}}}}
}
CloseUtil
关闭工具类, 用来减少重复的代码
public class CloseUtils {public static void close(Closeable... closeables) {if (closeables == null) {return;}for (Closeable closeable : closeables) {try {closeable.close();} catch (IOException e) {e.printStackTrace();}}}
}
NIO 聊天室基础版
code逻辑
原本的聊天室 server端业务逻辑:
1 一个监听端口线程,accept连接
2 对每个连接创建一个clienthandler 线程 去进行 客户端 读写操作;
3 对于消息的转发, 需要所有的当前在线客户端信息,因此,必须得主线程来做。还有一些启动 关闭操作也得主线程。 转发也是使用了单线程池
ClientHandler 中:
1 读: 从客户端读数据 ClientReadHandler extends Thread, 为什么这里也要开新的线程? 因为 BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream)); 流是阻塞的, 如果不用新的线程,可能会影响后面信息的读取;
读到之后, 要通知 主线程, 让主线程对所有在线的客户端进行转发clientHandlerCallback.onNewMessageArrived
2 写: 这里用了一个 单线程池。
NIO改造之后的逻辑
注意这里只是改成了使用NIO, 线程还是很多;
TCP Server:
TCP Server:
1 private Selector selector;
private ServerSocketChannel server;
两个全局变量
2 start():
try {//是选择,监听还是得自己写selector = Selector.open();//相当于连接 和 流打开ServerSocketChannel server = ServerSocketChannel.open();// 设置为非阻塞server.configureBlocking(false);// 绑定本地端口server.socket().bind(new InetSocketAddress(port));// 注册客户端连接到达监听,这里只是注册事件,所以可以写在监听前面,真正的接收某个连接,在监听里面server.register(selector, SelectionKey.OP_ACCEPT);this.server = server;System.out.println("服务器信息:" + server.getLocalAddress().toString());// 启动客户端监听/***** 3333333333333333333*/ClientListener listener = this.listener = new ClientListener();listener.start();} catch (IOException e) {e.printStackTrace();return false;}return true;}
ClientListener
/**** 33333333333333*/private class ClientListener extends Thread {private boolean done = false;@Overridepublic void run() {super.run();Selector selector = TCPServer.this.selector;System.out.println("服务器准备就绪~");// 等待客户端连接,必须是个循环不断的过程do {// 得到客户端try {//本来是acceptif (selector.select() == 0) {if (done) {break;}continue;}//获取当前就绪的事件用selector.selectedKeys(), 然后再放进了迭代器Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {if (done) {break;}//已经使用了,要及时清除掉SelectionKey key = iterator.next();iterator.remove();// 检查当前Key的状态是否是我们关注的// 客户端到达状态if (key.isAcceptable()) {ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();// 非阻塞状态拿到客户端连接//这里的 accept 为什么不阻塞? 因为 从selector那里 拿到的key是Acceptable()的SocketChannel socketChannel = serverSocketChannel.accept();try {// 客户端构建异步线程???/*** 这不还是一个客户端一个线程吗*/ClientHandler clientHandler = new ClientHandler(socketChannel, TCPServer.this);// 读取数据并打印clientHandler.readToPrint();// 添加同步处理synchronized (TCPServer.this) {clientHandlerList.add(clientHandler);}} catch (IOException e) {e.printStackTrace();System.out.println("客户端连接异常:" + e.getMessage());}}}} catch (IOException e) {e.printStackTrace();}} while (!done);System.out.println("服务器已关闭!");}void exit() {done = true;// 唤醒当前的阻塞selector.wakeup();}}
ClientHandler
public class ClientHandler {private final SocketChannel socketChannel;private final ClientReadHandler readHandler;private final ClientWriteHandler writeHandler;private final ClientHandlerCallback clientHandlerCallback;private final String clientInfo;/*** 1111111111* socket 变 SocketChannel, 注册读写事件* @param socketChannel* @param clientHandlerCallback* @throws IOException*/public ClientHandler(SocketChannel socketChannel, ClientHandlerCallback clientHandlerCallback) throws IOException {this.socketChannel = socketChannel;// 设置非阻塞模式socketChannel.configureBlocking(false);//读写事件//selector 怎么那么多啊, 一个处理读一个处理写Selector readSelector = Selector.open();socketChannel.register(readSelector, SelectionKey.OP_READ);this.readHandler = new ClientReadHandler(readSelector);Selector writeSelector = Selector.open();socketChannel.register(writeSelector, SelectionKey.OP_WRITE);this.writeHandler = new ClientWriteHandler(writeSelector);this.clientHandlerCallback = clientHandlerCallback;this.clientInfo = socketChannel.getRemoteAddress().toString();System.out.println("新客户端连接:" + clientInfo);}public String getClientInfo() {return clientInfo;}public void exit() {readHandler.exit();writeHandler.exit();CloseUtils.close(socketChannel);System.out.println("客户端已退出:" + clientInfo);}public void send(String str) {writeHandler.send(str);}public void readToPrint() {readHandler.start();}private void exitBySelf() {exit();clientHandlerCallback.onSelfClosed(this);}public interface ClientHandlerCallback {// 自身关闭通知void onSelfClosed(ClientHandler handler);// 收到消息时通知void onNewMessageArrived(ClientHandler handler, String msg);}/*** 2222222222222222* 改造读*/class ClientReadHandler extends Thread {private boolean done = false;private final Selector selector;//通道输出得用这个private final ByteBuffer byteBuffer;ClientReadHandler(Selector selector) {this.selector = selector;this.byteBuffer = ByteBuffer.allocate(256);}@Overridepublic void run() {super.run();try {//读操作,怎么拿到数据,从selectionKey中do {// 客户端拿到一条数据if (selector.select() == 0) {if (done) {break;}continue;}Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {if (done) {break;}SelectionKey key = iterator.next();iterator.remove();//读到了一个,拿到当前的连接channel//这些处理都差不多, 只不过这里判断的是 是否可读if (key.isReadable()) {SocketChannel client = (SocketChannel) key.channel();// 清空操作byteBuffer.clear();// 读取int read = client.read(byteBuffer);if (read > 0) {// 丢弃换行符String str = new String(byteBuffer.array(), 0, read - 1);// 通知到TCPServerclientHandlerCallback.onNewMessageArrived(ClientHandler.this, str);} else {System.out.println("客户端已无法读取数据!");// 退出当前客户端ClientHandler.this.exitBySelf();break;}}}} while (!done);} catch (Exception e) {if (!done) {System.out.println("连接异常断开");ClientHandler.this.exitBySelf();}} finally {// 连接关闭CloseUtils.close(selector);}}/*** 只关 selector???*/void exit() {done = true;selector.wakeup();CloseUtils.close(selector);}}/*** 3 3333333333333333*/class ClientWriteHandler {private boolean done = false;private final Selector selector;private final ByteBuffer byteBuffer;private final ExecutorService executorService;ClientWriteHandler(Selector selector) {this.selector = selector;this.byteBuffer = ByteBuffer.allocate(256);this.executorService = Executors.newSingleThreadExecutor();}void exit() {done = true;CloseUtils.close(selector);executorService.shutdownNow();}void send(String str) {if (done) {return;}executorService.execute(new WriteRunnable(str));}/*** 444444444444444444*/class WriteRunnable implements Runnable {private final String msg;WriteRunnable(String msg) {this.msg = msg + '\n';}@Overridepublic void run() {if (ClientWriteHandler.this.done) {return;}byteBuffer.clear();byteBuffer.put(msg.getBytes());// 反转操作, 重点,把指针换到前面,因为put之后指针在最后,而发送数据是从指针位置开始发送//反转后再发送,才不是发送的空数据//翻转后, 指针才会指向初始位置, 结束指针才会指向结束位置byteBuffer.flip();while (!done && byteBuffer.hasRemaining()) {try {int len = socketChannel.write(byteBuffer);// len = 0 合法,因为是非阻塞io ,而又直接发送的,所以不一定有数据要发送;只有负数才是异常的。if (len < 0) {System.out.println("客户端已无法发送数据!");//自己退出ClientHandler.this.exitBySelf();break;}} catch (Exception e) {e.printStackTrace();}}}}}
}
4 进一步的线程优化
上面的例子中, selector一个线程, 每个channel对应两个线程,一个读一个写; 现在针对这部分优化;
但是 为什么 clientHandler已经拿到了socketChannel, 为什么还有 读写各自一个selector;
不是直接使用一个线程,这样容易造成拥塞(因为所有的任务都在这一个线程串行执行), 而是 监听与数据处理线程分离;, 监听一个线程, 用来处理accept, 所有的socketChannel对应的数据的输入输出 一个线程(或者一个线程池)。
实际上也不需要给所有的客户端分配线程,因为网络带宽是有限的, 一旦带宽被占据了, 数据也是没办法发送接收的。
类库
IoSelectorProvider
主要负责 读写在 selector的注册:
共有变量和构造函数
private final AtomicBoolean isClosed = new AtomicBoolean(false);/*** 555555555555555555* readSelector的锁是否处于某个过程*/private final AtomicBoolean inRegInput = new AtomicBoolean(false);private final AtomicBoolean inRegOutput = new AtomicBoolean(false);private final Selector readSelector;private final Selector writeSelector;private final HashMap<SelectionKey, Runnable> inputCallbackMap = new HashMap<>();private final HashMap<SelectionKey, Runnable> outputCallbackMap = new HashMap<>();private final ExecutorService inputHandlePool;private final ExecutorService outputHandlePool;public IoSelectorProvider() throws IOException {readSelector = Selector.open();writeSelector = Selector.open();inputHandlePool = Executors.newFixedThreadPool(4,new IoProviderThreadFactory("IoProvider-Input-Thread-"));outputHandlePool = Executors.newFixedThreadPool(4,new IoProviderThreadFactory("IoProvider-Output-Thread-"));// 开始输出输入的监听startRead();startWrite();}
/*** 111111111111111111* 开始 读*/private void startRead() {Thread thread = new Thread("Clink IoSelectorProvider ReadSelector Thread") {@Overridepublic void run() {while (!isClosed.get()) {try {// 这里有对readSelector的读操作if (readSelector.select() == 0) {waitSelection(inRegInput);continue;}/**为什么一定要有取消注册?因为 这里 可以看到对channel的读是 用一个线程池来处理的,可以理解为立即就有结果,也就是数组可以很快遍历,没有阻塞那么如果存在某个 channel 没有完成读操作, 那么下一次循环readSelector.select() == 0 仍然不是0, 也就是没完成的话这个channel的读,会仍然注册在selector 上, 在这次的循环中,仍然会被加入 线程池。。如果有的任务执行缓慢, 那么会在线程池大量堆积。。这当然是要避免的。,所以对于,已经提交到线程池的,要及时取消注册。*/Set<SelectionKey> selectionKeys = readSelector.selectedKeys();for (SelectionKey selectionKey : selectionKeys) {if (selectionKey.isValid()) {//处理 当前 事件的 ,需要声明 当前 究竟是读事件,还是写事件//将 执行结果的 返回 存到map中// 异步 ,用的线程池handleSelection(selectionKey, SelectionKey.OP_READ, inputCallbackMap, inputHandlePool);}}selectionKeys.clear();} catch (IOException e) {e.printStackTrace();}}}};thread.setPriority(Thread.MAX_PRIORITY);thread.start();}
/*** 22222222222222222222222222* 如何 处理 所有的 注册到 selector上的事件*/private static void handleSelection(SelectionKey key, int keyOps,HashMap<SelectionKey, Runnable> map,ExecutorService pool) {// 重点,!!!!!!!!!!!!!!!!!!!!!// 为什么要先取消注册??? 看1的注释// 取消继续对keyOps的监听//selector只有注册操作, 看3,没有取消注册,所以这里要自己实现//各种readyOps(),其实都是int值, 注册事件其实就是叠加ops的int值,要取消事件, 就把值去掉key.readyOps() & ~keyOpskey.interestOps(key.readyOps() & ~keyOps);Runnable runnable = null;//拿到当前 key 对应的runnable,然后加入线程池进行异步调度try {runnable = map.get(key);} catch (Exception ignored) {}if (runnable != null && !pool.isShutdown()) {// 异步调度pool.execute(runnable);}}
/*** 3333333333333333333*/@Overridepublic boolean registerInput(SocketChannel channel, HandleInputCallback callback) {// 注册了 channel, 则 当前 channel只要可读,就要进行回调。/*这里必须有取消注册的过程, 原因可以看1 的注释*/return registerSelection(channel, readSelector, SelectionKey.OP_READ, inRegInput,inputCallbackMap, callback) != null;}
/*** 444444444444444444444* 1 有对readSelector 的读操作,* 这里也有,这里的是 添加内容, 相当于写, 所以要加锁, 锁住readSelector,或者锁住操作readSelector的代码* 用原子布尔值当锁, 见5**/private static SelectionKey registerSelection(SocketChannel channel, Selector selector,int registerOps, AtomicBoolean locker,HashMap<SelectionKey, Runnable> map,Runnable runnable) {//noinspection SynchronizationOnLocalVariableOrMethodParametersynchronized (locker) {// 设置锁定状态locker.set(true);try {// 唤醒当前的selector,让selector不处于select()状态//先唤醒,避免注册无效, 可以说是二次select?????不懂selector.wakeup();SelectionKey key = null;if (channel.isRegistered()) {// 查询是否已经注册过key = channel.keyFor(selector);if (key != null) {key.interestOps(key.readyOps() | registerOps);}}if (key == null) {// 注册selector得到Keykey = channel.register(selector, registerOps);// 注册回调map.put(key, runnable);}return key;} catch (ClosedChannelException e) {return null;} finally {// 解除锁定状态locker.set(false);try {// 通知locker.notify();} catch (Exception ignored) {}}}}
BIO/NIO聊天室相关推荐
- java NIO及NIO聊天室
参考链接: java NIO实例1:http://blog.chinaunix.net/uid-25808509-id-3346228.html java NIO教程之selector(一系列): h ...
- JavaFX+NIO聊天室第四篇表情包
设计思路 表情包是我们聊天中经常使用的功能,他的实现有两种办法.一种是使用Unicode中编码的700多种的表情字符.另外一种是使用图片来充当表情包. 1.从2010年开始,unicode对emoji ...
- 【Netty】NIO 网络编程 聊天室案例
文章目录 一. NIO 聊天室需求 二. NIO 聊天室 服务器端 代码分析 三. NIO 聊天室 客户端 代码分析 四. NIO 聊天室 服务器端 完整代码 五. NIO 聊天室 客户端 完整代码 ...
- NIO及多线程实现聊天室完整版
一 NIO原理 NIO与BIO虽然都叫IO,但实现的方式完全不同: IO面向流Stream–建立管道(InputStream/OutputStream)直接面对字节的数据流传输数据是单向的 NIO面向 ...
- Java-NIO实战多人聊天室
NIO服务端 public class NioServer {/*** 启动*/public void start() throws IOException {/*** 1. 创建Selector*/ ...
- Java BIO多人聊天室
基于上篇NIO的多人聊天室,这篇将用BIO也实现一遍 首先是服务端的设计: /*** @author Jing* @create 2020/5/17*/ public class ChatServer ...
- JAVA网络编程NIO实现简易多人聊天室
BIO模型 BIO即blocking IO,顾名思义是一种阻塞模型.当没有客户端连接时,服务端会一直阻塞,当有客户端新建连接时,服务端会新开一个线程去响应(不用多线程的话服务端同一时刻最多只能接收一个 ...
- NIO网络编程实战之简单多人聊天室
NIO网络编程实战 利用NIO编程知识,实现多人聊天室. 1. NIO编程实现步骤 第一步:创建Selector 第二步:创建ServerSocketChannel,并绑定监听端口 第三步:将Chan ...
- Java NIO示例:多人网络聊天室完整代码
服务端: package cn.zhangxueliang.herostory.chatroom;import java.io.IOException; import java.net.InetSo ...
最新文章
- 编程开发使用的软件大全
- rsync+inotify实现实时同步案例
- leetcode 137. 只出现一次的数字 II(位运算)
- Sublime Text 3插件安装方法
- python以列表的形式输出_简单介绍python输出列表元素的所有排列形式
- python中yield的使用_python中yield的用法详解-转载
- textfield获取其中内容_用户认知视角下的产品信息获取体验度量体系研究
- PHP中的Traits用法详解
- 153. php 引用
- bili弹幕姬_B站弹幕姬插件——弹幕日志
- 利用AWVS进行反制
- Tempo数据分析平台,助力企业高效完成数据预处理工作
- 《畅玩NAS》第8章 ZeroTier组建局域网
- 后代选择器和子代选择器
- 【总结】1438- 你想知道的前后端协作规范都在这了
- 【 IntelliJ IDEA 】设置主题和字体
- “入门大数据分析:探索海量数据的奥秘“
- Word中使用交叉引用插入多个参考文献
- 摘抄-对最好程序员的感想
- CP2102 USB转串口电路设计以及介绍