diff --git a/Benchmarking/Benchmark/Benchmark.csproj b/Benchmarking/Benchmark/Benchmark.csproj index c6271ce..475449a 100644 --- a/Benchmarking/Benchmark/Benchmark.csproj +++ b/Benchmarking/Benchmark/Benchmark.csproj @@ -7,7 +7,7 @@ - + diff --git a/Benchmarking/Benchmark/CapnpBenchmark.cs b/Benchmarking/Benchmark/CapnpBenchmark.cs index 63f3f1d..f1e1f5c 100644 --- a/Benchmarking/Benchmark/CapnpBenchmark.cs +++ b/Benchmarking/Benchmark/CapnpBenchmark.cs @@ -13,6 +13,9 @@ namespace Benchmark [Params(20, 200, 2000, 20000, 200000, 2000000)] public int PayloadBytes; + [Params(0, 256, 1024, 4096)] + public int BufferSize; + TcpRpcClient _client; IEchoer _echoer; byte[] _payload; @@ -21,6 +24,8 @@ namespace Benchmark public void Setup() { _client = new TcpRpcClient("localhost", 5002); + if (BufferSize > 0) + _client.AddBuffering(BufferSize); _client.WhenConnected.Wait(); _echoer = _client.GetMain(); _payload = new byte[PayloadBytes]; diff --git a/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj b/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj index df592ec..0b9048a 100644 --- a/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj +++ b/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj @@ -6,7 +6,7 @@ - + diff --git a/Benchmarking/EchoServiceCapnp/Program.cs b/Benchmarking/EchoServiceCapnp/Program.cs index fb58c04..c6ca4fa 100644 --- a/Benchmarking/EchoServiceCapnp/Program.cs +++ b/Benchmarking/EchoServiceCapnp/Program.cs @@ -11,6 +11,7 @@ namespace EchoServiceCapnp { using (var server = new TcpRpcServer(IPAddress.Any, 5002)) { + server.AddBuffering(); server.Main = new CapnpEchoService(); Console.WriteLine("Press RETURN to stop listening"); Console.ReadLine(); diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs index dd29ca2..079a654 100644 --- a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs +++ b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs @@ -153,17 +153,17 @@ namespace Capnp.Rpc } readonly RpcEngine _rpcEngine; - readonly TcpListener _listener; readonly object _reentrancyBlocker = new object(); - readonly Thread _acceptorThread; readonly List _connections = new List(); + Thread? _acceptorThread; + TcpListener? _listener; /// /// Gets the number of currently active inbound TCP connections. /// public int ConnectionCount { get; private set; } - void AcceptClients() + void AcceptClients(TcpListener listener) { try { @@ -172,7 +172,7 @@ namespace Capnp.Rpc while (true) { - var client = _listener.AcceptTcpClient(); + var client = listener.AcceptTcpClient(); var connection = new Connection(this, client); lock (_reentrancyBlocker) @@ -236,7 +236,10 @@ namespace Capnp.Rpc /// public void Dispose() { - StopListening(); + if (_listener != null) + { + StopListening(); + } var connections = new List(); @@ -252,8 +255,6 @@ namespace Capnp.Rpc SafeJoin(connection.PumpRunner); } - SafeJoin(_acceptorThread); - GC.SuppressFinalize(this); } @@ -262,6 +263,9 @@ namespace Capnp.Rpc /// public void StopListening() { + if (_listener == null) + throw new InvalidOperationException("Listening was never started"); + try { _listener.Stop(); @@ -269,6 +273,12 @@ namespace Capnp.Rpc catch (SocketException) { } + finally + { + _listener = null; + SafeJoin(_acceptorThread); + _acceptorThread = null; + } } /// @@ -292,15 +302,46 @@ namespace Capnp.Rpc /// /// Constructs an instance. /// + public TcpRpcServer() + { + _rpcEngine = new RpcEngine(); + + } + + /// + /// Constructs an instance, starts listening to the specified TCP/IP endpoint and accepting clients. + /// If you intend configuring a midlayer or consuming the event, + /// you should not use this constructor, since it may lead to an early-client race condition. + /// Instead, use the parameterless constructor, configure, then call . + /// /// An System.Net.IPAddress that represents the local IP address. /// The port on which to listen for incoming connection attempts. /// is null. /// is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort. - public TcpRpcServer(IPAddress localAddr, int port) + /// The underlying detected an error condition, such as the desired endpoint is already occupied. + public TcpRpcServer(IPAddress localAddr, int port): this() { - _rpcEngine = new RpcEngine(); - _listener = new TcpListener(localAddr, port); - _listener.ExclusiveAddressUse = false; + StartAccepting(localAddr, port); + } + + /// + /// Starts listening to the specified TCP/IP endpoint and accepting clients. + /// + /// An System.Net.IPAddress that represents the local IP address. + /// The port on which to listen for incoming connection attempts. + /// is null. + /// is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort. + /// Listening activity was already started + /// The underlying detected an error condition, such as the desired endpoint is already occupied. + public void StartAccepting(IPAddress localAddr, int port) + { + if (_listener != null) + throw new InvalidOperationException("Listening activity was already started"); + + var listener = new TcpListener(localAddr, port) + { + ExclusiveAddressUse = false + }; int attempt = 0; @@ -308,32 +349,33 @@ namespace Capnp.Rpc { try { - _listener.Start(); + listener.Start(); break; } catch (SocketException socketException) { if (attempt == 5) throw; - + Logger.LogWarning($"Failed to listen on port {port}, attempt {attempt}: {socketException}"); } ++attempt; + Thread.Sleep(10); } - _acceptorThread = new Thread(AcceptClients); - + _acceptorThread = new Thread(() => AcceptClients(listener)); + _listener = listener; _acceptorThread.Start(); } /// /// Whether the thread which is responsible for acception incoming attempts is still alive. - /// The thread will die upon disposal, but also in case of a socket error condition. + /// The thread will die after calling , upon disposal, but also in case of a socket error condition. /// Errors which occur on a particular connection will just close that connection and won't interfere /// with the acceptor thread. /// - public bool IsAlive => _acceptorThread.IsAlive; + public bool IsAlive => _acceptorThread?.IsAlive ?? false; /// /// Sets the bootstrap capability. It must be an object which implements a valid capability interface