Socket demo

  • BIO
    • Client端
      • 客户端主入口
      • UDPClient
        • serverinfo bean
      • TCPClient
    • Server端
      • server 入口函数
      • UDPServer
      • TCPServer
        • ClientHandler
    • CloseUtil
  • NIO 聊天室基础版
    • code逻辑
      • 原本的聊天室 server端业务逻辑:
      • NIO改造之后的逻辑
        • TCP Server:
        • ClientListener
      • ClientHandler
    • 4 进一步的线程优化
      • 类库
        • IoSelectorProvider

BIO

涉及到的 知识点:

一个 demo,实现的功能是 客户端 发广播,搜索目标服务器的 ip,port, 得到之后建立 TCP连接。 实现 客户端 服务端 互相收发数据, 两者都是 既可以 收 也可以主动的 发。

Client端

客户端主入口

  • 进行 UDP 搜索,拿到 对应的 server 回传的 服务端信息, 然后建立tcp连接;
  • 所以 服务端 要 先 打开 TCP的监听线程;若是先打开了udp线程,那么有可能客户端 已经根据 信息发送过来了tcp连接,而服务端 tcp线程还没打开的情况。(为什么不用CountDownLatch? 这个是在同一台机器上运行的线程 等待其他线程执行完用的。 而此时客户端 和 服务端的线程 不在 一台机器, 客户端的tcp连接线程 不能 用CountDownLatch 等待 服务端的tcp监听启动。所以对于 服务端 入口, 一定要 确保 tcp监听已经打开,才能 启动 udp线程)
package client;import client.bean.ServerInfo;import java.io.IOException;public class Client {public static void main(String[] args) {ServerInfo info = UDPSearcher.searchServer(10000);System.out.println("Server:" + info);if (info != null) {try {TCPClient.linkWith(info);} catch (IOException e) {e.printStackTrace();}}}
}

UDPClient

进行UDP搜索,在局域网内 发 广播, 接收 服务器回送的 服务器信息。所以 服务器信息 要封装成一个 bean。

代码思路:
首先要有一个监听进程,负责监听指定的回送端口,要先等这个线程起来,所以用一个latch,在监听起来之后 countdown;
之后就是 构造数据发送广播
然后还有收到回送,所以这里也用一个latch,若超时则结束的。 在监听中若收到了回送,这个latch countsown

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;public class UDPSearcher {private static final int LISTEN_PORT = UDPConstants.PORT_CLIENT_RESPONSE;public static ServerInfo searchServer(int timeout) {System.out.println("UDPSearcher Started.");// 成功收到回送的栅栏CountDownLatch receiveLatch = new CountDownLatch(1);Listener listener = null;try {listener = listen(receiveLatch);sendBroadcast();receiveLatch.await(timeout, TimeUnit.MILLISECONDS);} catch (Exception e) {e.printStackTrace();}// 完成System.out.println("UDPSearcher Finished.");if (listener == null) {return null;}List<ServerInfo> devices = listener.getServerAndClose();if (devices.size() > 0) {return devices.get(0);}return null;}private static Listener listen(CountDownLatch receiveLatch) throws InterruptedException {System.out.println("UDPSearcher start listen.");CountDownLatch startDownLatch = new CountDownLatch(1);Listener listener = new Listener(LISTEN_PORT, startDownLatch, receiveLatch);listener.start();startDownLatch.await();return listener;}private static void sendBroadcast() throws IOException {System.out.println("UDPSearcher sendBroadcast started.");// 作为搜索方,让系统自动分配端口DatagramSocket ds = new DatagramSocket();// 构建一份请求数据ByteBuffer byteBuffer = ByteBuffer.allocate(128);// 头部byteBuffer.put(UDPConstants.HEADER);// CMD命名byteBuffer.putShort((short) 1);// 回送端口信息byteBuffer.putInt(LISTEN_PORT);// 直接构建packetDatagramPacket requestPacket = new DatagramPacket(byteBuffer.array(),byteBuffer.position() + 1);// 广播地址requestPacket.setAddress(InetAddress.getByName("255.255.255.255"));// 设置服务器端口requestPacket.setPort(UDPConstants.PORT_SERVER);// 发送ds.send(requestPacket);ds.close();// 完成System.out.println("UDPSearcher sendBroadcast finished.");}private static class Listener extends Thread {private final int listenPort;private final CountDownLatch startDownLatch;private final CountDownLatch receiveDownLatch;private final List<ServerInfo> serverInfoList = new ArrayList<>();private final byte[] buffer = new byte[128];private final int minLen = UDPConstants.HEADER.length + 2 + 4;private boolean done = false;private DatagramSocket ds = null;private Listener(int listenPort, CountDownLatch startDownLatch, CountDownLatch receiveDownLatch) {super();this.listenPort = listenPort;this.startDownLatch = startDownLatch;this.receiveDownLatch = receiveDownLatch;}@Overridepublic void run() {super.run();// 通知已启动startDownLatch.countDown();try {// 监听回送端口ds = new DatagramSocket(listenPort);// 构建接收实体DatagramPacket receivePack = new DatagramPacket(buffer, buffer.length);while (!done) {// 接收ds.receive(receivePack);// 打印接收到的信息与发送者的信息// 发送者的IP地址String ip = receivePack.getAddress().getHostAddress();int port = receivePack.getPort();int dataLen = receivePack.getLength();byte[] data = receivePack.getData();boolean isValid = dataLen >= minLen&& ByteUtils.startsWith(data, UDPConstants.HEADER);System.out.println("UDPSearcher receive form ip:" + ip+ "\tport:" + port + "\tdataValid:" + isValid);if (!isValid) {// 无效继续continue;}ByteBuffer byteBuffer = ByteBuffer.wrap(buffer, UDPConstants.HEADER.length, dataLen);final short cmd = byteBuffer.getShort();final int serverPort = byteBuffer.getInt();if (cmd != 2 || serverPort <= 0) {System.out.println("UDPSearcher receive cmd:" + cmd + "\tserverPort:" + serverPort);continue;}String sn = new String(buffer, minLen, dataLen - minLen);ServerInfo info = new ServerInfo(serverPort, ip, sn);serverInfoList.add(info);// 成功接收到一份receiveDownLatch.countDown();}} catch (Exception ignored) {} finally {close();}System.out.println("UDPSearcher listener finished.");}private void close() {if (ds != null) {ds.close();ds = null;}}List<ServerInfo> getServerAndClose() {done = true;close();return serverInfoList;}}
}

serverinfo bean

public class ServerInfo {private String sn;private int port;private String address;public ServerInfo(int port, String ip, String sn) {this.port = port;this.address = ip;this.sn = sn;}public String getSn() {return sn;}public void setSn(String sn) {this.sn = sn;}public int getPort() {return port;}public void setPort(int port) {this.port = port;}public String getAddress() {return address;}public void setAddress(String address) {this.address = address;}@Overridepublic String toString() {return "ServerInfo{" +"sn='" + sn + '\'' +", port=" + port +", address='" + address + '\'' +'}';}
}

TCPClient

主动发起( connect) TCP连接,读 写 数据

package client;import client.bean.ServerInfo;
import clink.net.qiujuer.clink.utils.CloseUtils;import java.io.*;
import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;public class TCPClient {public static void linkWith(ServerInfo info) throws IOException {Socket socket = new Socket();// 超时时间socket.setSoTimeout(3000);// 连接本地,端口2000;超时时间3000mssocket.connect(new InetSocketAddress(Inet4Address.getByName(info.getAddress()), info.getPort()), 3000);System.out.println("已发起服务器连接,并进入后续流程~");System.out.println("客户端信息:" + socket.getLocalAddress() + " P:" + socket.getLocalPort());System.out.println("服务器信息:" + socket.getInetAddress() + " P:" + socket.getPort());try {ReadHandler readHandler = new ReadHandler(socket.getInputStream());readHandler.start();// 发送接收数据write(socket);// 退出操作readHandler.exit();} catch (Exception e) {System.out.println("异常关闭");}// 释放资源socket.close();System.out.println("客户端已退出~");}private static void write(Socket client) throws IOException {// 构建键盘输入流InputStream in = System.in;BufferedReader input = new BufferedReader(new InputStreamReader(in));// 得到Socket输出流,并转换为打印流OutputStream outputStream = client.getOutputStream();PrintStream socketPrintStream = new PrintStream(outputStream);do {// 键盘读取一行String str = input.readLine();// 发送到服务器socketPrintStream.println(str);if ("00bye00".equalsIgnoreCase(str)) {break;}} while (true);// 资源释放socketPrintStream.close();}static class ReadHandler extends Thread {private boolean done = false;private final InputStream inputStream;ReadHandler(InputStream inputStream) {this.inputStream = inputStream;}@Overridepublic void run() {super.run();try {// 得到输入流,用于接收数据BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));do {String str;try {// 客户端拿到一条数据str = socketInput.readLine();} catch (SocketTimeoutException e) {continue;}if (str == null) {System.out.println("连接已关闭,无法读取数据!");break;}// 打印到屏幕System.out.println(str);} while (!done);} catch (Exception e) {if (!done) {System.out.println("连接异常断开:" + e.getMessage());}} finally {// 连接关闭CloseUtils.close(inputStream);}}void exit() {done = true;CloseUtils.close(inputStream);}}
}

Server端

server 入口函数

import constants.TCPConstants;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;public class Server {public static void main(String[] args) throws IOException {TCPServer tcpServer = new TCPServer(TCPConstants.PORT_SERVER);boolean isSucceed = tcpServer.start();if (!isSucceed) {System.out.println("Start TCP server failed!");return;}UDPProvider.start(TCPConstants.PORT_SERVER);BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));String str;do {str = bufferedReader.readLine();tcpServer.broadcast(str);} while (!"00bye00".equalsIgnoreCase(str));UDPProvider.stop();tcpServer.stop();}
}

UDPServer

服务端处理 UDP 请求的 类

package server;import clink.net.qiujuer.clink.utils.ByteUtils;
import constants.UDPConstants;import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.nio.ByteBuffer;
import java.util.UUID;class UDPProvider {private static Provider PROVIDER_INSTANCE;static void start(int port) {//因为provider是个单例模式,所以开始前先判断如果当前有连接,需要先停止stop();String sn = UUID.randomUUID().toString();Provider provider = new Provider(sn, port);provider.start();PROVIDER_INSTANCE = provider;}static void stop() {if (PROVIDER_INSTANCE != null) {PROVIDER_INSTANCE.exit();PROVIDER_INSTANCE = null;}}private static class Provider extends Thread {//服务器端的udp主要就是接收到广播后,匹配了就传回去自己的信息,然后再建立TCP连接private final byte[] sn;private final int port;private boolean done = false;//服务器端的socketprivate DatagramSocket ds = null;// 存储消息的Bufferfinal byte[] buffer = new byte[128];Provider(String sn, int port) {super();this.sn = sn.getBytes();this.port = port;}@Overridepublic void run() {super.run();System.out.println("UDPProvider Started.");try {// 监听20000 端口ds = new DatagramSocket(UDPConstants.PORT_SERVER);// 接收消息的PacketDatagramPacket receivePack = new DatagramPacket(buffer, buffer.length);while (!done) {// 接收ds.receive(receivePack);// 打印接收到的信息与发送者的信息// 发送者的IP地址String clientIp = receivePack.getAddress().getHostAddress();int clientPort = receivePack.getPort();int clientDataLen = receivePack.getLength();byte[] clientData = receivePack.getData();//验证合法性boolean isValid = clientDataLen >= (UDPConstants.HEADER.length + 2 + 4)&& ByteUtils.startsWith(clientData, UDPConstants.HEADER);System.out.println("UDPProvider receive form ip:" + clientIp+ "\tport:" + clientPort + "\tdataValid:" + isValid);if (!isValid) {// 无效继续continue;}// 解析命令与回送端口int index = UDPConstants.HEADER.length;short cmd = (short) ((clientData[index++] << 8) | (clientData[index++] & 0xff));int responsePort = (((clientData[index++]) << 24) |((clientData[index++] & 0xff) << 16) |((clientData[index++] & 0xff) << 8) |((clientData[index] & 0xff)));// 判断合法性if (cmd == 1 && responsePort > 0) {// 构建一份回送数据//回送的是服务器信息ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);byteBuffer.put(UDPConstants.HEADER);byteBuffer.putShort((short) 2);byteBuffer.putInt(port);byteBuffer.put(sn);int len = byteBuffer.position();// 直接根据发送者构建一份回送信息DatagramPacket responsePacket = new DatagramPacket(buffer,len,receivePack.getAddress(),//发送者的ipresponsePort);ds.send(responsePacket);System.out.println("UDPProvider response to:" + clientIp + "\tport:" + responsePort + "\tdataLen:" + len);} else {System.out.println("UDPProvider receive cmd nonsupport; cmd:" + cmd + "\tport:" + port);}}} catch (Exception ignored) {} finally {close();}// 完成System.out.println("UDPProvider Finished.");}private void close() {if (ds != null) {ds.close();ds = null;}}/*** 提供结束*/void exit() {done = true;close();}}
}

TCPServer

处理tcp连接的类,这个类主要负责 监听 指定 端口, 接受连接(accept),拿到对应的socket。 数据的 读和写 由handler类来单独处理。

package server;import server.handle.ClientHandler;import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;public class TCPServer {private final int port;private ClientListener mListener;private List<ClientHandler> clientHandlerList = new ArrayList<>();public TCPServer(int port) {this.port = port;}public boolean start() {try {ClientListener listener = new ClientListener(port);mListener = listener;listener.start();} catch (IOException e) {e.printStackTrace();return false;}return true;}public void stop() {if (mListener != null) {mListener.exit();}for (ClientHandler clientHandler : clientHandlerList) {clientHandler.exit();}clientHandlerList.clear();}public void broadcast(String str) {for (ClientHandler clientHandler : clientHandlerList) {clientHandler.send(str);}}private class ClientListener extends Thread {private ServerSocket server;private boolean done = false;private ClientListener(int port) throws IOException {server = new ServerSocket(port);System.out.println("服务器信息:" + server.getInetAddress() + " P:" + server.getLocalPort());}@Overridepublic void run() {super.run();System.out.println("服务器准备就绪~");// 等待客户端连接do {// 得到客户端Socket client;try {client = server.accept();} catch (IOException e) {continue;}try {// 客户端构建异步线程ClientHandler clientHandler = new ClientHandler(client,handler -> clientHandlerList.remove(handler));// 读取数据并打印clientHandler.readToPrint();clientHandlerList.add(clientHandler);} catch (IOException e) {e.printStackTrace();System.out.println("客户端连接异常:" + e.getMessage());}} while (!done);System.out.println("服务器已关闭!");}void exit() {done = true;try {server.close();} catch (IOException e) {e.printStackTrace();}}}
}

ClientHandler

处理 TCP连接建立之后的 数据 读写的类。

package server.handle;import clink.net.qiujuer.clink.utils.CloseUtils;import java.io.*;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class ClientHandler {private final Socket socket;private final ClientReadHandler readHandler;private final ClientWriteHandler writeHandler;private final CloseNotify closeNotify;public ClientHandler(Socket socket, CloseNotify closeNotify) throws IOException {this.socket = socket;this.readHandler = new ClientReadHandler(socket.getInputStream());this.writeHandler = new ClientWriteHandler(socket.getOutputStream());this.closeNotify = closeNotify;System.out.println("新客户端连接:" + socket.getInetAddress() +" P:" + socket.getPort());}public void exit() {readHandler.exit();writeHandler.exit();CloseUtils.close(socket);System.out.println("客户端已退出:" + socket.getInetAddress() +" P:" + socket.getPort());}public void send(String str) {writeHandler.send(str);}public void readToPrint() {readHandler.start();}private void exitBySelf() {exit();closeNotify.onSelfClosed(this);}public interface CloseNotify {void onSelfClosed(ClientHandler handler);}class ClientReadHandler extends Thread {private boolean done = false;private final InputStream inputStream;ClientReadHandler(InputStream inputStream) {this.inputStream = inputStream;}@Overridepublic void run() {super.run();try {// 得到输入流,用于接收数据BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));do {// 客户端拿到一条数据String str = socketInput.readLine();if (str == null) {System.out.println("客户端已无法读取数据!");// 退出当前客户端ClientHandler.this.exitBySelf();break;}// 打印到屏幕System.out.println(str);} while (!done);} catch (Exception e) {if (!done) {System.out.println("连接异常断开");ClientHandler.this.exitBySelf();}} finally {// 连接关闭CloseUtils.close(inputStream);}}void exit() {done = true;CloseUtils.close(inputStream);}}/*** 这里为什么用单线程池,而不是使用一个线程呢?* 因为涉及到 可能需要等 其他的线程把数据传来,然后才能发送, 线程之间 由 合作的关系,单个线程不方便管理,所以用线程池来协调*/class ClientWriteHandler {private boolean done = false;private final PrintStream printStream;private final ExecutorService executorService;ClientWriteHandler(OutputStream outputStream) {this.printStream = new PrintStream(outputStream);this.executorService = Executors.newSingleThreadExecutor();}void exit() {done = true;CloseUtils.close(printStream);executorService.shutdownNow();}void send(String str) {executorService.execute(new WriteRunnable(str));}class WriteRunnable implements Runnable {private final String msg;WriteRunnable(String msg) {this.msg = msg;}@Overridepublic void run() {if (ClientWriteHandler.this.done) {return;}try {ClientWriteHandler.this.printStream.println(msg);} catch (Exception e) {e.printStackTrace();}}}}
}

CloseUtil

关闭工具类, 用来减少重复的代码

public class CloseUtils {public static void close(Closeable... closeables) {if (closeables == null) {return;}for (Closeable closeable : closeables) {try {closeable.close();} catch (IOException e) {e.printStackTrace();}}}
}

NIO 聊天室基础版

code逻辑

原本的聊天室 server端业务逻辑:

1 一个监听端口线程,accept连接
2 对每个连接创建一个clienthandler 线程 去进行 客户端 读写操作;
3 对于消息的转发, 需要所有的当前在线客户端信息,因此,必须得主线程来做。还有一些启动 关闭操作也得主线程。 转发也是使用了单线程池

ClientHandler 中:
1 读: 从客户端读数据 ClientReadHandler extends Thread, 为什么这里也要开新的线程? 因为 BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream)); 流是阻塞的, 如果不用新的线程,可能会影响后面信息的读取;
读到之后, 要通知 主线程, 让主线程对所有在线的客户端进行转发clientHandlerCallback.onNewMessageArrived
2 写: 这里用了一个 单线程池。

NIO改造之后的逻辑

注意这里只是改成了使用NIO, 线程还是很多;
TCP Server:

TCP Server:

1 private Selector selector;
private ServerSocketChannel server;
两个全局变量

2 start():

try {//是选择,监听还是得自己写selector = Selector.open();//相当于连接 和 流打开ServerSocketChannel server = ServerSocketChannel.open();// 设置为非阻塞server.configureBlocking(false);// 绑定本地端口server.socket().bind(new InetSocketAddress(port));// 注册客户端连接到达监听,这里只是注册事件,所以可以写在监听前面,真正的接收某个连接,在监听里面server.register(selector, SelectionKey.OP_ACCEPT);this.server = server;System.out.println("服务器信息:" + server.getLocalAddress().toString());// 启动客户端监听/***** 3333333333333333333*/ClientListener listener = this.listener = new ClientListener();listener.start();} catch (IOException e) {e.printStackTrace();return false;}return true;}

ClientListener

/**** 33333333333333*/private class ClientListener extends Thread {private boolean done = false;@Overridepublic void run() {super.run();Selector selector = TCPServer.this.selector;System.out.println("服务器准备就绪~");// 等待客户端连接,必须是个循环不断的过程do {// 得到客户端try {//本来是acceptif (selector.select() == 0) {if (done) {break;}continue;}//获取当前就绪的事件用selector.selectedKeys(), 然后再放进了迭代器Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {if (done) {break;}//已经使用了,要及时清除掉SelectionKey key = iterator.next();iterator.remove();// 检查当前Key的状态是否是我们关注的// 客户端到达状态if (key.isAcceptable()) {ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();// 非阻塞状态拿到客户端连接//这里的 accept 为什么不阻塞? 因为 从selector那里 拿到的key是Acceptable()的SocketChannel socketChannel = serverSocketChannel.accept();try {// 客户端构建异步线程???/*** 这不还是一个客户端一个线程吗*/ClientHandler clientHandler = new ClientHandler(socketChannel, TCPServer.this);// 读取数据并打印clientHandler.readToPrint();// 添加同步处理synchronized (TCPServer.this) {clientHandlerList.add(clientHandler);}} catch (IOException e) {e.printStackTrace();System.out.println("客户端连接异常:" + e.getMessage());}}}} catch (IOException e) {e.printStackTrace();}} while (!done);System.out.println("服务器已关闭!");}void exit() {done = true;// 唤醒当前的阻塞selector.wakeup();}}

ClientHandler

public class ClientHandler {private final SocketChannel socketChannel;private final ClientReadHandler readHandler;private final ClientWriteHandler writeHandler;private final ClientHandlerCallback clientHandlerCallback;private final String clientInfo;/*** 1111111111* socket 变 SocketChannel, 注册读写事件* @param socketChannel* @param clientHandlerCallback* @throws IOException*/public ClientHandler(SocketChannel socketChannel, ClientHandlerCallback clientHandlerCallback) throws IOException {this.socketChannel = socketChannel;// 设置非阻塞模式socketChannel.configureBlocking(false);//读写事件//selector 怎么那么多啊, 一个处理读一个处理写Selector readSelector = Selector.open();socketChannel.register(readSelector, SelectionKey.OP_READ);this.readHandler = new ClientReadHandler(readSelector);Selector writeSelector = Selector.open();socketChannel.register(writeSelector, SelectionKey.OP_WRITE);this.writeHandler = new ClientWriteHandler(writeSelector);this.clientHandlerCallback = clientHandlerCallback;this.clientInfo = socketChannel.getRemoteAddress().toString();System.out.println("新客户端连接:" + clientInfo);}public String getClientInfo() {return clientInfo;}public void exit() {readHandler.exit();writeHandler.exit();CloseUtils.close(socketChannel);System.out.println("客户端已退出:" + clientInfo);}public void send(String str) {writeHandler.send(str);}public void readToPrint() {readHandler.start();}private void exitBySelf() {exit();clientHandlerCallback.onSelfClosed(this);}public interface ClientHandlerCallback {// 自身关闭通知void onSelfClosed(ClientHandler handler);// 收到消息时通知void onNewMessageArrived(ClientHandler handler, String msg);}/*** 2222222222222222* 改造读*/class ClientReadHandler extends Thread {private boolean done = false;private final Selector selector;//通道输出得用这个private final ByteBuffer byteBuffer;ClientReadHandler(Selector selector) {this.selector = selector;this.byteBuffer = ByteBuffer.allocate(256);}@Overridepublic void run() {super.run();try {//读操作,怎么拿到数据,从selectionKey中do {// 客户端拿到一条数据if (selector.select() == 0) {if (done) {break;}continue;}Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {if (done) {break;}SelectionKey key = iterator.next();iterator.remove();//读到了一个,拿到当前的连接channel//这些处理都差不多, 只不过这里判断的是 是否可读if (key.isReadable()) {SocketChannel client = (SocketChannel) key.channel();// 清空操作byteBuffer.clear();// 读取int read = client.read(byteBuffer);if (read > 0) {// 丢弃换行符String str = new String(byteBuffer.array(), 0, read - 1);// 通知到TCPServerclientHandlerCallback.onNewMessageArrived(ClientHandler.this, str);} else {System.out.println("客户端已无法读取数据!");// 退出当前客户端ClientHandler.this.exitBySelf();break;}}}} while (!done);} catch (Exception e) {if (!done) {System.out.println("连接异常断开");ClientHandler.this.exitBySelf();}} finally {// 连接关闭CloseUtils.close(selector);}}/***  只关 selector???*/void exit() {done = true;selector.wakeup();CloseUtils.close(selector);}}/*** 3 3333333333333333*/class ClientWriteHandler {private boolean done = false;private final Selector selector;private final ByteBuffer byteBuffer;private final ExecutorService executorService;ClientWriteHandler(Selector selector) {this.selector = selector;this.byteBuffer = ByteBuffer.allocate(256);this.executorService = Executors.newSingleThreadExecutor();}void exit() {done = true;CloseUtils.close(selector);executorService.shutdownNow();}void send(String str) {if (done) {return;}executorService.execute(new WriteRunnable(str));}/*** 444444444444444444*/class WriteRunnable implements Runnable {private final String msg;WriteRunnable(String msg) {this.msg = msg + '\n';}@Overridepublic void run() {if (ClientWriteHandler.this.done) {return;}byteBuffer.clear();byteBuffer.put(msg.getBytes());// 反转操作, 重点,把指针换到前面,因为put之后指针在最后,而发送数据是从指针位置开始发送//反转后再发送,才不是发送的空数据//翻转后, 指针才会指向初始位置, 结束指针才会指向结束位置byteBuffer.flip();while (!done && byteBuffer.hasRemaining()) {try {int len = socketChannel.write(byteBuffer);// len = 0 合法,因为是非阻塞io ,而又直接发送的,所以不一定有数据要发送;只有负数才是异常的。if (len < 0) {System.out.println("客户端已无法发送数据!");//自己退出ClientHandler.this.exitBySelf();break;}} catch (Exception e) {e.printStackTrace();}}}}}
}

4 进一步的线程优化

上面的例子中, selector一个线程, 每个channel对应两个线程,一个读一个写; 现在针对这部分优化;
但是 为什么 clientHandler已经拿到了socketChannel, 为什么还有 读写各自一个selector;

不是直接使用一个线程,这样容易造成拥塞(因为所有的任务都在这一个线程串行执行), 而是 监听与数据处理线程分离;, 监听一个线程, 用来处理accept, 所有的socketChannel对应的数据的输入输出 一个线程(或者一个线程池)。
实际上也不需要给所有的客户端分配线程,因为网络带宽是有限的, 一旦带宽被占据了, 数据也是没办法发送接收的。

类库

IoSelectorProvider

主要负责 读写在 selector的注册:
共有变量和构造函数

private final AtomicBoolean isClosed = new AtomicBoolean(false);/*** 555555555555555555* readSelector的锁是否处于某个过程*/private final AtomicBoolean inRegInput = new AtomicBoolean(false);private final AtomicBoolean inRegOutput = new AtomicBoolean(false);private final Selector readSelector;private final Selector writeSelector;private final HashMap<SelectionKey, Runnable> inputCallbackMap = new HashMap<>();private final HashMap<SelectionKey, Runnable> outputCallbackMap = new HashMap<>();private final ExecutorService inputHandlePool;private final ExecutorService outputHandlePool;public IoSelectorProvider() throws IOException {readSelector = Selector.open();writeSelector = Selector.open();inputHandlePool = Executors.newFixedThreadPool(4,new IoProviderThreadFactory("IoProvider-Input-Thread-"));outputHandlePool = Executors.newFixedThreadPool(4,new IoProviderThreadFactory("IoProvider-Output-Thread-"));// 开始输出输入的监听startRead();startWrite();}
/*** 111111111111111111* 开始 读*/private void startRead() {Thread thread = new Thread("Clink IoSelectorProvider ReadSelector Thread") {@Overridepublic void run() {while (!isClosed.get()) {try {// 这里有对readSelector的读操作if (readSelector.select() == 0) {waitSelection(inRegInput);continue;}/**为什么一定要有取消注册?因为 这里 可以看到对channel的读是 用一个线程池来处理的,可以理解为立即就有结果,也就是数组可以很快遍历,没有阻塞那么如果存在某个 channel 没有完成读操作, 那么下一次循环readSelector.select() == 0 仍然不是0, 也就是没完成的话这个channel的读,会仍然注册在selector 上, 在这次的循环中,仍然会被加入 线程池。。如果有的任务执行缓慢, 那么会在线程池大量堆积。。这当然是要避免的。,所以对于,已经提交到线程池的,要及时取消注册。*/Set<SelectionKey> selectionKeys = readSelector.selectedKeys();for (SelectionKey selectionKey : selectionKeys) {if (selectionKey.isValid()) {//处理 当前 事件的 ,需要声明 当前 究竟是读事件,还是写事件//将 执行结果的 返回 存到map中// 异步 ,用的线程池handleSelection(selectionKey, SelectionKey.OP_READ, inputCallbackMap, inputHandlePool);}}selectionKeys.clear();} catch (IOException e) {e.printStackTrace();}}}};thread.setPriority(Thread.MAX_PRIORITY);thread.start();}
 /*** 22222222222222222222222222* 如何 处理 所有的 注册到 selector上的事件*/private static void handleSelection(SelectionKey key, int keyOps,HashMap<SelectionKey, Runnable> map,ExecutorService pool) {// 重点,!!!!!!!!!!!!!!!!!!!!!// 为什么要先取消注册??? 看1的注释// 取消继续对keyOps的监听//selector只有注册操作, 看3,没有取消注册,所以这里要自己实现//各种readyOps(),其实都是int值, 注册事件其实就是叠加ops的int值,要取消事件, 就把值去掉key.readyOps() & ~keyOpskey.interestOps(key.readyOps() & ~keyOps);Runnable runnable = null;//拿到当前 key 对应的runnable,然后加入线程池进行异步调度try {runnable = map.get(key);} catch (Exception ignored) {}if (runnable != null && !pool.isShutdown()) {// 异步调度pool.execute(runnable);}}
 /*** 3333333333333333333*/@Overridepublic boolean registerInput(SocketChannel channel, HandleInputCallback callback) {// 注册了 channel, 则 当前 channel只要可读,就要进行回调。/*这里必须有取消注册的过程, 原因可以看1 的注释*/return registerSelection(channel, readSelector, SelectionKey.OP_READ, inRegInput,inputCallbackMap, callback) != null;}
/*** 444444444444444444444* 1 有对readSelector 的读操作,* 这里也有,这里的是 添加内容, 相当于写, 所以要加锁, 锁住readSelector,或者锁住操作readSelector的代码* 用原子布尔值当锁, 见5**/private static SelectionKey registerSelection(SocketChannel channel, Selector selector,int registerOps, AtomicBoolean locker,HashMap<SelectionKey, Runnable> map,Runnable runnable) {//noinspection SynchronizationOnLocalVariableOrMethodParametersynchronized (locker) {// 设置锁定状态locker.set(true);try {// 唤醒当前的selector,让selector不处于select()状态//先唤醒,避免注册无效, 可以说是二次select?????不懂selector.wakeup();SelectionKey key = null;if (channel.isRegistered()) {// 查询是否已经注册过key = channel.keyFor(selector);if (key != null) {key.interestOps(key.readyOps() | registerOps);}}if (key == null) {// 注册selector得到Keykey = channel.register(selector, registerOps);// 注册回调map.put(key, runnable);}return key;} catch (ClosedChannelException e) {return null;} finally {// 解除锁定状态locker.set(false);try {// 通知locker.notify();} catch (Exception ignored) {}}}}

BIO/NIO聊天室相关推荐

  1. java NIO及NIO聊天室

    参考链接: java NIO实例1:http://blog.chinaunix.net/uid-25808509-id-3346228.html java NIO教程之selector(一系列): h ...

  2. JavaFX+NIO聊天室第四篇表情包

    设计思路 表情包是我们聊天中经常使用的功能,他的实现有两种办法.一种是使用Unicode中编码的700多种的表情字符.另外一种是使用图片来充当表情包. 1.从2010年开始,unicode对emoji ...

  3. 【Netty】NIO 网络编程 聊天室案例

    文章目录 一. NIO 聊天室需求 二. NIO 聊天室 服务器端 代码分析 三. NIO 聊天室 客户端 代码分析 四. NIO 聊天室 服务器端 完整代码 五. NIO 聊天室 客户端 完整代码 ...

  4. NIO及多线程实现聊天室完整版

    一 NIO原理 NIO与BIO虽然都叫IO,但实现的方式完全不同: IO面向流Stream–建立管道(InputStream/OutputStream)直接面对字节的数据流传输数据是单向的 NIO面向 ...

  5. Java-NIO实战多人聊天室

    NIO服务端 public class NioServer {/*** 启动*/public void start() throws IOException {/*** 1. 创建Selector*/ ...

  6. Java BIO多人聊天室

    基于上篇NIO的多人聊天室,这篇将用BIO也实现一遍 首先是服务端的设计: /*** @author Jing* @create 2020/5/17*/ public class ChatServer ...

  7. JAVA网络编程NIO实现简易多人聊天室

    BIO模型 BIO即blocking IO,顾名思义是一种阻塞模型.当没有客户端连接时,服务端会一直阻塞,当有客户端新建连接时,服务端会新开一个线程去响应(不用多线程的话服务端同一时刻最多只能接收一个 ...

  8. NIO网络编程实战之简单多人聊天室

    NIO网络编程实战 利用NIO编程知识,实现多人聊天室. 1. NIO编程实现步骤 第一步:创建Selector 第二步:创建ServerSocketChannel,并绑定监听端口 第三步:将Chan ...

  9. Java NIO示例:多人网络聊天室完整代码

    服务端:  package cn.zhangxueliang.herostory.chatroom;import java.io.IOException; import java.net.InetSo ...

最新文章

  1. 编程开发使用的软件大全
  2. rsync+inotify实现实时同步案例
  3. leetcode 137. 只出现一次的数字 II(位运算)
  4. Sublime Text 3插件安装方法
  5. python以列表的形式输出_简单介绍python输出列表元素的所有排列形式
  6. python中yield的使用_python中yield的用法详解-转载
  7. textfield获取其中内容_用户认知视角下的产品信息获取体验度量体系研究
  8. PHP中的Traits用法详解
  9. 153. php 引用
  10. bili弹幕姬_B站弹幕姬插件——弹幕日志
  11. 利用AWVS进行反制
  12. Tempo数据分析平台,助力企业高效完成数据预处理工作
  13. 《畅玩NAS》第8章 ZeroTier组建局域网
  14. 后代选择器和子代选择器
  15. 【总结】1438- 你想知道的前后端协作规范都在这了
  16. 【 IntelliJ IDEA 】设置主题和字体
  17. “入门大数据分析:探索海量数据的奥秘“
  18. Word中使用交叉引用插入多个参考文献
  19. 摘抄-对最好程序员的感想
  20. CP2102 USB转串口电路设计以及介绍

热门文章

  1. 青岛大学-王卓 数据结构与算法基础
  2. 《时间的朋友2015》
  3. 高新技术企业认定中研发费用有哪些
  4. 同一单元格的日期和时间分离
  5. android 3g内存,鲁大师核心算法再曝光 安卓3G内存比苹果1G慢?
  6. 感受DataGrid给数据操作带来的便利(4)
  7. 佳能85mmF#1.4L IS USM镜头构造
  8. python模拟足球射门_[转载]博客园仿真足球竞赛平台Python版SDK
  9. project展开和折叠任务
  10. HT1621B段码 LCD屏驱动 51单片机驱动程序