1.类

(1)socket IO操作内存管理类 BufferManager

    // This class creates a single large buffer which can be divided up // and assigned to SocketAsyncEventArgs objects for use with each // socket I/O operation.  // This enables bufffers to be easily reused and guards against // fragmenting heap memory.// // The operations exposed on the BufferManager class are not thread safe.public class BufferManager{//buffer缓冲区大小private int m_numBytes;//缓冲区private byte[] m_buffer;private Stack<int> m_freeIndexPool;private int m_currentIndex;private int m_bufferSize;public BufferManager(int totalBytes, int bufferSize){m_numBytes = totalBytes;m_currentIndex = 0;m_bufferSize = bufferSize;m_freeIndexPool = new Stack<int>();}/// <summary>/// 给buffer分配缓冲区/// </summary>public void InitBuffer(){m_buffer = new byte[m_numBytes];}/// <summary>///  将buffer添加到args的IO缓冲区中,并设置offset/// </summary>/// <param name="args"></param>/// <returns></returns>public bool SetBuffer(SocketAsyncEventArgs args){if (m_freeIndexPool.Count > 0){args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);}else{if ((m_numBytes - m_bufferSize) < m_currentIndex){return false;}args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);m_currentIndex += m_bufferSize;}return true;}/// <summary>/// 将buffer从args的IO缓冲区中释放/// </summary>/// <param name="args"></param>public void FreeBuffer(SocketAsyncEventArgs args){m_freeIndexPool.Push(args.Offset);args.SetBuffer(null, 0, 0);}/// <summary>/// 释放全部buffer缓存/// </summary>public void FreeAllBuffer(){m_freeIndexPool.Clear();m_currentIndex = 0;m_buffer = null;}}

  

(2)SocketAsyncEventArgsPool

    // Represents a collection of reusable SocketAsyncEventArgs objects.  public class SocketAsyncEventArgsPool{private Stack<SocketAsyncEventArgs> m_pool;// Initializes the object pool to the specified size//// The "capacity" parameter is the maximum number of // SocketAsyncEventArgs objects the pool can holdpublic SocketAsyncEventArgsPool(int capacity){m_pool = new Stack<SocketAsyncEventArgs>(capacity);}// Add a SocketAsyncEventArg instance to the pool////The "item" parameter is the SocketAsyncEventArgs instance // to add to the poolpublic void Push(SocketAsyncEventArgs item){if (item == null) { throw new ArgumentNullException("Items added to a SocketAsyncEventArgsPool cannot be null"); }lock (m_pool){m_pool.Push(item);}}// Removes a SocketAsyncEventArgs instance from the pool// and returns the object removed from the poolpublic SocketAsyncEventArgs Pop(){lock (m_pool){return m_pool.Pop();}}/// <summary>/// 清空栈中元素/// </summary>public void Clear(){lock (m_pool){m_pool.Clear();}}// The number of SocketAsyncEventArgs instances in the poolpublic int Count{get { return m_pool.Count; }}}

  

(3)AsyncUserToken

    public class AsyncUserToken{private Socket socket = null;public Socket Socket { get => socket; set => socket = value; }}

  

(4)服务器端操作类TcpServiceSocketAsync

    public class TcpServiceSocketAsync{//接收数据事件public Action<string> recvMessageEvent = null;//发送结果事件public Action<int> sendResultEvent = null;//监听socketprivate Socket listenSocket = null;//允许连接到tcp服务器的tcp客户端数量private int numConnections = 0;//用于socket发送和接受的缓存区大小private int bufferSize = 0;//socket缓冲区管理对象private BufferManager bufferManager = null;//SocketAsyncEventArgs池private SocketAsyncEventArgsPool socketAsyncEventArgsPool = null;//当前连接的tcp客户端数量private int numberAcceptedClients = 0;//控制tcp客户端连接数量的信号量private Semaphore maxNumberAcceptedClients = null;//用于socket发送数据的SocketAsyncEventArgs集合private List<SocketAsyncEventArgs> sendAsyncEventArgs = null;//tcp服务器ipprivate string ip = "";//tcp服务器端口private int port = 0;/// <summary>/// 构造函数/// </summary>/// <param name="numConnections">允许连接到tcp服务器的tcp客户端数量</param>/// <param name="bufferSize">用于socket发送和接受的缓存区大小</param>public TcpServiceSocketAsync(int numConnections, int bufferSize){if (numConnections <= 0 || numConnections > int.MaxValue)throw new ArgumentOutOfRangeException("_numConnections is out of range");if (bufferSize <= 0 || bufferSize > int.MaxValue)throw new ArgumentOutOfRangeException("_receiveBufferSize is out of range");this.numConnections = numConnections;this.bufferSize = bufferSize;bufferManager = new BufferManager(numConnections * bufferSize * 2, bufferSize);socketAsyncEventArgsPool = new SocketAsyncEventArgsPool(numConnections);maxNumberAcceptedClients = new Semaphore(numConnections, numConnections);sendAsyncEventArgs = new List<SocketAsyncEventArgs>();}/// <summary>/// 初始化数据(bufferManager,socketAsyncEventArgsPool)/// </summary>public void Init(){numberAcceptedClients = 0;bufferManager.InitBuffer();SocketAsyncEventArgs readWriteEventArg;for (int i = 0; i < numConnections * 2; i++){readWriteEventArg = new SocketAsyncEventArgs();readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);readWriteEventArg.UserToken = new AsyncUserToken();bufferManager.SetBuffer(readWriteEventArg);socketAsyncEventArgsPool.Push(readWriteEventArg);}}/// <summary>///  开启tcp服务器,等待tcp客户端连接/// </summary>/// <param name="ip"></param>/// <param name="port"></param>public void Start(string ip, int port){if (string.IsNullOrEmpty(ip))throw new ArgumentNullException("ip cannot be null");if (port < 1 || port > 65535)throw new ArgumentOutOfRangeException("port is out of range");this.ip = ip;this.port = port;try{listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);IPAddress address = IPAddress.Parse(ip);IPEndPoint endpoint = new IPEndPoint(address, port);listenSocket.Bind(endpoint);//绑定地址listenSocket.Listen(int.MaxValue);//开始监听StartAccept(null);}catch (Exception ex){throw ex;}}/// <summary>/// 关闭tcp服务器/// </summary>public void CloseSocket(){if (listenSocket == null)return;try{foreach (var e in sendAsyncEventArgs){((AsyncUserToken)e.UserToken).Socket.Shutdown(SocketShutdown.Both);}listenSocket.Shutdown(SocketShutdown.Both);}catch { }try{foreach (var e in sendAsyncEventArgs){((AsyncUserToken)e.UserToken).Socket.Close();}listenSocket.Close();}catch { }try{foreach (var e in sendAsyncEventArgs){e.Dispose();}}catch { }sendAsyncEventArgs.Clear();socketAsyncEventArgsPool.Clear();bufferManager.FreeAllBuffer();maxNumberAcceptedClients.Release(numberAcceptedClients);}/// <summary>/// 重新启动tcp服务器/// </summary>public void Restart(){CloseSocket();Init();Start(ip, port);}/// <summary>/// 开始等待tcp客户端连接/// </summary>/// <param name="acceptEventArg"></param>private void StartAccept(SocketAsyncEventArgs acceptEventArg){if (acceptEventArg == null){acceptEventArg = new SocketAsyncEventArgs();acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);}else{// socket must be cleared since the context object is being reusedacceptEventArg.AcceptSocket = null;}maxNumberAcceptedClients.WaitOne();bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg);if (!willRaiseEvent){ProcessAccept(acceptEventArg);}}/// <summary>/// Socket.AcceptAsync完成回调函数/// </summary>/// <param name="sender"></param>/// <param name="e"></param>private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e){ProcessAccept(e);}/// <summary>/// 接受到tcp客户端连接,进行处理/// </summary>/// <param name="e"></param>private void ProcessAccept(SocketAsyncEventArgs e){Interlocked.Increment(ref numberAcceptedClients);//设置用于接收的SocketAsyncEventArgs的socket,可以接受数据SocketAsyncEventArgs recvEventArgs = socketAsyncEventArgsPool.Pop();((AsyncUserToken)recvEventArgs.UserToken).Socket = e.AcceptSocket;//设置用于发送的SocketAsyncEventArgs的socket,可以发送数据SocketAsyncEventArgs sendEventArgs = socketAsyncEventArgsPool.Pop();((AsyncUserToken)sendEventArgs.UserToken).Socket = e.AcceptSocket;sendAsyncEventArgs.Add(sendEventArgs);StartRecv(recvEventArgs);StartAccept(e);}/// <summary>/// tcp服务器开始接受tcp客户端发送的数据/// </summary>private void StartRecv(SocketAsyncEventArgs e){bool willRaiseEvent = ((AsyncUserToken)e.UserToken).Socket.ReceiveAsync(e);if (!willRaiseEvent){ProcessReceive(e);}}/// <summary>/// socket.sendAsync和socket.recvAsync的完成回调函数/// </summary>/// <param name="sender"></param>/// <param name="e"></param>private void IO_Completed(object sender, SocketAsyncEventArgs e){switch (e.LastOperation){case SocketAsyncOperation.Receive:ProcessReceive(e);break;case SocketAsyncOperation.Send:ProcessSend(e);break;default:throw new ArgumentException("The last operation completed on the socket was not a receive or send");}}/// <summary>/// 处理接受到的tcp客户端数据/// </summary>/// <param name="e"></param>private void ProcessReceive(SocketAsyncEventArgs e){AsyncUserToken token = (AsyncUserToken)e.UserToken;if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success){if (recvMessageEvent != null)//一定要指定GetString的长度recvMessageEvent(Encoding.UTF8.GetString(e.Buffer, e.Offset, e.BytesTransferred));StartRecv(e);}else{CloseClientSocket(e);}}/// <summary>/// 处理tcp服务器发送的结果/// </summary>/// <param name="e"></param>private void ProcessSend(SocketAsyncEventArgs e){AsyncUserToken token = (AsyncUserToken)e.UserToken;if (e.SocketError == SocketError.Success){if (sendResultEvent != null)sendResultEvent(e.BytesTransferred);}else{if (sendResultEvent != null)sendResultEvent(e.BytesTransferred);CloseClientSocket(e);}}/// <summary>/// 关闭一个与tcp客户端连接的socket/// </summary>/// <param name="e"></param>private void CloseClientSocket(SocketAsyncEventArgs e){AsyncUserToken token = e.UserToken as AsyncUserToken;try{//关闭socket时,单独使用socket.close()通常会造成资源提前被释放,应该在关闭socket之前,先使用shutdown进行接受或者发送的禁用,再使用socket进行释放token.Socket.Shutdown(SocketShutdown.Both);}catch { }token.Socket.Close();Interlocked.Decrement(ref numberAcceptedClients);socketAsyncEventArgsPool.Push(e);maxNumberAcceptedClients.Release();if (e.LastOperation == SocketAsyncOperation.Send)sendAsyncEventArgs.Remove(e);}/// <summary>/// 给全部tcp客户端发送数据/// </summary>/// <param name="message"></param>public void SendMessageToAllClients(string message){if (string.IsNullOrEmpty(message))throw new ArgumentNullException("message cannot be null");foreach (var e in sendAsyncEventArgs){byte[] buff = Encoding.UTF8.GetBytes(message);if (buff.Length > bufferSize)throw new ArgumentOutOfRangeException("message is out off range");buff.CopyTo(e.Buffer, e.Offset);e.SetBuffer(e.Offset, buff.Length);bool willRaiseEvent = ((AsyncUserToken)e.UserToken).Socket.SendAsync(e);if (!willRaiseEvent){ProcessSend(e);}}}}

    

(5)客户端操作类TcpClientSocketAsync

    public class TcpClientSocketAsync{//接收数据事件public Action<string> recvMessageEvent = null;//发送结果事件public Action<int> sendResultEvent = null;//接受缓存数组private byte[] recvBuff = null;//发送缓存数组private byte[] sendBuff = null;//连接socketprivate Socket connectSocket = null;//用于发送数据的SocketAsyncEventArgsprivate SocketAsyncEventArgs sendEventArg = null;//用于接收数据的SocketAsyncEventArgsprivate SocketAsyncEventArgs recvEventArg = null;//tcp服务器ipprivate string ip = "";//tcp服务器端口private int port = 0;/// <summary>/// 构造函数/// </summary>/// <param name="bufferSize">用于socket发送和接受的缓存区大小</param>public TcpClientSocketAsync(int bufferSize){//设置用于发送数据的SocketAsyncEventArgssendBuff = new byte[bufferSize];sendEventArg = new SocketAsyncEventArgs();sendEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);sendEventArg.SetBuffer(sendBuff, 0, bufferSize);//设置用于接受数据的SocketAsyncEventArgsrecvBuff = new byte[bufferSize];recvEventArg = new SocketAsyncEventArgs();recvEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);recvEventArg.SetBuffer(recvBuff, 0, bufferSize);}/// <summary>/// 开启tcp客户端,连接tcp服务器/// </summary>/// <param name="ip"></param>/// <param name="port"></param>public void Start(string ip, int port){if (string.IsNullOrEmpty(ip))throw new ArgumentNullException("ip cannot be null");if (port < 1 || port > 65535)throw new ArgumentOutOfRangeException("port is out of range");this.ip = ip;this.port = port;try{connectSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);IPAddress address = IPAddress.Parse(ip);IPEndPoint endpoint = new IPEndPoint(address, port);//连接tcp服务器SocketAsyncEventArgs connectEventArg = new SocketAsyncEventArgs();connectEventArg.Completed += ConnectEventArgs_Completed;connectEventArg.RemoteEndPoint = endpoint;//设置要连接的tcp服务器地址bool willRaiseEvent = connectSocket.ConnectAsync(connectEventArg);if (!willRaiseEvent)ProcessConnect(connectEventArg);}catch (Exception ex){throw ex;}}/// <summary>/// 发送数据到tcp服务器/// </summary>/// <param name="message">要发送的数据</param>public void SendMessage(string message){if (string.IsNullOrEmpty(message))throw new ArgumentNullException("message cannot be null");if (connectSocket == null)throw new Exception("socket cannot be null");byte[] buff = Encoding.UTF8.GetBytes(message);buff.CopyTo(sendBuff, 0);sendEventArg.SetBuffer(0, buff.Length);bool willRaiseEvent = connectSocket.SendAsync(sendEventArg);if (!willRaiseEvent){ProcessSend(sendEventArg);}}/// <summary>/// 关闭tcp客户端/// </summary>public void CloseSocket(){if (connectSocket == null)return;try{//关闭socket时,单独使用socket.close()通常会造成资源提前被释放,应该在关闭socket之前,先使用shutdown进行接受或者发送的禁用,再使用socket进行释放connectSocket.Shutdown(SocketShutdown.Both);}catch { }try{connectSocket.Close();}catch { }}/// <summary>/// 重启tcp客户端,重新连接tcp服务器/// </summary>public void Restart(){CloseSocket();Start(ip, port);}/// <summary>/// Socket.ConnectAsync完成回调函数/// </summary>/// <param name="sender"></param>/// <param name="e"></param>private void ConnectEventArgs_Completed(object sender, SocketAsyncEventArgs e){ProcessConnect(e);}private void ProcessConnect(SocketAsyncEventArgs e){StartRecv();}/// <summary>/// tcp客户端开始接受tcp服务器发送的数据/// </summary>private void StartRecv(){bool willRaiseEvent = connectSocket.ReceiveAsync(recvEventArg);if (!willRaiseEvent){ProcessReceive(recvEventArg);}}/// <summary>/// socket.sendAsync和socket.recvAsync的完成回调函数/// </summary>/// <param name="sender"></param>/// <param name="e"></param>private void IO_Completed(object sender, SocketAsyncEventArgs e){switch (e.LastOperation){case SocketAsyncOperation.Receive:ProcessReceive(e);break;case SocketAsyncOperation.Send:ProcessSend(e);break;default:throw new ArgumentException("The last operation completed on the socket was not a receive or send");}}/// <summary>/// 处理接受到的tcp服务器数据/// </summary>/// <param name="e"></param>private void ProcessReceive(SocketAsyncEventArgs e){AsyncUserToken token = (AsyncUserToken)e.UserToken;if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success){if (recvMessageEvent != null)//一定要指定GetString的长度,否则整个bugger都要转成stringrecvMessageEvent(Encoding.UTF8.GetString(e.Buffer, 0, e.BytesTransferred));StartRecv();}else{Restart();}}/// <summary>/// 处理tcp客户端发送的结果/// </summary>/// <param name="e"></param>private void ProcessSend(SocketAsyncEventArgs e){AsyncUserToken token = (AsyncUserToken)e.UserToken;if (e.SocketError == SocketError.Success){if (sendResultEvent != null)sendResultEvent(e.BytesTransferred);}else{if (sendResultEvent != null)sendResultEvent(-1);Restart();}}}

   

2.使用

(1)服务器端

    public partial class Form1 : Form{TcpServiceSocketAsync tcpServiceSocket = null;private readonly string ip = "192.168.172.142";private readonly int port = 8090;public Form1(){InitializeComponent();tcpServiceSocket = new TcpServiceSocketAsync(10, 1024);tcpServiceSocket.recvMessageEvent += new Action<string>(Recv);tcpServiceSocket.Init();}private void Recv(string message){this.BeginInvoke(new Action(() =>{tbRecv.Text += message + "\r\n";}));}private void btnStart_Click(object sender, EventArgs e){tcpServiceSocket.Start(ip, port);}private void btnSend_Click(object sender, EventArgs e){string message = tbSend.Text.Trim();if (string.IsNullOrEmpty(message))return;tcpServiceSocket.SendMessageToAllClients(message);tbSend.Text = "";}}

  

(2)客户端

    public partial class Form1 : Form{private TcpClientSocketAsync tcpClientSocket = null;private readonly string ip = "192.168.172.142";private readonly int port = 8090;private readonly int buffSize = 1024;public Form1(){InitializeComponent();tcpClientSocket = new TcpClientSocketAsync(buffSize);tcpClientSocket.recvMessageEvent += new Action<string>(Recv);}private void Recv(string message){this.BeginInvoke(new Action(() =>{tbRecv.Text += message + "\r\n";}));}private void btnStart_Click(object sender, EventArgs e){tcpClientSocket.Start(ip, port);}private void btnSend_Click(object sender, EventArgs e){string message = tbSend.Text.Trim();if (string.IsNullOrEmpty(message))return;tcpClientSocket.SendMessage(message);tbSend.Text = "";}}

   

3.演示

参考:

https://docs.microsoft.com/zh-cn/dotnet/api/system.net.sockets.socketasynceventargs?view=netframework-4.8

转载于:https://www.cnblogs.com/yaosj/p/11170244.html

C#实现异步阻塞TCP(SocketAsyncEventArgs,SendAsync,ReceiveAsync,AcceptAsync,ConnectAsync)...相关推荐

  1. 一文读懂并发与并行,同步与异步阻塞

    并发与并行 并发:指的是任务数多余cpu核数,通过操作系统的各种任务调度算法, 实现用多个任务"一起"执行(实际上总有一些任务不在执行,因为切换任务的速度相当快, 看上去一起执行而 ...

  2. 【gev】 Golang 实现轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库

    gev 轻量.快速的 Golang 网络库 https://github.com/Allenxuxu/gev gev 是一个轻量.快速的基于 Reactor 模式的非阻塞 TCP 网络库,底层并不使用 ...

  3. golang mysql 非阻塞_Golang 实现轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库...

    gev 轻量.快速的 Golang 网络库 gev 是一个轻量.快速的基于 Reactor 模式的非阻塞 TCP 网络库,底层并不使用 golang net 库,而是使用 epoll 和 kqueue ...

  4. Nginx讲解(一)Nginx介绍以及同步异步阻塞非阻塞介绍

    一.Nginx概念讲解 Nginx是一个高性能的HTTP和反向代理服务,也是一个IMAP/POP3/SMTP服务 Nginx是一款轻量级的Web 服务器/反向代理服务器及电子邮件(IMAP/POP3) ...

  5. Java网络编程------IO模型的同步/异步/阻塞/非阻塞(1)

    IO模型的同步/异步/阻塞/非阻塞 一.同步/异步/阻塞/非阻塞 1.同步和异步 2.阻塞和非阻塞 3.同步.异步和阻塞.非阻塞组合 二.IO 1.I/O 2.阻塞IO和非阻塞IO 3.同步IO和同步 ...

  6. 老张喝茶 教你同步异步 阻塞与非阻塞(转)

    原文 老张爱喝茶,废话不说,煮开水. 出场人物:老张,水壶两把(普通水壶,简称水壶:会响的水壶,简称响水壶). 1 老张把水壶放到火上,立等水开.(同步阻塞) 老张觉得自己有点傻 2 老张把水壶放到火 ...

  7. pythontcp服务器如何关闭阻塞_python实现单线程多任务非阻塞TCP服务端

    本文实例为大家分享了python实现单线程多任务非阻塞TCP服务端的具体代码,供大家参考,具体内容如下 # coding:utf-8 from socket import * # 1.创建服务器soc ...

  8. 迄今为止把同步/异步/阻塞/非阻塞/BIO/NIO/AIO讲的这么清楚的好文章(快快珍藏)...

    常规的误区 假设有一个展示用户详情的需求,分两步,先调用一个HTTP接口拿到详情数据,然后使用适合的视图展示详情数据. 如果网速很慢,代码发起一个HTTP请求后,就卡住不动了,直到十几秒后才拿到HTT ...

  9. 同步 异步 阻塞 非阻塞概念区分

    老张爱喝茶,废话不说,煮开水. 提前剧透一下:同步和非同步主要用来形容被调用线程,阻塞非阻塞用来形容主线程的. 出场人物:老张(主线程),水壶(被调用线程)两把(普通水壶,简称水壶:会响的水壶,简称响 ...

最新文章

  1. 2022-2028年中国网络出版产业投资分析及前景预测报告
  2. halcon可以用python吗_如何基于pythonnet调用halcon脚本
  3. 经典C语言程序100例之五九
  4. SQL SERVER 2012修改数据库名称(包括 db.mdf 名称的修改)
  5. 2022年终结版WPF项目实战合集发布
  6. ASP.NET Core中使用表达式树创建URL
  7. 区块链以信用为基础,所以目前在中国不可行.
  8. java bigDecimal and double
  9. 民生证券手机网上开户流程
  10. matlab中的方波信号图片_哈工大、哈工程MATLAB被禁用,这个国产软件号称可替代!...
  11. 抖音一个老人和一个机器人歌曲_抖音M哥很火的歌曲有哪些
  12. ubuntu搜狗输入法切换快捷键fcitx设置
  13. freemarker模板动态生成word文档之配置模板路径
  14. 计算机动画现状范文网,计算机动画教程
  15. 关于上海四金计算和工资对照表(转载)
  16. MiniGUI 特性说明
  17. 2023年全国最新二级建造师精选真题及答案46
  18. 人工智能和中国国家人工智能发展战略
  19. vue 图片跨域问题解决
  20. 为什么要做数仓分层,不做行吗?

热门文章

  1. 使用windows自带diskpart转换磁盘格式
  2. 一个游戏程序员必须要读的书[转]
  3. 阿里天池——Numpy实战
  4. 阿菜的Vue学习之旅(一)
  5. 微信小程序-实现两个按钮固定在页面底端且不随页面滑动(静态页面)
  6. 3个5相乘列乘法算式_五年级数学上册:第一单元《小数乘法》测试分析,开学完美第一测...
  7. 两句话说明白 MAC和HMAC的区别
  8. 第四章:用Python对用户的评论数据进行情感倾向分析
  9. 比较传统数据与大数据
  10. FTDI BITBANG软件实现i2ctools