You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
192 lines
4.4 KiB
192 lines
4.4 KiB
using System; |
|
using System.Collections.Generic; |
|
using System.IO; |
|
using System.Linq; |
|
using System.Net; |
|
using System.Net.Sockets; |
|
|
|
namespace ET |
|
{ |
|
public sealed class TService : AService |
|
{ |
|
private readonly Dictionary<long, TChannel> idChannels = new Dictionary<long, TChannel>(); |
|
|
|
private readonly SocketAsyncEventArgs innArgs = new SocketAsyncEventArgs(); |
|
|
|
private Socket acceptor; |
|
|
|
public HashSet<long> NeedStartSend = new HashSet<long>(); |
|
|
|
public TService(ThreadSynchronizationContext threadSynchronizationContext, ServiceType serviceType) |
|
{ |
|
this.foreachAction = channelId => |
|
{ |
|
TChannel tChannel = this.Get(channelId); |
|
tChannel?.Update(); |
|
}; |
|
this.ServiceType = serviceType; |
|
this.ThreadSynchronizationContext = threadSynchronizationContext; |
|
} |
|
|
|
public TService(ThreadSynchronizationContext threadSynchronizationContext, IPEndPoint ipEndPoint, ServiceType serviceType) |
|
{ |
|
this.foreachAction = channelId => |
|
{ |
|
TChannel tChannel = this.Get(channelId); |
|
tChannel?.Update(); |
|
}; |
|
|
|
this.ServiceType = serviceType; |
|
this.ThreadSynchronizationContext = threadSynchronizationContext; |
|
|
|
this.acceptor = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); |
|
this.acceptor.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); |
|
this.innArgs.Completed += this.OnComplete; |
|
this.acceptor.Bind(ipEndPoint); |
|
this.acceptor.Listen(1000); |
|
|
|
this.ThreadSynchronizationContext.PostNext(this.AcceptAsync); |
|
} |
|
|
|
private void OnComplete(object sender, SocketAsyncEventArgs e) |
|
{ |
|
switch (e.LastOperation) |
|
{ |
|
case SocketAsyncOperation.Accept: |
|
SocketError socketError = e.SocketError; |
|
Socket acceptSocket = e.AcceptSocket; |
|
this.ThreadSynchronizationContext.Post(()=>{this.OnAcceptComplete(socketError, acceptSocket);}); |
|
break; |
|
default: |
|
throw new Exception($"socket error: {e.LastOperation}"); |
|
} |
|
} |
|
|
|
#region 网络线程 |
|
|
|
private void OnAcceptComplete(SocketError socketError, Socket acceptSocket) |
|
{ |
|
if (this.acceptor == null) |
|
{ |
|
return; |
|
} |
|
|
|
if (socketError != SocketError.Success) |
|
{ |
|
Log.Error($"accept error {socketError}"); |
|
return; |
|
} |
|
|
|
try |
|
{ |
|
long id = this.CreateAcceptChannelId(0); |
|
TChannel channel = new TChannel(id, acceptSocket, this); |
|
this.idChannels.Add(channel.Id, channel); |
|
long channelId = channel.Id; |
|
|
|
this.OnAccept(channelId, channel.RemoteAddress); |
|
} |
|
catch (Exception exception) |
|
{ |
|
Log.Error(exception); |
|
} |
|
|
|
// 开始新的accept |
|
this.AcceptAsync(); |
|
} |
|
|
|
|
|
|
|
private void AcceptAsync() |
|
{ |
|
this.innArgs.AcceptSocket = null; |
|
if (this.acceptor.AcceptAsync(this.innArgs)) |
|
{ |
|
return; |
|
} |
|
OnAcceptComplete(this.innArgs.SocketError, this.innArgs.AcceptSocket); |
|
} |
|
|
|
private TChannel Create(IPEndPoint ipEndPoint, long id) |
|
{ |
|
TChannel channel = new TChannel(id, ipEndPoint, this); |
|
this.idChannels.Add(channel.Id, channel); |
|
return channel; |
|
} |
|
|
|
protected override void Get(long id, IPEndPoint address) |
|
{ |
|
if (this.idChannels.TryGetValue(id, out TChannel _)) |
|
{ |
|
return; |
|
} |
|
this.Create(address, id); |
|
} |
|
|
|
private TChannel Get(long id) |
|
{ |
|
TChannel channel = null; |
|
this.idChannels.TryGetValue(id, out channel); |
|
return channel; |
|
} |
|
|
|
public override void Dispose() |
|
{ |
|
this.acceptor?.Close(); |
|
this.acceptor = null; |
|
this.innArgs.Dispose(); |
|
ThreadSynchronizationContext = null; |
|
|
|
foreach (long id in this.idChannels.Keys.ToArray()) |
|
{ |
|
TChannel channel = this.idChannels[id]; |
|
channel.Dispose(); |
|
} |
|
this.idChannels.Clear(); |
|
} |
|
|
|
public override void Remove(long id) |
|
{ |
|
if (this.idChannels.TryGetValue(id, out TChannel channel)) |
|
{ |
|
channel.Dispose(); |
|
} |
|
|
|
this.idChannels.Remove(id); |
|
} |
|
|
|
protected override void Send(long channelId, long actorId, MemoryStream stream) |
|
{ |
|
try |
|
{ |
|
TChannel aChannel = this.Get(channelId); |
|
if (aChannel == null) |
|
{ |
|
this.OnError(channelId, ErrorCore.ERR_SendMessageNotFoundTChannel); |
|
return; |
|
} |
|
aChannel.Send(actorId, stream); |
|
} |
|
catch (Exception e) |
|
{ |
|
Log.Error(e); |
|
} |
|
} |
|
|
|
private readonly Action<long> foreachAction; |
|
|
|
public override void Update() |
|
{ |
|
this.NeedStartSend.Foreach(this.foreachAction); |
|
this.NeedStartSend.Clear(); |
|
} |
|
|
|
public override bool IsDispose() |
|
{ |
|
return this.ThreadSynchronizationContext == null; |
|
} |
|
|
|
#endregion |
|
|
|
} |
|
} |