TcpRpcServer,StartAccepting

This commit is contained in:
Christian Köllner 2020-02-23 14:24:17 +01:00
parent 596a97a362
commit 409e517587
5 changed files with 67 additions and 19 deletions

View File

@ -7,7 +7,7 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.12.0" /> <PackageReference Include="BenchmarkDotNet" Version="0.12.0" />
<PackageReference Include="Capnp.Net.Runtime" Version="1.3.32-g49c5e80436" /> <PackageReference Include="Capnp.Net.Runtime" Version="1.3.33-g596a97a362" />
<PackageReference Include="CapnpC.CSharp.MsBuild.Generation" Version="1.2.138" /> <PackageReference Include="CapnpC.CSharp.MsBuild.Generation" Version="1.2.138" />
<PackageReference Include="Google.Protobuf" Version="3.11.3" /> <PackageReference Include="Google.Protobuf" Version="3.11.3" />
<PackageReference Include="Grpc.Net.Client" Version="2.27.0" /> <PackageReference Include="Grpc.Net.Client" Version="2.27.0" />

View File

@ -13,6 +13,9 @@ namespace Benchmark
[Params(20, 200, 2000, 20000, 200000, 2000000)] [Params(20, 200, 2000, 20000, 200000, 2000000)]
public int PayloadBytes; public int PayloadBytes;
[Params(0, 256, 1024, 4096)]
public int BufferSize;
TcpRpcClient _client; TcpRpcClient _client;
IEchoer _echoer; IEchoer _echoer;
byte[] _payload; byte[] _payload;
@ -21,6 +24,8 @@ namespace Benchmark
public void Setup() public void Setup()
{ {
_client = new TcpRpcClient("localhost", 5002); _client = new TcpRpcClient("localhost", 5002);
if (BufferSize > 0)
_client.AddBuffering(BufferSize);
_client.WhenConnected.Wait(); _client.WhenConnected.Wait();
_echoer = _client.GetMain<IEchoer>(); _echoer = _client.GetMain<IEchoer>();
_payload = new byte[PayloadBytes]; _payload = new byte[PayloadBytes];

View File

@ -6,7 +6,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Capnp.Net.Runtime" Version="1.3.32-g49c5e80436" /> <PackageReference Include="Capnp.Net.Runtime" Version="1.3.33-g596a97a362" />
<PackageReference Include="CapnpC.CSharp.MsBuild.Generation" Version="1.2.138" /> <PackageReference Include="CapnpC.CSharp.MsBuild.Generation" Version="1.2.138" />
</ItemGroup> </ItemGroup>

View File

@ -11,6 +11,7 @@ namespace EchoServiceCapnp
{ {
using (var server = new TcpRpcServer(IPAddress.Any, 5002)) using (var server = new TcpRpcServer(IPAddress.Any, 5002))
{ {
server.AddBuffering();
server.Main = new CapnpEchoService(); server.Main = new CapnpEchoService();
Console.WriteLine("Press RETURN to stop listening"); Console.WriteLine("Press RETURN to stop listening");
Console.ReadLine(); Console.ReadLine();

View File

@ -153,17 +153,17 @@ namespace Capnp.Rpc
} }
readonly RpcEngine _rpcEngine; readonly RpcEngine _rpcEngine;
readonly TcpListener _listener;
readonly object _reentrancyBlocker = new object(); readonly object _reentrancyBlocker = new object();
readonly Thread _acceptorThread;
readonly List<Connection> _connections = new List<Connection>(); readonly List<Connection> _connections = new List<Connection>();
Thread? _acceptorThread;
TcpListener? _listener;
/// <summary> /// <summary>
/// Gets the number of currently active inbound TCP connections. /// Gets the number of currently active inbound TCP connections.
/// </summary> /// </summary>
public int ConnectionCount { get; private set; } public int ConnectionCount { get; private set; }
void AcceptClients() void AcceptClients(TcpListener listener)
{ {
try try
{ {
@ -172,7 +172,7 @@ namespace Capnp.Rpc
while (true) while (true)
{ {
var client = _listener.AcceptTcpClient(); var client = listener.AcceptTcpClient();
var connection = new Connection(this, client); var connection = new Connection(this, client);
lock (_reentrancyBlocker) lock (_reentrancyBlocker)
@ -236,7 +236,10 @@ namespace Capnp.Rpc
/// </summary> /// </summary>
public void Dispose() public void Dispose()
{ {
StopListening(); if (_listener != null)
{
StopListening();
}
var connections = new List<Connection>(); var connections = new List<Connection>();
@ -252,8 +255,6 @@ namespace Capnp.Rpc
SafeJoin(connection.PumpRunner); SafeJoin(connection.PumpRunner);
} }
SafeJoin(_acceptorThread);
GC.SuppressFinalize(this); GC.SuppressFinalize(this);
} }
@ -262,6 +263,9 @@ namespace Capnp.Rpc
/// </summary> /// </summary>
public void StopListening() public void StopListening()
{ {
if (_listener == null)
throw new InvalidOperationException("Listening was never started");
try try
{ {
_listener.Stop(); _listener.Stop();
@ -269,6 +273,12 @@ namespace Capnp.Rpc
catch (SocketException) catch (SocketException)
{ {
} }
finally
{
_listener = null;
SafeJoin(_acceptorThread);
_acceptorThread = null;
}
} }
/// <summary> /// <summary>
@ -292,15 +302,46 @@ namespace Capnp.Rpc
/// <summary> /// <summary>
/// Constructs an instance. /// Constructs an instance.
/// </summary> /// </summary>
public TcpRpcServer()
{
_rpcEngine = new RpcEngine();
}
/// <summary>
/// Constructs an instance, starts listening to the specified TCP/IP endpoint and accepting clients.
/// If you intend configuring a midlayer or consuming the <see cref="OnConnectionChanged"/> event,
/// you should not use this constructor, since it may lead to an early-client race condition.
/// Instead, use the parameterless constructor, configure, then call <see cref="StartAccepting(IPAddress, int)"/>.
/// </summary>
/// <param name="localAddr">An System.Net.IPAddress that represents the local IP address.</param> /// <param name="localAddr">An System.Net.IPAddress that represents the local IP address.</param>
/// <param name="port">The port on which to listen for incoming connection attempts.</param> /// <param name="port">The port on which to listen for incoming connection attempts.</param>
/// <exception cref="ArgumentNullException"><paramref name="localAddr"/> is null.</exception> /// <exception cref="ArgumentNullException"><paramref name="localAddr"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="port"/> is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort.</exception> /// <exception cref="ArgumentOutOfRangeException"><paramref name="port"/> is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort.</exception>
public TcpRpcServer(IPAddress localAddr, int port) /// <exception cref="SocketException">The underlying <see cref="TcpListener"/> detected an error condition, such as the desired endpoint is already occupied.</exception>
public TcpRpcServer(IPAddress localAddr, int port): this()
{ {
_rpcEngine = new RpcEngine(); StartAccepting(localAddr, port);
_listener = new TcpListener(localAddr, port); }
_listener.ExclusiveAddressUse = false;
/// <summary>
/// Starts listening to the specified TCP/IP endpoint and accepting clients.
/// </summary>
/// <param name="localAddr">An System.Net.IPAddress that represents the local IP address.</param>
/// <param name="port">The port on which to listen for incoming connection attempts.</param>
/// <exception cref="ArgumentNullException"><paramref name="localAddr"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="port"/> is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort.</exception>
/// <exception cref="InvalidOperationException">Listening activity was already started</exception>
/// <exception cref="SocketException">The underlying <see cref="TcpListener"/> detected an error condition, such as the desired endpoint is already occupied.</exception>
public void StartAccepting(IPAddress localAddr, int port)
{
if (_listener != null)
throw new InvalidOperationException("Listening activity was already started");
var listener = new TcpListener(localAddr, port)
{
ExclusiveAddressUse = false
};
int attempt = 0; int attempt = 0;
@ -308,32 +349,33 @@ namespace Capnp.Rpc
{ {
try try
{ {
_listener.Start(); listener.Start();
break; break;
} }
catch (SocketException socketException) catch (SocketException socketException)
{ {
if (attempt == 5) if (attempt == 5)
throw; throw;
Logger.LogWarning($"Failed to listen on port {port}, attempt {attempt}: {socketException}"); Logger.LogWarning($"Failed to listen on port {port}, attempt {attempt}: {socketException}");
} }
++attempt; ++attempt;
Thread.Sleep(10);
} }
_acceptorThread = new Thread(AcceptClients); _acceptorThread = new Thread(() => AcceptClients(listener));
_listener = listener;
_acceptorThread.Start(); _acceptorThread.Start();
} }
/// <summary> /// <summary>
/// Whether the thread which is responsible for acception incoming attempts is still alive. /// Whether the thread which is responsible for acception incoming attempts is still alive.
/// The thread will die upon disposal, but also in case of a socket error condition. /// The thread will die after calling <see cref="StopListening"/>, upon disposal, but also in case of a socket error condition.
/// Errors which occur on a particular connection will just close that connection and won't interfere /// Errors which occur on a particular connection will just close that connection and won't interfere
/// with the acceptor thread. /// with the acceptor thread.
/// </summary> /// </summary>
public bool IsAlive => _acceptorThread.IsAlive; public bool IsAlive => _acceptorThread?.IsAlive ?? false;
/// <summary> /// <summary>
/// Sets the bootstrap capability. It must be an object which implements a valid capability interface /// Sets the bootstrap capability. It must be an object which implements a valid capability interface