using System;
using System.IO;
using System.Net;
using System.Net.Sockets;

namespace ET
{
    /// <summary>
    /// Wraps a <see cref="Socket"/> and pushes its asynchronous completion callbacks onto the
    /// owning service's <c>ThreadSynchronizationContext</c> (described by the original author
    /// as the main thread) so that all channel state is touched from that context.
    /// </summary>
    public sealed class TChannel: AChannel
    {
        private readonly TService Service;
        private Socket socket;

        // innArgs drives receives; outArgs drives connect and sends.
        private SocketAsyncEventArgs innArgs = new SocketAsyncEventArgs();
        private SocketAsyncEventArgs outArgs = new SocketAsyncEventArgs();

        private readonly CircularBuffer recvBuffer = new CircularBuffer();
        private readonly CircularBuffer sendBuffer = new CircularBuffer();

        // True while a send is considered in flight; see StartSend / OnSendComplete.
        private bool isSending;
        private bool isConnected;

        private readonly PacketParser parser;

        // Scratch buffer used by Send to serialize the length prefix before queueing it.
        private readonly byte[] sendCache = new byte[Packet.OpcodeLength + Packet.ActorIdLength];

        /// <summary>
        /// Shared <see cref="SocketAsyncEventArgs.Completed"/> handler for both event-args objects.
        /// It does no work itself: it re-posts the matching completion handler to the service's
        /// synchronization context.
        /// </summary>
        /// <param name="sender">Event source (unused).</param>
        /// <param name="e">The completed operation's event args.</param>
        /// <exception cref="Exception">The completed operation is not Connect, Receive, Send or Disconnect.</exception>
        private void OnComplete(object sender, SocketAsyncEventArgs e)
        {
            switch (e.LastOperation)
            {
                case SocketAsyncOperation.Connect:
                    this.Service.ThreadSynchronizationContext.Post(() => this.OnConnectComplete(e));
                    break;
                case SocketAsyncOperation.Receive:
                    this.Service.ThreadSynchronizationContext.Post(() => this.OnRecvComplete(e));
                    break;
                case SocketAsyncOperation.Send:
                    this.Service.ThreadSynchronizationContext.Post(() => this.OnSendComplete(e));
                    break;
                case SocketAsyncOperation.Disconnect:
                    this.Service.ThreadSynchronizationContext.Post(() => this.OnDisconnectComplete(e));
                    break;
                default:
                    throw new Exception($"socket error: {e.LastOperation}");
            }
        }

        #region Network thread

        /// <summary>
        /// Creates an outgoing (connecting) channel. The connect attempt itself is deferred
        /// via <c>PostNext</c> on the service's synchronization context.
        /// </summary>
        /// <param name="id">Channel id.</param>
        /// <param name="ipEndPoint">Remote endpoint to connect to.</param>
        /// <param name="service">Owning service.</param>
        public TChannel(long id, IPEndPoint ipEndPoint, TService service)
        {
            this.ChannelType = ChannelType.Connect;
            this.Id = id;
            this.Service = service;
            this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            this.socket.NoDelay = true;
            this.parser = new PacketParser(this.recvBuffer, this.Service);
            this.innArgs.Completed += this.OnComplete;
            this.outArgs.Completed += this.OnComplete;

            this.RemoteAddress = ipEndPoint;
            this.isConnected = false;
            this.isSending = false;

            this.Service.ThreadSynchronizationContext.PostNext(this.ConnectAsync);
        }

        /// <summary>
        /// Creates a channel around an already-accepted socket. Receiving and sending are
        /// started via <c>PostNext</c> rather than inline.
        /// </summary>
        /// <param name="id">Channel id.</param>
        /// <param name="socket">The accepted, connected socket.</param>
        /// <param name="service">Owning service.</param>
        public TChannel(long id, Socket socket, TService service)
        {
            this.ChannelType = ChannelType.Accept;
            this.Id = id;
            this.Service = service;
            this.socket = socket;
            this.socket.NoDelay = true;
            this.parser = new PacketParser(this.recvBuffer, this.Service);
            this.innArgs.Completed += this.OnComplete;
            this.outArgs.Completed += this.OnComplete;

            this.RemoteAddress = (IPEndPoint)socket.RemoteEndPoint;
            this.isConnected = true;
            this.isSending = false;

            // Start reading and writing on the next frame.
            this.Service.ThreadSynchronizationContext.PostNext(() =>
            {
                this.StartRecv();
                this.StartSend();
            });
        }

        /// <summary>
        /// Removes the channel from its service, closes the socket and releases both
        /// event-args objects. Does nothing if the channel is already disposed.
        /// </summary>
        public override void Dispose()
        {
            if (this.IsDisposed)
            {
                return;
            }

            Log.Info($"channel dispose: {this.Id} {this.RemoteAddress}");

            // Id is cleared before calling back into the service.
            // NOTE(review): presumably this makes a re-entrant Dispose from Service.Remove
            // return early - confirm against AChannel.IsDisposed / TService.Remove.
            long id = this.Id;
            this.Id = 0;
            this.Service.Remove(id);
            this.socket.Close();
            this.innArgs.Dispose();
            this.outArgs.Dispose();
            this.innArgs = null;
            this.outArgs = null;
            this.socket = null;
        }

        /// <summary>
        /// Queues a message into the send buffer, prefixed with its length, and registers
        /// the channel with the service for sending if no send is currently in flight.
        /// </summary>
        /// <param name="actorId">Actor id; written into the head of the stream's buffer for inner services.</param>
        /// <param name="stream">Message stream; bytes from <c>Position</c> to <c>Length</c> are queued.</param>
        /// <exception cref="Exception">
        /// The channel is disposed, or the payload is larger than the service type's
        /// length prefix allows.
        /// </exception>
        public void Send(long actorId, MemoryStream stream)
        {
            if (this.IsDisposed)
            {
                throw new Exception("TChannel已经被Dispose, 不能发送消息");
            }

            switch (this.Service.ServiceType)
            {
                case ServiceType.Inner:
                {
                    int messageSize = (int) (stream.Length - stream.Position);
                    if (messageSize > ushort.MaxValue * 16)
                    {
                        throw new Exception($"send packet too large: {stream.Length} {stream.Position}");
                    }

                    this.sendCache.WriteTo(0, messageSize);
                    this.sendBuffer.Write(this.sendCache, 0, PacketParser.InnerPacketSizeLength);

                    // actorId is written at offset 0 of the stream's underlying buffer.
                    stream.GetBuffer().WriteTo(0, actorId);
                    this.sendBuffer.Write(stream.GetBuffer(), (int)stream.Position, (int)(stream.Length - stream.Position));
                    break;
                }
                case ServiceType.Outer:
                {
                    // The outer network does not need the actorId, so skip over it.
                    stream.Seek(Packet.ActorIdLength, SeekOrigin.Begin);

                    // The outer length prefix is written as a ushort. Validate before the
                    // narrowing cast: an oversized payload would otherwise wrap around and
                    // put a wrong length on the wire, corrupting the peer's framing.
                    int payloadSize = (int) (stream.Length - stream.Position);
                    if (payloadSize > ushort.MaxValue)
                    {
                        throw new Exception($"send packet too large: {stream.Length} {stream.Position}");
                    }

                    ushort messageSize = (ushort) payloadSize;

                    this.sendCache.WriteTo(0, messageSize);
                    this.sendBuffer.Write(this.sendCache, 0, PacketParser.OuterPacketSizeLength);

                    this.sendBuffer.Write(stream.GetBuffer(), (int)stream.Position, (int)(stream.Length - stream.Position));
                    break;
                }
            }

            if (!this.isSending)
            {
                // The send is not started inline; the channel id is handed to the service instead.
                // NOTE(review): presumably the service later calls Update() for ids in
                // NeedStartSend - confirm in TService.
                this.Service.NeedStartSend.Add(this.Id);
            }
        }

        /// <summary>
        /// Starts the asynchronous connect to <c>RemoteAddress</c>.
        /// </summary>
        private void ConnectAsync()
        {
            this.outArgs.RemoteEndPoint = this.RemoteAddress;
            if (this.socket.ConnectAsync(this.outArgs))
            {
                // Pending: OnComplete will be raised when the connect finishes.
                return;
            }

            // Completed synchronously: the Completed event is not raised, so handle it here.
            this.OnConnectComplete(this.outArgs);
        }

        /// <summary>
        /// Handles the result of a connect: reports failures via <c>OnError</c>, otherwise
        /// marks the channel connected and starts receiving and sending.
        /// </summary>
        /// <param name="o">The <see cref="SocketAsyncEventArgs"/> of the connect operation.</param>
        private void OnConnectComplete(object o)
        {
            if (this.socket == null)
            {
                return;
            }

            SocketAsyncEventArgs e = (SocketAsyncEventArgs) o;

            if (e.SocketError != SocketError.Success)
            {
                this.OnError((int)e.SocketError);
                return;
            }

            e.RemoteEndPoint = null;
            this.isConnected = true;
            this.StartRecv();
            this.StartSend();
        }

        /// <summary>
        /// Handles a completed disconnect by reporting its socket error code via <c>OnError</c>.
        /// </summary>
        /// <param name="o">The <see cref="SocketAsyncEventArgs"/> of the disconnect operation.</param>
        private void OnDisconnectComplete(object o)
        {
            // Same guard as the other completion handlers: a completion that arrives after
            // Dispose must not report an error for a channel whose Id has been cleared.
            if (this.socket == null)
            {
                return;
            }

            SocketAsyncEventArgs e = (SocketAsyncEventArgs)o;
            this.OnError((int)e.SocketError);
        }

        /// <summary>
        /// Receive loop: points <c>innArgs</c> at the free tail of the last receive chunk and
        /// issues a receive. Synchronous completions are handled inline and the loop repeats;
        /// a pending receive ends the loop (its completion arrives via <c>OnComplete</c>).
        /// </summary>
        private void StartRecv()
        {
            while (true)
            {
                try
                {
                    if (this.socket == null)
                    {
                        return;
                    }

                    int size = this.recvBuffer.ChunkSize - this.recvBuffer.LastIndex;
                    this.innArgs.SetBuffer(this.recvBuffer.Last, this.recvBuffer.LastIndex, size);
                }
                catch (Exception e)
                {
                    Log.Error($"tchannel error: {this.Id}\n{e}");
                    this.OnError(ErrorCore.ERR_TChannelRecvError);
                    return;
                }

                if (this.socket.ReceiveAsync(this.innArgs))
                {
                    return;
                }

                this.HandleRecv(this.innArgs);
            }
        }

        /// <summary>
        /// Completion handler for an asynchronous receive: processes the data, then resumes
        /// the receive loop unless the channel was torn down in the meantime.
        /// </summary>
        /// <param name="o">The <see cref="SocketAsyncEventArgs"/> of the receive operation.</param>
        private void OnRecvComplete(object o)
        {
            this.HandleRecv(o);

            if (this.socket == null)
            {
                return;
            }

            this.StartRecv();
        }

        /// <summary>
        /// Processes one completed receive: validates the result, advances the receive
        /// buffer, then parses and dispatches every complete message now available.
        /// </summary>
        /// <param name="o">The <see cref="SocketAsyncEventArgs"/> of the receive operation.</param>
        private void HandleRecv(object o)
        {
            if (this.socket == null)
            {
                return;
            }

            SocketAsyncEventArgs e = (SocketAsyncEventArgs) o;

            if (e.SocketError != SocketError.Success)
            {
                this.OnError((int)e.SocketError);
                return;
            }

            // A successful receive of zero bytes is reported as a peer disconnect.
            if (e.BytesTransferred == 0)
            {
                this.OnError(ErrorCore.ERR_PeerDisconnect);
                return;
            }

            this.recvBuffer.LastIndex += e.BytesTransferred;
            if (this.recvBuffer.LastIndex == this.recvBuffer.ChunkSize)
            {
                // The last chunk is full: append a new one and restart at its beginning.
                this.recvBuffer.AddLast();
                this.recvBuffer.LastIndex = 0;
            }

            // Dispatch received messages.
            while (true)
            {
                // Messages are parsed and handled in a loop; handling one of them may have
                // disconnected the session, so re-check on every iteration.
                if (this.socket == null)
                {
                    return;
                }

                try
                {
                    bool ret = this.parser.Parse();
                    if (!ret)
                    {
                        break;
                    }

                    this.OnRead(this.parser.MemoryStream);
                }
                catch (Exception ee)
                {
                    Log.Error($"ip: {this.RemoteAddress} {ee}");
                    this.OnError(ErrorCore.ERR_SocketError);
                    return;
                }
            }
        }

        /// <summary>
        /// Starts sending any queued data.
        /// </summary>
        public void Update()
        {
            this.StartSend();
        }

        /// <summary>
        /// Send loop: sends the contiguous part of the first send chunk. Synchronous
        /// completions are handled inline and the loop repeats; a pending send ends the
        /// loop with <c>isSending</c> still set. Does nothing when the channel is not yet
        /// connected or a send is already in flight.
        /// </summary>
        /// <exception cref="Exception">Wraps any exception thrown while preparing or issuing the send.</exception>
        private void StartSend()
        {
            if (!this.isConnected)
            {
                return;
            }

            if (this.isSending)
            {
                return;
            }

            while (true)
            {
                try
                {
                    if (this.socket == null)
                    {
                        this.isSending = false;
                        return;
                    }

                    // No data to send.
                    if (this.sendBuffer.Length == 0)
                    {
                        this.isSending = false;
                        return;
                    }

                    this.isSending = true;

                    // Send at most up to the end of the first chunk, and no more than is queued.
                    int sendSize = this.sendBuffer.ChunkSize - this.sendBuffer.FirstIndex;
                    if (sendSize > this.sendBuffer.Length)
                    {
                        sendSize = (int)this.sendBuffer.Length;
                    }

                    this.outArgs.SetBuffer(this.sendBuffer.First, this.sendBuffer.FirstIndex, sendSize);

                    if (this.socket.SendAsync(this.outArgs))
                    {
                        return;
                    }

                    this.HandleSend(this.outArgs);
                }
                catch (Exception e)
                {
                    throw new Exception($"socket set buffer error: {this.sendBuffer.First.Length}, {this.sendBuffer.FirstIndex}", e);
                }
            }
        }

        /// <summary>
        /// Completion handler for an asynchronous send: accounts for the bytes sent, clears
        /// the in-flight flag and continues with any remaining queued data.
        /// </summary>
        /// <param name="o">The <see cref="SocketAsyncEventArgs"/> of the send operation.</param>
        private void OnSendComplete(object o)
        {
            this.HandleSend(o);

            this.isSending = false;

            this.StartSend();
        }

        /// <summary>
        /// Processes one completed send: validates the result and advances the send buffer
        /// past the bytes that were transferred.
        /// </summary>
        /// <param name="o">The <see cref="SocketAsyncEventArgs"/> of the send operation.</param>
        private void HandleSend(object o)
        {
            if (this.socket == null)
            {
                return;
            }

            SocketAsyncEventArgs e = (SocketAsyncEventArgs) o;

            if (e.SocketError != SocketError.Success)
            {
                this.OnError((int)e.SocketError);
                return;
            }

            if (e.BytesTransferred == 0)
            {
                this.OnError(ErrorCore.ERR_PeerDisconnect);
                return;
            }

            this.sendBuffer.FirstIndex += e.BytesTransferred;
            if (this.sendBuffer.FirstIndex == this.sendBuffer.ChunkSize)
            {
                // The first chunk has been sent completely: drop it and restart at the next one.
                this.sendBuffer.FirstIndex = 0;
                this.sendBuffer.RemoveFirst();
            }
        }

        /// <summary>
        /// Forwards a parsed message to the service; any exception from the service is
        /// logged and turned into a channel error.
        /// </summary>
        /// <param name="memoryStream">The parsed message.</param>
        private void OnRead(MemoryStream memoryStream)
        {
            try
            {
                long channelId = this.Id;
                this.Service.OnRead(channelId, memoryStream);
            }
            catch (Exception e)
            {
                Log.Error($"{this.RemoteAddress} {memoryStream.Length} {e}");
                // Any message-parsing exception must disconnect the session, to guard
                // against forged client messages.
                this.OnError(ErrorCore.ERR_PacketParserError);
            }
        }

        /// <summary>
        /// Logs the error, removes this channel from the service and notifies the service
        /// of the error.
        /// </summary>
        /// <param name="error">Error code (a <see cref="SocketError"/> value or an <c>ErrorCore</c> constant).</param>
        private void OnError(int error)
        {
            Log.Info($"TChannel OnError: {error} {this.RemoteAddress}");

            // Capture the id first so the notification still carries it after removal.
            long channelId = this.Id;

            this.Service.Remove(channelId);

            this.Service.OnError(channelId, error);
        }

        #endregion
    }
}