From 47cb57805738d0ea617cccc31e5141e7c4050e97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Sun, 9 Feb 2020 12:53:45 +0100 Subject: [PATCH 01/16] Perf. optimization --- Capnp.Net.Runtime/ListOfPrimitivesSerializer.cs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/Capnp.Net.Runtime/ListOfPrimitivesSerializer.cs b/Capnp.Net.Runtime/ListOfPrimitivesSerializer.cs index b70a3a9..d901ad7 100644 --- a/Capnp.Net.Runtime/ListOfPrimitivesSerializer.cs +++ b/Capnp.Net.Runtime/ListOfPrimitivesSerializer.cs @@ -77,10 +77,17 @@ namespace Capnp } Init(items.Count); - - for (int i = 0; i < items.Count; i++) + + if (items is T[] array) { - this[i] = items[i]; + array.CopyTo(Data); + } + else + { + for (int i = 0; i < items.Count; i++) + { + this[i] = items[i]; + } } } From 5e71ce69c5d87a521b4e006fe9731d3a7a612342 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Sun, 9 Feb 2020 13:49:21 +0100 Subject: [PATCH 02/16] performance optimizations --- Benchmarking/Benchmark/Benchmark.csproj | 2 +- Benchmarking/CapnpProfile.sln | 25 ++++++++++++++++ Benchmarking/CapnpProfile/CapnpProfile.csproj | 13 ++++++++ Benchmarking/CapnpProfile/Program.cs | 30 +++++++++++++++++++ Benchmarking/CapnpProfile/Protos/Echo.capnp | 5 ++++ .../CapnpProfile/Services/CapnpEchoService.cs | 20 +++++++++++++ .../EchoServiceCapnp/EchoServiceCapnp.csproj | 4 +-- Benchmarking/nuget.config | 10 +++++++ Capnp.Net.Runtime/DeserializerState.cs | 18 +++++------ Capnp.Net.Runtime/ListDeserializer.cs | 2 +- Capnp.Net.Runtime/ListOfBitsDeserializer.cs | 4 +-- Capnp.Net.Runtime/ListOfCapsDeserializer.cs | 2 +- Capnp.Net.Runtime/ListOfEmptyDeserializer.cs | 4 +-- .../ListOfPointersDeserializer.cs | 4 +-- .../ListOfPrimitivesDeserializer.cs | 13 ++++---- .../ListOfStructsDeserializer.cs | 4 +-- 16 files changed, 130 insertions(+), 30 deletions(-) create mode 100644 Benchmarking/CapnpProfile.sln create mode 100644 Benchmarking/CapnpProfile/CapnpProfile.csproj create mode 100644 Benchmarking/CapnpProfile/Program.cs create mode 100644 Benchmarking/CapnpProfile/Protos/Echo.capnp create mode 100644 Benchmarking/CapnpProfile/Services/CapnpEchoService.cs create mode 100644 Benchmarking/nuget.config diff --git a/Benchmarking/Benchmark/Benchmark.csproj b/Benchmarking/Benchmark/Benchmark.csproj index 805886e..aae7298 100644 --- a/Benchmarking/Benchmark/Benchmark.csproj +++ b/Benchmarking/Benchmark/Benchmark.csproj @@ -7,7 +7,7 @@ - + diff --git a/Benchmarking/CapnpProfile.sln b/Benchmarking/CapnpProfile.sln new file mode 100644 index 0000000..3167001 --- /dev/null +++ b/Benchmarking/CapnpProfile.sln @@ -0,0 +1,25 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 16 +VisualStudioVersion = 16.0.29728.190 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CapnpProfile", "CapnpProfile\CapnpProfile.csproj", "{D2D3AE23-C19E-47C7-B758-E2259DC01B5A}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {D2D3AE23-C19E-47C7-B758-E2259DC01B5A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D2D3AE23-C19E-47C7-B758-E2259DC01B5A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D2D3AE23-C19E-47C7-B758-E2259DC01B5A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D2D3AE23-C19E-47C7-B758-E2259DC01B5A}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {F5DDDA09-394B-4A71-B6BE-C7103BCEF3A2} + EndGlobalSection +EndGlobal diff --git a/Benchmarking/CapnpProfile/CapnpProfile.csproj b/Benchmarking/CapnpProfile/CapnpProfile.csproj new file mode 100644 index 0000000..0edbb3f --- /dev/null +++ b/Benchmarking/CapnpProfile/CapnpProfile.csproj @@ -0,0 +1,13 @@ + + + + Exe + netcoreapp3.1 + + + + + + + + diff --git a/Benchmarking/CapnpProfile/Program.cs b/Benchmarking/CapnpProfile/Program.cs new file mode 100644 index 0000000..98c09c4 --- /dev/null +++ b/Benchmarking/CapnpProfile/Program.cs @@ -0,0 +1,30 @@ +using Capnp.Rpc; +using CapnpGen; +using CapnpProfile.Services; +using System; +using System.Net; +using System.Threading.Tasks; + +namespace CapnpProfile +{ + class Program + { + static async Task Main(string[] args) + { + using var server = new TcpRpcServer(IPAddress.Any, 5002); + server.Main = new CapnpEchoService(); + using var client = new TcpRpcClient("localhost", 5002); + await client.WhenConnected; + using var echoer = client.GetMain(); + var payload = new byte[200000]; + new Random().NextBytes(payload); + + while (true) + { + var result = await echoer.Echo(payload); + if (result.Count != payload.Length) + throw new InvalidOperationException("Echo server malfunction"); + } + } + } +} diff --git a/Benchmarking/CapnpProfile/Protos/Echo.capnp b/Benchmarking/CapnpProfile/Protos/Echo.capnp new file mode 100644 index 0000000..2af55d5 --- /dev/null +++ b/Benchmarking/CapnpProfile/Protos/Echo.capnp @@ -0,0 +1,5 @@ +@0x8c309c720de8cf7c; + +interface Echoer { + echo @0 (input : Data) -> (output : Data); +} diff --git a/Benchmarking/CapnpProfile/Services/CapnpEchoService.cs b/Benchmarking/CapnpProfile/Services/CapnpEchoService.cs new file mode 100644 index 0000000..426c75c --- /dev/null +++ b/Benchmarking/CapnpProfile/Services/CapnpEchoService.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace CapnpProfile.Services +{ + public class CapnpEchoService : CapnpGen.IEchoer + { + public void Dispose() + { + } + + public Task> Echo(IReadOnlyList input, CancellationToken cancellationToken_ = default) + { + return Task.FromResult(input); + } + } +} diff --git a/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj b/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj index f50d2fa..0944fa3 100644 --- a/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj +++ b/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj @@ -1,4 +1,4 @@ - + Exe @@ -6,7 +6,7 @@ - + diff --git a/Benchmarking/nuget.config b/Benchmarking/nuget.config new file mode 100644 index 0000000..7fcd85c --- /dev/null +++ b/Benchmarking/nuget.config @@ -0,0 +1,10 @@ + + + + + + + + + + \ No newline at end of file diff --git a/Capnp.Net.Runtime/DeserializerState.cs b/Capnp.Net.Runtime/DeserializerState.cs index 8dd7ce8..e6afb8b 100644 --- a/Capnp.Net.Runtime/DeserializerState.cs +++ b/Capnp.Net.Runtime/DeserializerState.cs @@ -516,28 +516,28 @@ namespace Capnp switch (Kind) { case ObjectKind.ListOfBits: - return new ListOfBitsDeserializer(ref this, false); + return new ListOfBitsDeserializer(this, false); case ObjectKind.ListOfBytes: - return new ListOfPrimitivesDeserializer(ref this, ListKind.ListOfBytes); + return new ListOfPrimitivesDeserializer(this, ListKind.ListOfBytes); case ObjectKind.ListOfEmpty: - return new ListOfEmptyDeserializer(ref this); + return new ListOfEmptyDeserializer(this); case ObjectKind.ListOfInts: - return new ListOfPrimitivesDeserializer(ref this, ListKind.ListOfInts); + return new ListOfPrimitivesDeserializer(this, ListKind.ListOfInts); case ObjectKind.ListOfLongs: - return new ListOfPrimitivesDeserializer(ref this, ListKind.ListOfLongs); + return new ListOfPrimitivesDeserializer(this, ListKind.ListOfLongs); case ObjectKind.ListOfPointers: - return new ListOfPointersDeserializer(ref this); + return new ListOfPointersDeserializer(this); case ObjectKind.ListOfShorts: - return new ListOfPrimitivesDeserializer(ref this, ListKind.ListOfShorts); + return new ListOfPrimitivesDeserializer(this, ListKind.ListOfShorts); case ObjectKind.ListOfStructs: - return new ListOfStructsDeserializer(ref this); + return new ListOfStructsDeserializer(this); case ObjectKind.Nil: return new EmptyListDeserializer(); @@ -557,7 +557,7 @@ namespace Capnp switch (Kind) { case ObjectKind.ListOfPointers: - return new ListOfCapsDeserializer(ref this); + return new ListOfCapsDeserializer(this); default: throw new DeserializationException("Cannot deserialize this object as capability list"); diff --git a/Capnp.Net.Runtime/ListDeserializer.cs b/Capnp.Net.Runtime/ListDeserializer.cs index 8db896b..e28b90c 100644 --- a/Capnp.Net.Runtime/ListDeserializer.cs +++ b/Capnp.Net.Runtime/ListDeserializer.cs @@ -34,7 +34,7 @@ namespace Capnp /// protected readonly DeserializerState State; - internal ListDeserializer(ref DeserializerState state) + internal ListDeserializer(in DeserializerState state) { State = state; } diff --git a/Capnp.Net.Runtime/ListOfBitsDeserializer.cs b/Capnp.Net.Runtime/ListOfBitsDeserializer.cs index 0e5ff07..226751c 100644 --- a/Capnp.Net.Runtime/ListOfBitsDeserializer.cs +++ b/Capnp.Net.Runtime/ListOfBitsDeserializer.cs @@ -11,8 +11,8 @@ namespace Capnp { readonly bool _defaultValue; - internal ListOfBitsDeserializer(ref DeserializerState context, bool defaultValue) : - base(ref context) + internal ListOfBitsDeserializer(in DeserializerState context, bool defaultValue) : + base(context) { _defaultValue = defaultValue; } diff --git a/Capnp.Net.Runtime/ListOfCapsDeserializer.cs b/Capnp.Net.Runtime/ListOfCapsDeserializer.cs index 22a9ac9..1339b2f 100644 --- a/Capnp.Net.Runtime/ListOfCapsDeserializer.cs +++ b/Capnp.Net.Runtime/ListOfCapsDeserializer.cs @@ -11,7 +11,7 @@ namespace Capnp public class ListOfCapsDeserializer : ListDeserializer, IReadOnlyList where T: class { - internal ListOfCapsDeserializer(ref DeserializerState state) : base(ref state) + internal ListOfCapsDeserializer(in DeserializerState state) : base(state) { Rpc.CapabilityReflection.ValidateCapabilityInterface(typeof(T)); } diff --git a/Capnp.Net.Runtime/ListOfEmptyDeserializer.cs b/Capnp.Net.Runtime/ListOfEmptyDeserializer.cs index 9b6ed42..b9b3d5a 100644 --- a/Capnp.Net.Runtime/ListOfEmptyDeserializer.cs +++ b/Capnp.Net.Runtime/ListOfEmptyDeserializer.cs @@ -10,8 +10,8 @@ namespace Capnp /// public class ListOfEmptyDeserializer : ListDeserializer, IReadOnlyList { - internal ListOfEmptyDeserializer(ref DeserializerState state) : - base(ref state) + internal ListOfEmptyDeserializer(in DeserializerState state) : + base(state) { } diff --git a/Capnp.Net.Runtime/ListOfPointersDeserializer.cs b/Capnp.Net.Runtime/ListOfPointersDeserializer.cs index 520e0e7..3611461 100644 --- a/Capnp.Net.Runtime/ListOfPointersDeserializer.cs +++ b/Capnp.Net.Runtime/ListOfPointersDeserializer.cs @@ -9,8 +9,8 @@ namespace Capnp /// public class ListOfPointersDeserializer: ListDeserializer, IReadOnlyList { - internal ListOfPointersDeserializer(ref DeserializerState state) : - base(ref state) + internal ListOfPointersDeserializer(in DeserializerState state) : + base(state) { } diff --git a/Capnp.Net.Runtime/ListOfPrimitivesDeserializer.cs b/Capnp.Net.Runtime/ListOfPrimitivesDeserializer.cs index 1cb357a..c2cae63 100644 --- a/Capnp.Net.Runtime/ListOfPrimitivesDeserializer.cs +++ b/Capnp.Net.Runtime/ListOfPrimitivesDeserializer.cs @@ -11,7 +11,7 @@ namespace Capnp /// /// List element type public class ListOfPrimitivesDeserializer: ListDeserializer, IReadOnlyList - where T: struct + where T: unmanaged { class ListOfULongAsStructView : IReadOnlyList { @@ -73,12 +73,10 @@ namespace Capnp readonly ListKind _kind; - internal ListOfPrimitivesDeserializer(ref DeserializerState state, ListKind kind) : - base(ref state) + internal ListOfPrimitivesDeserializer(in DeserializerState state, ListKind kind) : + base(state) { _kind = kind; - - var binCoder = PrimitiveCoder.Get(); } /// @@ -96,13 +94,12 @@ namespace Capnp /// is out of range. public T this[int index] => Data[index]; - ListOfPrimitivesDeserializer PrimitiveCast() where U: struct + ListOfPrimitivesDeserializer PrimitiveCast() where U: unmanaged { if (Marshal.SizeOf() != Marshal.SizeOf()) throw new NotSupportedException("Source and target types have different sizes, cannot cast"); - var stateCopy = State; - return new ListOfPrimitivesDeserializer(ref stateCopy, Kind); + return new ListOfPrimitivesDeserializer(State, Kind); } /// diff --git a/Capnp.Net.Runtime/ListOfStructsDeserializer.cs b/Capnp.Net.Runtime/ListOfStructsDeserializer.cs index 57794e3..03e6c91 100644 --- a/Capnp.Net.Runtime/ListOfStructsDeserializer.cs +++ b/Capnp.Net.Runtime/ListOfStructsDeserializer.cs @@ -9,8 +9,8 @@ namespace Capnp /// public class ListOfStructsDeserializer: ListDeserializer, IReadOnlyList { - internal ListOfStructsDeserializer(ref DeserializerState context): - base(ref context) + internal ListOfStructsDeserializer(in DeserializerState context): + base(context) { } From 37d642c249d19d3edb6774c54d0f7e6131283093 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Sun, 16 Feb 2020 17:46:23 +0100 Subject: [PATCH 03/16] Performance experiments --- Benchmarking/Benchmark/Benchmark.csproj | 2 +- Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Benchmarking/Benchmark/Benchmark.csproj b/Benchmarking/Benchmark/Benchmark.csproj index aae7298..4e9e87a 100644 --- a/Benchmarking/Benchmark/Benchmark.csproj +++ b/Benchmarking/Benchmark/Benchmark.csproj @@ -7,7 +7,7 @@ - + diff --git a/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj b/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj index 0944fa3..9f52ec0 100644 --- a/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj +++ b/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj @@ -6,7 +6,7 @@ - + From 686dfeba521308fe962101d1bba1be343a765f33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Mon, 17 Feb 2020 21:23:44 +0100 Subject: [PATCH 04/16] Perf. opt. --- .../ListOfPrimitivesDeserializer.cs | 11 ++-- .../ListOfPrimitivesSerializer.cs | 50 ++++++++++++------- 2 files changed, 40 insertions(+), 21 deletions(-) diff --git a/Capnp.Net.Runtime/ListOfPrimitivesDeserializer.cs b/Capnp.Net.Runtime/ListOfPrimitivesDeserializer.cs index c2cae63..072d371 100644 --- a/Capnp.Net.Runtime/ListOfPrimitivesDeserializer.cs +++ b/Capnp.Net.Runtime/ListOfPrimitivesDeserializer.cs @@ -7,7 +7,7 @@ using System.Text; namespace Capnp { /// - /// ListDeserializer specialization for List(Int*), List(UInt*), List(Float*), and List(Enum). + /// ListDeserializer specialization for unmanaged primitive types (including enum). /// /// List element type public class ListOfPrimitivesDeserializer: ListDeserializer, IReadOnlyList @@ -84,7 +84,10 @@ namespace Capnp /// public override ListKind Kind => _kind; - ReadOnlySpan Data => MemoryMarshal.Cast(State.CurrentSegment.Slice(State.Offset)).Slice(0, Count); + /// + /// Retrieves the underlying memory span of this object + /// + public ReadOnlySpan Span => MemoryMarshal.Cast(State.CurrentSegment.Slice(State.Offset)).Slice(0, Count); /// /// Returns the element at given index. @@ -92,7 +95,7 @@ namespace Capnp /// Element index /// Element value /// is out of range. - public T this[int index] => Data[index]; + public T this[int index] => Span[index]; ListOfPrimitivesDeserializer PrimitiveCast() where U: unmanaged { @@ -202,7 +205,7 @@ namespace Capnp /// Element size is different from 1 byte. public override string CastText() { - var utf8Bytes = PrimitiveCast().Data; + var utf8Bytes = PrimitiveCast().Span; if (utf8Bytes.Length == 0) return string.Empty; var utf8GytesNoZterm = utf8Bytes.Slice(0, utf8Bytes.Length - 1); return Encoding.UTF8.GetString(utf8GytesNoZterm.ToArray()); diff --git a/Capnp.Net.Runtime/ListOfPrimitivesSerializer.cs b/Capnp.Net.Runtime/ListOfPrimitivesSerializer.cs index d901ad7..a67df9a 100644 --- a/Capnp.Net.Runtime/ListOfPrimitivesSerializer.cs +++ b/Capnp.Net.Runtime/ListOfPrimitivesSerializer.cs @@ -6,13 +6,13 @@ using System.Runtime.InteropServices; namespace Capnp { /// - /// SerializerState specialization for List(Int*), List(UInt*), List(Float*), and List(Enum). + /// SerializerState specialization for unmanaged primitive types (including enum). /// /// List element type, must be primitive. Static constructor will throw if the type does not work. public class ListOfPrimitivesSerializer : SerializerState, IReadOnlyList - where T : struct + where T : unmanaged { static readonly int ElementSize; @@ -28,7 +28,10 @@ namespace Capnp } } - Span Data => MemoryMarshal.Cast(RawData); + /// + /// Retrieves the underlying memory span of the represented items. + /// + public Span Span => MemoryMarshal.Cast(RawData); /// /// Gets or sets the value at given index. @@ -37,8 +40,8 @@ namespace Capnp /// Element value public T this[int index] { - get => Data[index]; - set => Data[index] = value; + get => Span[index]; + set => Span[index] = value; } /// @@ -78,25 +81,38 @@ namespace Capnp Init(items.Count); - if (items is T[] array) + switch (items) { - array.CopyTo(Data); - } - else - { - for (int i = 0; i < items.Count; i++) - { - this[i] = items[i]; - } + case T[] array: + array.CopyTo(Span); + break; + + case ArraySegment segment: + segment.AsSpan().CopyTo(Span); + break; + + case ListOfPrimitivesDeserializer deser: + deser.Span.CopyTo(Span); + break; + + case ListOfPrimitivesSerializer ser: + ser.Span.CopyTo(Span); + break; + + default: + for (int i = 0; i < items.Count; i++) + { + this[i] = items[i]; + } + break; } } /// /// Implements . /// - /// - public IEnumerator GetEnumerator() => (IEnumerator)Data.ToArray().GetEnumerator(); + public IEnumerator GetEnumerator() => (IEnumerator)Span.ToArray().GetEnumerator(); - IEnumerator IEnumerable.GetEnumerator() => Data.ToArray().GetEnumerator(); + IEnumerator IEnumerable.GetEnumerator() => Span.ToArray().GetEnumerator(); } } \ No newline at end of file From 2ad89756e1022b04eec620f5018ebd75d31cab03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Sat, 22 Feb 2020 11:22:13 +0100 Subject: [PATCH 05/16] Improved robustness of TCP connection handling + test --- Capnp.Net.Runtime.Tests/TcpRpcInterop.cs | 22 ++++++++++++++++++++- Capnp.Net.Runtime.Tests/TestBase.cs | 25 +++++++++++++++++++++++- Capnp.Net.Runtime/Rpc/TcpRpcServer.cs | 12 +++++++++--- 3 files changed, 54 insertions(+), 5 deletions(-) diff --git a/Capnp.Net.Runtime.Tests/TcpRpcInterop.cs b/Capnp.Net.Runtime.Tests/TcpRpcInterop.cs index 58c66bf..929e3d2 100644 --- a/Capnp.Net.Runtime.Tests/TcpRpcInterop.cs +++ b/Capnp.Net.Runtime.Tests/TcpRpcInterop.cs @@ -36,7 +36,7 @@ namespace Capnp.Net.Runtime.Tests Process _currentProcess; - void LaunchCompatTestProcess(string whichTest, Action test) + bool TryLaunchCompatTestProcess(string whichTest, Action test) { string myPath = Path.GetDirectoryName(typeof(TcpRpcInterop).Assembly.Location); string config; @@ -71,6 +71,12 @@ namespace Capnp.Net.Runtime.Tests "Problem after launching test process"); test(_currentProcess.StandardOutput); + + return true; + } + catch (AssertFailedException) + { + return false; } finally { @@ -85,6 +91,20 @@ namespace Capnp.Net.Runtime.Tests } } + void LaunchCompatTestProcess(string whichTest, Action test) + { + for (int retry = 0; retry < 5; retry++) + { + if (TryLaunchCompatTestProcess(whichTest, test)) + return; + + if (whichTest.StartsWith("server:")) + PrepareNextTest(); + } + + Assert.Fail("Problem after launching test process"); + } + void SendInput(string line) { _currentProcess.StandardInput.WriteLine(line); diff --git a/Capnp.Net.Runtime.Tests/TestBase.cs b/Capnp.Net.Runtime.Tests/TestBase.cs index afdf1e0..f34842b 100644 --- a/Capnp.Net.Runtime.Tests/TestBase.cs +++ b/Capnp.Net.Runtime.Tests/TestBase.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; using System.Net; +using System.Net.Sockets; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -21,7 +22,29 @@ namespace Capnp.Net.Runtime.Tests protected ILogger Logger { get; set; } protected TcpRpcClient SetupClient() => new TcpRpcClient("localhost", TcpPort); - protected TcpRpcServer SetupServer() => new TcpRpcServer(IPAddress.Any, TcpPort); + protected TcpRpcServer SetupServer() + { + int attempt = 0; + + while (true) + { + try + { + return new TcpRpcServer(IPAddress.Any, TcpPort); + } + catch (SocketException) + { + // If the TCP listening port is occupied by some other process, + // retry with a different one + + if (attempt == 5) + throw; + } + + IncrementTcpPort(); + ++attempt; + } + } protected (TcpRpcServer, TcpRpcClient) SetupClientServerPair() { diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs index dcd05fc..7ee616a 100644 --- a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs +++ b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs @@ -284,7 +284,9 @@ namespace Capnp.Rpc _listener = new TcpListener(localAddr, port); _listener.ExclusiveAddressUse = false; - for (int retry = 0; retry < 5; retry++) + int attempt = 0; + + while (true) { try { @@ -293,9 +295,13 @@ namespace Capnp.Rpc } catch (SocketException socketException) { - Logger.LogWarning($"Failed to listen on port {port}, attempt {retry}: {socketException}"); - Thread.Sleep(10); + if (attempt == 5) + throw; + + Logger.LogWarning($"Failed to listen on port {port}, attempt {attempt}: {socketException}"); } + + ++attempt; } _acceptorThread = new Thread(AcceptClients); From 4ea3bb249b7f33187100945abc987537c293e538 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Sat, 22 Feb 2020 13:46:03 +0100 Subject: [PATCH 06/16] simplified PendingAnswer --- Capnp.Net.Runtime/Rpc/PendingAnswer.cs | 131 +++++-------------------- Capnp.Net.Runtime/Rpc/RpcEngine.cs | 13 ++- 2 files changed, 28 insertions(+), 116 deletions(-) diff --git a/Capnp.Net.Runtime/Rpc/PendingAnswer.cs b/Capnp.Net.Runtime/Rpc/PendingAnswer.cs index 6660725..5959b7e 100644 --- a/Capnp.Net.Runtime/Rpc/PendingAnswer.cs +++ b/Capnp.Net.Runtime/Rpc/PendingAnswer.cs @@ -6,98 +6,41 @@ namespace Capnp.Rpc { class PendingAnswer: IDisposable { - readonly object _reentrancyBlocker = new object(); readonly CancellationTokenSource? _cts; - readonly TaskCompletionSource _whenCanceled; - Task _callTask; - Task? _initialTask; - Task? _chainedTask; - bool _disposed; + readonly TaskCompletionSource _cancelCompleter; + readonly Task _answerTask; public PendingAnswer(Task callTask, CancellationTokenSource? cts) { + async Task CancelableAwaitWhenReady() + { + return await await Task.WhenAny(callTask, _cancelCompleter.Task); + } + + if (callTask == null) + throw new ArgumentNullException(nameof(callTask)); + _cts = cts; - _callTask = callTask ?? throw new ArgumentNullException(nameof(callTask)); - _whenCanceled = new TaskCompletionSource(); + _cancelCompleter = new TaskCompletionSource(); + _answerTask = CancelableAwaitWhenReady(); } + public CancellationToken CancellationToken => _cts?.Token ?? CancellationToken.None; + public void Cancel() { _cts?.Cancel(); - _whenCanceled.SetResult(0); + _cancelCompleter.SetCanceled(); } - async Task InitialAwaitWhenReady() + public void Chain(Action> func) { - var which = await Task.WhenAny(_callTask, _whenCanceled.Task); - - if (which != _callTask) - { - throw new TaskCanceledException(); - } + func(_answerTask); } - async Task AwaitChainedTask(Task chainedTask, Func, Task> func) + public void Chain(PromisedAnswer.READER rd, Action> func) { - try - { - await chainedTask; - } - catch (System.Exception exception) - { - await func(Task.FromException(exception)); - throw; - } - - await func(_callTask); - } - - static async Task AwaitSeq(Task task1, Task task2) - { - await task1; - await task2; - } - - public void Chain(bool strictSync, Func, Task> func) - { - - lock (_reentrancyBlocker) - { - if (_disposed) - { - throw new ObjectDisposedException(nameof(PendingAnswer)); - } - - if (_initialTask == null) - { - _initialTask = InitialAwaitWhenReady(); - } - - Task followUpTask; - - if (strictSync) - { - followUpTask = AwaitChainedTask(_chainedTask ?? _initialTask, func); - } - else - { - followUpTask = AwaitChainedTask(_initialTask, func); - } - - if (_chainedTask != null) - { - _chainedTask = AwaitSeq(_chainedTask, followUpTask); - } - else - { - _chainedTask = followUpTask; - } - } - } - - public void Chain(bool strictSync, PromisedAnswer.READER rd, Func, Task> func) - { - Chain(strictSync, async t => + Chain(t => { async Task EvaluateProxy() { @@ -158,43 +101,13 @@ namespace Capnp.Rpc } } - await func(EvaluateProxy()); + func(EvaluateProxy()); }); } - public CancellationToken CancellationToken => _cts?.Token ?? CancellationToken.None; - - public async void Dispose() + public void Dispose() { - if (_cts != null) - { - Task? chainedTask; - - lock (_reentrancyBlocker) - { - if (_disposed) - { - return; - } - chainedTask = _chainedTask; - _disposed = true; - } - - if (chainedTask != null) - { - try - { - await chainedTask; - } - catch - { - } - finally - { - _cts.Dispose(); - } - } - } + _cts?.Dispose(); } } } \ No newline at end of file diff --git a/Capnp.Net.Runtime/Rpc/RpcEngine.cs b/Capnp.Net.Runtime/Rpc/RpcEngine.cs index 2378959..87460f8 100644 --- a/Capnp.Net.Runtime/Rpc/RpcEngine.cs +++ b/Capnp.Net.Runtime/Rpc/RpcEngine.cs @@ -398,7 +398,7 @@ namespace Capnp.Rpc switch (req.SendResultsTo.which) { case Call.sendResultsTo.WHICH.Caller: - pendingAnswer.Chain(false, async t => + pendingAnswer.Chain(async t => { try { @@ -466,7 +466,7 @@ namespace Capnp.Rpc break; case Call.sendResultsTo.WHICH.Yourself: - pendingAnswer.Chain(false, async t => + pendingAnswer.Chain(async t => { try { @@ -575,7 +575,6 @@ namespace Capnp.Rpc if (exists) { previousAnswer!.Chain( - false, req.Target.PromisedAnswer, async t => { @@ -679,7 +678,7 @@ namespace Capnp.Rpc if (exists) { - pendingAnswer!.Chain(false, async t => + pendingAnswer!.Chain(async t => { try { @@ -807,7 +806,7 @@ namespace Capnp.Rpc if (_answerTable.TryGetValue(promisedAnswer.QuestionId, out var previousAnswer)) { - previousAnswer.Chain(true, + previousAnswer.Chain( disembargo.Target.PromisedAnswer, async t => { @@ -924,7 +923,7 @@ namespace Capnp.Rpc { try { - answer.Chain(false, async t => + answer.Chain(async t => { var aorcq = await t; var results = aorcq.Answer; @@ -1246,7 +1245,7 @@ namespace Capnp.Rpc { var tcs = new TaskCompletionSource(); - pendingAnswer.Chain(false, + pendingAnswer.Chain( capDesc.ReceiverAnswer, async t => { From 335414ed5357ddb21acfd75293a1b3acbd9c4fbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Sat, 22 Feb 2020 14:17:44 +0100 Subject: [PATCH 07/16] exception handling fix --- Capnp.Net.Runtime/Rpc/RpcEngine.cs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Capnp.Net.Runtime/Rpc/RpcEngine.cs b/Capnp.Net.Runtime/Rpc/RpcEngine.cs index 87460f8..d38e7aa 100644 --- a/Capnp.Net.Runtime/Rpc/RpcEngine.cs +++ b/Capnp.Net.Runtime/Rpc/RpcEngine.cs @@ -921,9 +921,9 @@ namespace Capnp.Rpc void ReleaseResultCaps(PendingAnswer answer) { - try + answer.Chain(async t => { - answer.Chain(async t => + try { var aorcq = await t; var results = aorcq.Answer; @@ -935,11 +935,11 @@ namespace Capnp.Rpc cap?.Release(); } } - }); - } - catch - { - } + } + catch + { + } + }); } void ProcessFinish(Finish.READER finish) From 6d711b8579f656b9245bf92be79468421543e5f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Sat, 22 Feb 2020 14:31:10 +0100 Subject: [PATCH 08/16] perf. experiment: smaller default segment size --- Capnp.Net.Runtime/SegmentAllocator.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Capnp.Net.Runtime/SegmentAllocator.cs b/Capnp.Net.Runtime/SegmentAllocator.cs index eb591f5..1af1a04 100644 --- a/Capnp.Net.Runtime/SegmentAllocator.cs +++ b/Capnp.Net.Runtime/SegmentAllocator.cs @@ -49,7 +49,7 @@ namespace Capnp /// Default size (in words) of a newly allocated segment. If a single allocation requires /// a bigger size, a bigger dedicated segment will be allocated. On the wire, segments will be truncated to their actual /// occupancies. - public SegmentAllocator(int defaultSegmentSize = 128) + public SegmentAllocator(int defaultSegmentSize = 64) { _defaultSegmentSize = defaultSegmentSize; } From 1649067ef6a29eb59db987d1c022b030c94a25b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Sat, 22 Feb 2020 16:01:04 +0100 Subject: [PATCH 09/16] full pdb, removed dep. to capnp.runtime 1.0.0 --- Benchmarking/CapnpProfile/CapnpProfile.csproj | 9 +++++++-- Benchmarking/CapnpProfile/Program.cs | 2 +- Capnp.Net.Runtime/Capnp.Net.Runtime.csproj | 5 +++++ .../CapnpC.CSharp.MsBuild.Generation.nuspec | 4 +--- 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/Benchmarking/CapnpProfile/CapnpProfile.csproj b/Benchmarking/CapnpProfile/CapnpProfile.csproj index 0edbb3f..4b7f55b 100644 --- a/Benchmarking/CapnpProfile/CapnpProfile.csproj +++ b/Benchmarking/CapnpProfile/CapnpProfile.csproj @@ -5,9 +5,14 @@ netcoreapp3.1 + + full + true + + - - + + diff --git a/Benchmarking/CapnpProfile/Program.cs b/Benchmarking/CapnpProfile/Program.cs index 98c09c4..758884f 100644 --- a/Benchmarking/CapnpProfile/Program.cs +++ b/Benchmarking/CapnpProfile/Program.cs @@ -16,7 +16,7 @@ namespace CapnpProfile using var client = new TcpRpcClient("localhost", 5002); await client.WhenConnected; using var echoer = client.GetMain(); - var payload = new byte[200000]; + var payload = new byte[20]; new Random().NextBytes(payload); while (true) diff --git a/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj b/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj index 3e741a9..548cd7b 100644 --- a/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj +++ b/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj @@ -28,6 +28,11 @@ TRACE + + full + true + + diff --git a/CapnpC.CSharp.MsBuild.Generation/CapnpC.CSharp.MsBuild.Generation.nuspec b/CapnpC.CSharp.MsBuild.Generation/CapnpC.CSharp.MsBuild.Generation.nuspec index e3ad91d..31ae064 100644 --- a/CapnpC.CSharp.MsBuild.Generation/CapnpC.CSharp.MsBuild.Generation.nuspec +++ b/CapnpC.CSharp.MsBuild.Generation/CapnpC.CSharp.MsBuild.Generation.nuspec @@ -14,9 +14,7 @@ MIT capnproto csharp msbuild Christian Köllner and contributors - - - + From 7b16dbd6e9ed753904c936e178d5d08f326c1ff6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Sat, 22 Feb 2020 21:31:59 +0100 Subject: [PATCH 10/16] buffered I/O --- Benchmarking/CapnpProfile/CapnpProfile.csproj | 3 +- Capnp.Net.Runtime/Capnp.Net.Runtime.csproj | 2 +- Capnp.Net.Runtime/FramePump.cs | 2 + Capnp.Net.Runtime/Rpc/TcpRpcClient.cs | 2 + Capnp.Net.Runtime/Rpc/TcpRpcServer.cs | 1 + .../Util/DuplexBufferedStream.cs | 83 +++++++++++++++++++ 6 files changed, 91 insertions(+), 2 deletions(-) create mode 100644 Capnp.Net.Runtime/Util/DuplexBufferedStream.cs diff --git a/Benchmarking/CapnpProfile/CapnpProfile.csproj b/Benchmarking/CapnpProfile/CapnpProfile.csproj index 4b7f55b..fb4935e 100644 --- a/Benchmarking/CapnpProfile/CapnpProfile.csproj +++ b/Benchmarking/CapnpProfile/CapnpProfile.csproj @@ -8,10 +8,11 @@ full true + x64 - + diff --git a/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj b/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj index 548cd7b..443111e 100644 --- a/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj +++ b/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj @@ -29,7 +29,7 @@ - full + portable true diff --git a/Capnp.Net.Runtime/FramePump.cs b/Capnp.Net.Runtime/FramePump.cs index 0ff6791..e2ecabc 100644 --- a/Capnp.Net.Runtime/FramePump.cs +++ b/Capnp.Net.Runtime/FramePump.cs @@ -116,6 +116,8 @@ namespace Capnp #endif _writer.Write(bytes); } + + _writer.Flush(); } } diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs b/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs index 02a975d..d6d333e 100644 --- a/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs +++ b/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs @@ -128,6 +128,8 @@ namespace Capnp.Rpc _rpcEngine = new RpcEngine(); _client = new TcpClient(); _client.ExclusiveAddressUse = false; + + InjectMidlayer(s => new Util.DuplexBufferedStream(s)); } /// diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs index 7ee616a..9ae030d 100644 --- a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs +++ b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs @@ -68,6 +68,7 @@ namespace Capnp.Rpc _server = server; Client = client; _stream = client.GetStream(); + InjectMidlayer(s => new Util.DuplexBufferedStream(s)); } public void Start() diff --git a/Capnp.Net.Runtime/Util/DuplexBufferedStream.cs b/Capnp.Net.Runtime/Util/DuplexBufferedStream.cs new file mode 100644 index 0000000..05bc69a --- /dev/null +++ b/Capnp.Net.Runtime/Util/DuplexBufferedStream.cs @@ -0,0 +1,83 @@ +using System; +using System.IO; + +namespace Capnp.Util +{ + internal class DuplexBufferedStream : Stream + { + readonly BufferedStream _readStream; + readonly BufferedStream _writeStream; + readonly object _reentrancyBlocker = new object(); + + public DuplexBufferedStream(Stream stream) + { + _readStream = new BufferedStream(stream); + _writeStream = new BufferedStream(stream); + } + + public override bool CanRead => true; + + public override bool CanSeek => false; + + public override bool CanWrite => true; + + public override long Length => 0; + + public override long Position + { + get => 0; + set => throw new NotSupportedException(); + } + + public override void Flush() + { + _writeStream.Flush(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + return _readStream.Read(buffer, offset, count); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + _writeStream.Write(buffer, offset, count); + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + lock (_reentrancyBlocker) + { + try + { + _readStream.Dispose(); + } + catch + { + } + try + { + _writeStream.Dispose(); + } + catch + { + } + } + } + + base.Dispose(disposing); + } + } +} From 49c5e80436e76f1eae834f0e9579a2655e96af3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Sat, 22 Feb 2020 22:06:23 +0100 Subject: [PATCH 11/16] avoid multimodal timing effects --- Benchmarking/Benchmark/Benchmark.csproj | 2 +- .../EchoServiceCapnp/EchoServiceCapnp.csproj | 2 +- Capnp.Net.Runtime/Util/DuplexBufferedStream.cs | 17 ++++++++++++++--- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/Benchmarking/Benchmark/Benchmark.csproj b/Benchmarking/Benchmark/Benchmark.csproj index 4e9e87a..dd945db 100644 --- a/Benchmarking/Benchmark/Benchmark.csproj +++ b/Benchmarking/Benchmark/Benchmark.csproj @@ -7,7 +7,7 @@ - + diff --git a/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj b/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj index 9f52ec0..eabdfbe 100644 --- a/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj +++ b/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj @@ -6,7 +6,7 @@ - + diff --git a/Capnp.Net.Runtime/Util/DuplexBufferedStream.cs b/Capnp.Net.Runtime/Util/DuplexBufferedStream.cs index 05bc69a..11e3f06 100644 --- a/Capnp.Net.Runtime/Util/DuplexBufferedStream.cs +++ b/Capnp.Net.Runtime/Util/DuplexBufferedStream.cs @@ -5,14 +5,22 @@ namespace Capnp.Util { internal class DuplexBufferedStream : Stream { + const int DefaultBufferSize = 4096; + readonly BufferedStream _readStream; readonly BufferedStream _writeStream; + readonly int _bufferSize; readonly object _reentrancyBlocker = new object(); - public DuplexBufferedStream(Stream stream) + public DuplexBufferedStream(Stream stream, int bufferSize) + { + _readStream = new BufferedStream(stream, bufferSize); + _writeStream = new BufferedStream(stream, bufferSize); + _bufferSize = bufferSize; + } + + public DuplexBufferedStream(Stream stream): this(stream, DefaultBufferSize) { - _readStream = new BufferedStream(stream); - _writeStream = new BufferedStream(stream); } public override bool CanRead => true; @@ -51,6 +59,9 @@ namespace Capnp.Util public override void Write(byte[] buffer, int offset, int count) { + if (buffer.Length > _bufferSize) // avoid moiré-like timing effects + _writeStream.Flush(); + _writeStream.Write(buffer, offset, count); } From 596a97a362450b0cf4b3412e4b6b4bdf645ba9c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Sat, 22 Feb 2020 23:47:56 +0100 Subject: [PATCH 12/16] configurable buffering support --- Benchmarking/Benchmark/Benchmark.csproj | 2 +- .../EchoServiceCapnp/EchoServiceCapnp.csproj | 2 +- Capnp.Net.Runtime.Tests/TcpRpcStress.cs | 9 +---- Capnp.Net.Runtime.Tests/TestBase.cs | 12 +++++-- Capnp.Net.Runtime/Rpc/IConnection.cs | 12 ++----- Capnp.Net.Runtime/Rpc/ISupportsMidlayers.cs | 21 ++++++++++++ Capnp.Net.Runtime/Rpc/MidlayerExtensions.cs | 33 +++++++++++++++++++ Capnp.Net.Runtime/Rpc/TcpRpcClient.cs | 2 -- Capnp.Net.Runtime/Rpc/TcpRpcServer.cs | 21 ++++++++++-- 9 files changed, 88 insertions(+), 26 deletions(-) create mode 100644 Capnp.Net.Runtime/Rpc/ISupportsMidlayers.cs create mode 100644 Capnp.Net.Runtime/Rpc/MidlayerExtensions.cs diff --git a/Benchmarking/Benchmark/Benchmark.csproj b/Benchmarking/Benchmark/Benchmark.csproj index dd945db..c6271ce 100644 --- a/Benchmarking/Benchmark/Benchmark.csproj +++ b/Benchmarking/Benchmark/Benchmark.csproj @@ -7,7 +7,7 @@ - + diff --git a/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj b/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj index eabdfbe..df592ec 100644 --- a/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj +++ b/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj @@ -6,7 +6,7 @@ - + diff --git a/Capnp.Net.Runtime.Tests/TcpRpcStress.cs b/Capnp.Net.Runtime.Tests/TcpRpcStress.cs index 8409ace..fd41551 100644 --- a/Capnp.Net.Runtime.Tests/TcpRpcStress.cs +++ b/Capnp.Net.Runtime.Tests/TcpRpcStress.cs @@ -98,14 +98,7 @@ namespace Capnp.Net.Runtime.Tests using (var server = new TcpRpcServer(IPAddress.Any, TcpPort)) using (var client = new TcpRpcClient()) { - server.OnConnectionChanged += (_, e) => - { - if (e.Connection.State == ConnectionState.Initializing) - { - e.Connection.InjectMidlayer(s => new ScatteringStream(s, 7)); - } - }; - + server.InjectMidlayer(s => new ScatteringStream(s, 7)); client.InjectMidlayer(s => new ScatteringStream(s, 10)); client.Connect("localhost", TcpPort); client.WhenConnected.Wait(); diff --git a/Capnp.Net.Runtime.Tests/TestBase.cs b/Capnp.Net.Runtime.Tests/TestBase.cs index f34842b..58248eb 100644 --- a/Capnp.Net.Runtime.Tests/TestBase.cs +++ b/Capnp.Net.Runtime.Tests/TestBase.cs @@ -21,7 +21,13 @@ namespace Capnp.Net.Runtime.Tests protected ILogger Logger { get; set; } - protected TcpRpcClient SetupClient() => new TcpRpcClient("localhost", TcpPort); + protected TcpRpcClient SetupClient() + { + var client = new TcpRpcClient("localhost", TcpPort); + client.AddBuffering(); + return client; + } + protected TcpRpcServer SetupServer() { int attempt = 0; @@ -30,7 +36,9 @@ namespace Capnp.Net.Runtime.Tests { try { - return new TcpRpcServer(IPAddress.Any, TcpPort); + var server = new TcpRpcServer(IPAddress.Any, TcpPort); + server.AddBuffering(); + return server; } catch (SocketException) { diff --git a/Capnp.Net.Runtime/Rpc/IConnection.cs b/Capnp.Net.Runtime/Rpc/IConnection.cs index 28311f3..8ff2026 100644 --- a/Capnp.Net.Runtime/Rpc/IConnection.cs +++ b/Capnp.Net.Runtime/Rpc/IConnection.cs @@ -4,10 +4,11 @@ using System.IO; namespace Capnp.Rpc { + /// /// Models an RPC connection. /// - public interface IConnection + public interface IConnection: ISupportsMidlayers { /// /// Returns the state of this connection. @@ -52,15 +53,6 @@ namespace Capnp.Rpc /// Connection is not in state 'Initializing' void AttachTracer(IFrameTracer tracer); - /// - /// Installs a midlayer. A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream. - /// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received - /// by the TCP connection, respectively. This mechanism may be used for integrating various (de-)compression algorithms. - /// - /// Callback for wrapping the midlayer around its underlying stream - /// is null - void InjectMidlayer(Func createFunc); - /// /// Prematurely closes this connection. Note that there is usually no need to close a connection manually. The typical use case /// of this method is to refuse an incoming connection in the TcpRpcServer.OnConnectionChanged callback. diff --git a/Capnp.Net.Runtime/Rpc/ISupportsMidlayers.cs b/Capnp.Net.Runtime/Rpc/ISupportsMidlayers.cs new file mode 100644 index 0000000..5fb9399 --- /dev/null +++ b/Capnp.Net.Runtime/Rpc/ISupportsMidlayers.cs @@ -0,0 +1,21 @@ +using System; +using System.IO; + +namespace Capnp.Rpc +{ + /// + /// Common interface for classes supporting the installation of midlayers. + /// A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream. + /// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received + /// by the TCP connection, respectively. This mechanism can be used for buffering, various (de-)compression algorithms, and more. + /// + public interface ISupportsMidlayers + { + /// + /// Installs a midlayer + /// + /// Callback for wrapping the midlayer around its underlying stream + /// is null + void InjectMidlayer(Func createFunc); + } +} \ No newline at end of file diff --git a/Capnp.Net.Runtime/Rpc/MidlayerExtensions.cs b/Capnp.Net.Runtime/Rpc/MidlayerExtensions.cs new file mode 100644 index 0000000..eb6fbae --- /dev/null +++ b/Capnp.Net.Runtime/Rpc/MidlayerExtensions.cs @@ -0,0 +1,33 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Capnp.Rpc +{ + /// + /// Provides extension methods for installing midlayers to and ./>. + /// + public static class MidlayerExtensions + { + /// + /// Enables stream buffering on the given object. Stream buffering reduces the number of I/O operations, + /// hence may cause a significant performance boost. + /// + /// or + /// Buffer size (bytes). You should choose it according to the maximum expected raw capnp frame size + public static void AddBuffering(this ISupportsMidlayers obj, int bufferSize) + { + obj.InjectMidlayer(s => new Util.DuplexBufferedStream(s, bufferSize)); + } + + /// + /// Enables stream buffering on the given object. Stream buffering reduces the number of I/O operations, + /// hence may cause a significant performance boost. Some default buffer size will be chosen. + /// + /// or + public static void AddBuffering(this ISupportsMidlayers obj) + { + obj.InjectMidlayer(s => new Util.DuplexBufferedStream(s)); + } + } +} diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs b/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs index d6d333e..02a975d 100644 --- a/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs +++ b/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs @@ -128,8 +128,6 @@ namespace Capnp.Rpc _rpcEngine = new RpcEngine(); _client = new TcpClient(); _client.ExclusiveAddressUse = false; - - InjectMidlayer(s => new Util.DuplexBufferedStream(s)); } /// diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs index 9ae030d..dd29ca2 100644 --- a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs +++ b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs @@ -32,7 +32,7 @@ namespace Capnp.Rpc /// /// Cap'n Proto RPC TCP server. /// - public class TcpRpcServer: IDisposable + public class TcpRpcServer: ISupportsMidlayers, IDisposable { ILogger Logger { get; } = Logging.CreateLogger(); @@ -68,7 +68,6 @@ namespace Capnp.Rpc _server = server; Client = client; _stream = client.GetStream(); - InjectMidlayer(s => new Util.DuplexBufferedStream(s)); } public void Start() @@ -272,6 +271,24 @@ namespace Capnp.Rpc } } + /// + /// Installs a midlayer. + /// A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream. + /// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received + /// by the TCP connection, respectively. This mechanism can be used for buffering, various (de-)compression algorithms, and more. + /// + /// + public void InjectMidlayer(Func createFunc) + { + OnConnectionChanged += (_, e) => + { + if (e.Connection.State == ConnectionState.Initializing) + { + e.Connection.InjectMidlayer(createFunc); + } + }; + } + /// /// Constructs an instance. /// From 409e517587189bbcb809e285f5de019a8dc60585 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Sun, 23 Feb 2020 14:24:17 +0100 Subject: [PATCH 13/16] TcpRpcServer,StartAccepting --- Benchmarking/Benchmark/Benchmark.csproj | 2 +- Benchmarking/Benchmark/CapnpBenchmark.cs | 5 ++ .../EchoServiceCapnp/EchoServiceCapnp.csproj | 2 +- Benchmarking/EchoServiceCapnp/Program.cs | 1 + Capnp.Net.Runtime/Rpc/TcpRpcServer.cs | 76 ++++++++++++++----- 5 files changed, 67 insertions(+), 19 deletions(-) diff --git a/Benchmarking/Benchmark/Benchmark.csproj b/Benchmarking/Benchmark/Benchmark.csproj index c6271ce..475449a 100644 --- a/Benchmarking/Benchmark/Benchmark.csproj +++ b/Benchmarking/Benchmark/Benchmark.csproj @@ -7,7 +7,7 @@ - + diff --git a/Benchmarking/Benchmark/CapnpBenchmark.cs b/Benchmarking/Benchmark/CapnpBenchmark.cs index 63f3f1d..f1e1f5c 100644 --- a/Benchmarking/Benchmark/CapnpBenchmark.cs +++ b/Benchmarking/Benchmark/CapnpBenchmark.cs @@ -13,6 +13,9 @@ namespace Benchmark [Params(20, 200, 2000, 20000, 200000, 2000000)] public int PayloadBytes; + [Params(0, 256, 1024, 4096)] + public int BufferSize; + TcpRpcClient _client; IEchoer _echoer; byte[] _payload; @@ -21,6 +24,8 @@ namespace Benchmark public void Setup() { _client = new TcpRpcClient("localhost", 5002); + if (BufferSize > 0) + _client.AddBuffering(BufferSize); _client.WhenConnected.Wait(); _echoer = _client.GetMain(); _payload = new byte[PayloadBytes]; diff --git a/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj b/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj index df592ec..0b9048a 100644 --- a/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj +++ b/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj @@ -6,7 +6,7 @@ - + diff --git a/Benchmarking/EchoServiceCapnp/Program.cs b/Benchmarking/EchoServiceCapnp/Program.cs index fb58c04..c6ca4fa 100644 --- a/Benchmarking/EchoServiceCapnp/Program.cs +++ b/Benchmarking/EchoServiceCapnp/Program.cs @@ -11,6 +11,7 @@ namespace EchoServiceCapnp { using (var server = new TcpRpcServer(IPAddress.Any, 5002)) { + server.AddBuffering(); server.Main = new CapnpEchoService(); Console.WriteLine("Press RETURN to stop listening"); Console.ReadLine(); diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs index dd29ca2..079a654 100644 --- a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs +++ b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs @@ -153,17 +153,17 @@ namespace Capnp.Rpc } readonly RpcEngine _rpcEngine; - readonly TcpListener _listener; readonly object _reentrancyBlocker = new object(); - readonly Thread _acceptorThread; readonly List _connections = new List(); + Thread? _acceptorThread; + TcpListener? _listener; /// /// Gets the number of currently active inbound TCP connections. /// public int ConnectionCount { get; private set; } - void AcceptClients() + void AcceptClients(TcpListener listener) { try { @@ -172,7 +172,7 @@ namespace Capnp.Rpc while (true) { - var client = _listener.AcceptTcpClient(); + var client = listener.AcceptTcpClient(); var connection = new Connection(this, client); lock (_reentrancyBlocker) @@ -236,7 +236,10 @@ namespace Capnp.Rpc /// public void Dispose() { - StopListening(); + if (_listener != null) + { + StopListening(); + } var connections = new List(); @@ -252,8 +255,6 @@ namespace Capnp.Rpc SafeJoin(connection.PumpRunner); } - SafeJoin(_acceptorThread); - GC.SuppressFinalize(this); } @@ -262,6 +263,9 @@ namespace Capnp.Rpc /// public void StopListening() { + if (_listener == null) + throw new InvalidOperationException("Listening was never started"); + try { _listener.Stop(); @@ -269,6 +273,12 @@ namespace Capnp.Rpc catch (SocketException) { } + finally + { + _listener = null; + SafeJoin(_acceptorThread); + _acceptorThread = null; + } } /// @@ -292,15 +302,46 @@ namespace Capnp.Rpc /// /// Constructs an instance. /// + public TcpRpcServer() + { + _rpcEngine = new RpcEngine(); + + } + + /// + /// Constructs an instance, starts listening to the specified TCP/IP endpoint and accepting clients. + /// If you intend configuring a midlayer or consuming the event, + /// you should not use this constructor, since it may lead to an early-client race condition. + /// Instead, use the parameterless constructor, configure, then call . + /// /// An System.Net.IPAddress that represents the local IP address. /// The port on which to listen for incoming connection attempts. /// is null. /// is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort. - public TcpRpcServer(IPAddress localAddr, int port) + /// The underlying detected an error condition, such as the desired endpoint is already occupied. + public TcpRpcServer(IPAddress localAddr, int port): this() { - _rpcEngine = new RpcEngine(); - _listener = new TcpListener(localAddr, port); - _listener.ExclusiveAddressUse = false; + StartAccepting(localAddr, port); + } + + /// + /// Starts listening to the specified TCP/IP endpoint and accepting clients. + /// + /// An System.Net.IPAddress that represents the local IP address. + /// The port on which to listen for incoming connection attempts. + /// is null. + /// is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort. + /// Listening activity was already started + /// The underlying detected an error condition, such as the desired endpoint is already occupied. + public void StartAccepting(IPAddress localAddr, int port) + { + if (_listener != null) + throw new InvalidOperationException("Listening activity was already started"); + + var listener = new TcpListener(localAddr, port) + { + ExclusiveAddressUse = false + }; int attempt = 0; @@ -308,32 +349,33 @@ namespace Capnp.Rpc { try { - _listener.Start(); + listener.Start(); break; } catch (SocketException socketException) { if (attempt == 5) throw; - + Logger.LogWarning($"Failed to listen on port {port}, attempt {attempt}: {socketException}"); } ++attempt; + Thread.Sleep(10); } - _acceptorThread = new Thread(AcceptClients); - + _acceptorThread = new Thread(() => AcceptClients(listener)); + _listener = listener; _acceptorThread.Start(); } /// /// Whether the thread which is responsible for acception incoming attempts is still alive. - /// The thread will die upon disposal, but also in case of a socket error condition. + /// The thread will die after calling , upon disposal, but also in case of a socket error condition. /// Errors which occur on a particular connection will just close that connection and won't interfere /// with the acceptor thread. /// - public bool IsAlive => _acceptorThread.IsAlive; + public bool IsAlive => _acceptorThread?.IsAlive ?? false; /// /// Sets the bootstrap capability. It must be an object which implements a valid capability interface From f2879bddb2c1f2d59ccc91e5e30a8d2fd7e9e492 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Sun, 23 Feb 2020 17:27:31 +0100 Subject: [PATCH 14/16] default buffer size = 1024 bytes --- Benchmarking/Benchmark/Benchmark.csproj | 2 +- Benchmarking/CapnpProfile/CapnpProfile.csproj | 2 +- Benchmarking/CapnpProfile/Program.cs | 4 +++- Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj | 2 +- Benchmarking/EchoServiceCapnp/Program.cs | 3 ++- Capnp.Net.Runtime/Util/DuplexBufferedStream.cs | 4 +++- 6 files changed, 11 insertions(+), 6 deletions(-) diff --git a/Benchmarking/Benchmark/Benchmark.csproj b/Benchmarking/Benchmark/Benchmark.csproj index 475449a..051d009 100644 --- a/Benchmarking/Benchmark/Benchmark.csproj +++ b/Benchmarking/Benchmark/Benchmark.csproj @@ -7,7 +7,7 @@ - + diff --git a/Benchmarking/CapnpProfile/CapnpProfile.csproj b/Benchmarking/CapnpProfile/CapnpProfile.csproj index fb4935e..953f368 100644 --- a/Benchmarking/CapnpProfile/CapnpProfile.csproj +++ b/Benchmarking/CapnpProfile/CapnpProfile.csproj @@ -12,7 +12,7 @@ - + diff --git a/Benchmarking/CapnpProfile/Program.cs b/Benchmarking/CapnpProfile/Program.cs index 758884f..274bb97 100644 --- a/Benchmarking/CapnpProfile/Program.cs +++ b/Benchmarking/CapnpProfile/Program.cs @@ -11,8 +11,10 @@ namespace CapnpProfile { static async Task Main(string[] args) { - using var server = new TcpRpcServer(IPAddress.Any, 5002); + using var server = new TcpRpcServer(); server.Main = new CapnpEchoService(); + server.AddBuffering(); + server.StartAccepting(IPAddress.Any, 5002); using var client = new TcpRpcClient("localhost", 5002); await client.WhenConnected; using var echoer = client.GetMain(); diff --git a/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj b/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj index 0b9048a..2ef80ba 100644 --- a/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj +++ b/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj @@ -6,7 +6,7 @@ - + diff --git a/Benchmarking/EchoServiceCapnp/Program.cs b/Benchmarking/EchoServiceCapnp/Program.cs index c6ca4fa..62c408b 100644 --- a/Benchmarking/EchoServiceCapnp/Program.cs +++ b/Benchmarking/EchoServiceCapnp/Program.cs @@ -9,10 +9,11 @@ namespace EchoServiceCapnp { static void Main(string[] args) { - using (var server = new TcpRpcServer(IPAddress.Any, 5002)) + using (var server = new TcpRpcServer()) { server.AddBuffering(); server.Main = new CapnpEchoService(); + server.StartAccepting(IPAddress.Any, 5002); Console.WriteLine("Press RETURN to stop listening"); Console.ReadLine(); } diff --git a/Capnp.Net.Runtime/Util/DuplexBufferedStream.cs b/Capnp.Net.Runtime/Util/DuplexBufferedStream.cs index 11e3f06..2bf3001 100644 --- a/Capnp.Net.Runtime/Util/DuplexBufferedStream.cs +++ b/Capnp.Net.Runtime/Util/DuplexBufferedStream.cs @@ -5,7 +5,9 @@ namespace Capnp.Util { internal class DuplexBufferedStream : Stream { - const int DefaultBufferSize = 4096; + // A buffer size of 1024 bytes seems to be a good comprise, giving good performance + // in TCP/IP-over-localhost scenarios for small to medium (200kiB) frame sizes. + const int DefaultBufferSize = 1024; readonly BufferedStream _readStream; readonly BufferedStream _writeStream; From 26d500866e437d03c732401b4dfc61eb461f8e52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Sun, 23 Feb 2020 18:24:16 +0100 Subject: [PATCH 15/16] fixed race --- Capnp.Net.Runtime.Tests/TestBase.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Capnp.Net.Runtime.Tests/TestBase.cs b/Capnp.Net.Runtime.Tests/TestBase.cs index 58248eb..3eb33fd 100644 --- a/Capnp.Net.Runtime.Tests/TestBase.cs +++ b/Capnp.Net.Runtime.Tests/TestBase.cs @@ -23,8 +23,9 @@ namespace Capnp.Net.Runtime.Tests protected TcpRpcClient SetupClient() { - var client = new TcpRpcClient("localhost", TcpPort); + var client = new TcpRpcClient(); client.AddBuffering(); + client.Connect("localhost", TcpPort); return client; } From 313e34b3c03efc25df67fbed8da8c677bcb4b165 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Sat, 7 Mar 2020 19:42:04 +0100 Subject: [PATCH 16/16] nuget.config --- Benchmarking/nuget.config | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/Benchmarking/nuget.config b/Benchmarking/nuget.config index 7fcd85c..3f0e003 100644 --- a/Benchmarking/nuget.config +++ b/Benchmarking/nuget.config @@ -1,10 +1,6 @@ - + - - - - - - - + + + \ No newline at end of file