diff --git a/Benchmarking/Benchmark/Benchmark.csproj b/Benchmarking/Benchmark/Benchmark.csproj index 805886e..051d009 100644 --- a/Benchmarking/Benchmark/Benchmark.csproj +++ b/Benchmarking/Benchmark/Benchmark.csproj @@ -7,7 +7,7 @@ - + diff --git a/Benchmarking/Benchmark/CapnpBenchmark.cs b/Benchmarking/Benchmark/CapnpBenchmark.cs index 63f3f1d..f1e1f5c 100644 --- a/Benchmarking/Benchmark/CapnpBenchmark.cs +++ b/Benchmarking/Benchmark/CapnpBenchmark.cs @@ -13,6 +13,9 @@ namespace Benchmark [Params(20, 200, 2000, 20000, 200000, 2000000)] public int PayloadBytes; + [Params(0, 256, 1024, 4096)] + public int BufferSize; + TcpRpcClient _client; IEchoer _echoer; byte[] _payload; @@ -21,6 +24,8 @@ namespace Benchmark public void Setup() { _client = new TcpRpcClient("localhost", 5002); + if (BufferSize > 0) + _client.AddBuffering(BufferSize); _client.WhenConnected.Wait(); _echoer = _client.GetMain(); _payload = new byte[PayloadBytes]; diff --git a/Benchmarking/CapnpProfile.sln b/Benchmarking/CapnpProfile.sln new file mode 100644 index 0000000..3167001 --- /dev/null +++ b/Benchmarking/CapnpProfile.sln @@ -0,0 +1,25 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 16 +VisualStudioVersion = 16.0.29728.190 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CapnpProfile", "CapnpProfile\CapnpProfile.csproj", "{D2D3AE23-C19E-47C7-B758-E2259DC01B5A}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {D2D3AE23-C19E-47C7-B758-E2259DC01B5A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D2D3AE23-C19E-47C7-B758-E2259DC01B5A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D2D3AE23-C19E-47C7-B758-E2259DC01B5A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D2D3AE23-C19E-47C7-B758-E2259DC01B5A}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {F5DDDA09-394B-4A71-B6BE-C7103BCEF3A2} + EndGlobalSection +EndGlobal diff --git a/Benchmarking/CapnpProfile/CapnpProfile.csproj b/Benchmarking/CapnpProfile/CapnpProfile.csproj new file mode 100644 index 0000000..953f368 --- /dev/null +++ b/Benchmarking/CapnpProfile/CapnpProfile.csproj @@ -0,0 +1,19 @@ + + + + Exe + netcoreapp3.1 + + + + full + true + x64 + + + + + + + + diff --git a/Benchmarking/CapnpProfile/Program.cs b/Benchmarking/CapnpProfile/Program.cs new file mode 100644 index 0000000..274bb97 --- /dev/null +++ b/Benchmarking/CapnpProfile/Program.cs @@ -0,0 +1,32 @@ +using Capnp.Rpc; +using CapnpGen; +using CapnpProfile.Services; +using System; +using System.Net; +using System.Threading.Tasks; + +namespace CapnpProfile +{ + class Program + { + static async Task Main(string[] args) + { + using var server = new TcpRpcServer(); + server.Main = new CapnpEchoService(); + server.AddBuffering(); + server.StartAccepting(IPAddress.Any, 5002); + using var client = new TcpRpcClient("localhost", 5002); + await client.WhenConnected; + using var echoer = client.GetMain(); + var payload = new byte[20]; + new Random().NextBytes(payload); + + while (true) + { + var result = await echoer.Echo(payload); + if (result.Count != payload.Length) + throw new InvalidOperationException("Echo server malfunction"); + } + } + } +} diff --git a/Benchmarking/CapnpProfile/Protos/Echo.capnp b/Benchmarking/CapnpProfile/Protos/Echo.capnp new file mode 100644 index 0000000..2af55d5 --- /dev/null +++ b/Benchmarking/CapnpProfile/Protos/Echo.capnp @@ -0,0 +1,5 @@ +@0x8c309c720de8cf7c; + +interface Echoer { + echo @0 (input : Data) -> (output : Data); +} diff --git a/Benchmarking/CapnpProfile/Services/CapnpEchoService.cs b/Benchmarking/CapnpProfile/Services/CapnpEchoService.cs new file mode 100644 index 0000000..426c75c --- /dev/null +++ b/Benchmarking/CapnpProfile/Services/CapnpEchoService.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace CapnpProfile.Services +{ + public class CapnpEchoService : CapnpGen.IEchoer + { + public void Dispose() + { + } + + public Task> Echo(IReadOnlyList input, CancellationToken cancellationToken_ = default) + { + return Task.FromResult(input); + } + } +} diff --git a/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj b/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj index f50d2fa..2ef80ba 100644 --- a/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj +++ b/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj @@ -1,4 +1,4 @@ - + Exe @@ -6,7 +6,7 @@ - + diff --git a/Benchmarking/EchoServiceCapnp/Program.cs b/Benchmarking/EchoServiceCapnp/Program.cs index fb58c04..62c408b 100644 --- a/Benchmarking/EchoServiceCapnp/Program.cs +++ b/Benchmarking/EchoServiceCapnp/Program.cs @@ -9,9 +9,11 @@ namespace EchoServiceCapnp { static void Main(string[] args) { - using (var server = new TcpRpcServer(IPAddress.Any, 5002)) + using (var server = new TcpRpcServer()) { + server.AddBuffering(); server.Main = new CapnpEchoService(); + server.StartAccepting(IPAddress.Any, 5002); Console.WriteLine("Press RETURN to stop listening"); Console.ReadLine(); } diff --git a/Benchmarking/nuget.config b/Benchmarking/nuget.config new file mode 100644 index 0000000..3f0e003 --- /dev/null +++ b/Benchmarking/nuget.config @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/Capnp.Net.Runtime.Tests/TcpRpcInterop.cs b/Capnp.Net.Runtime.Tests/TcpRpcInterop.cs index 34f01d5..6376c49 100644 --- a/Capnp.Net.Runtime.Tests/TcpRpcInterop.cs +++ b/Capnp.Net.Runtime.Tests/TcpRpcInterop.cs @@ -36,7 +36,7 @@ namespace Capnp.Net.Runtime.Tests Process _currentProcess; - void LaunchCompatTestProcess(string whichTest, Action test) + bool TryLaunchCompatTestProcess(string whichTest, Action test) { string myPath = Path.GetDirectoryName(typeof(TcpRpcInterop).Assembly.Location); string config; @@ -71,6 +71,12 @@ namespace Capnp.Net.Runtime.Tests "Problem after launching test process"); test(_currentProcess.StandardOutput); + + return true; + } + catch (AssertFailedException) + { + return false; } finally { @@ -85,6 +91,20 @@ namespace Capnp.Net.Runtime.Tests } } + void LaunchCompatTestProcess(string whichTest, Action test) + { + for (int retry = 0; retry < 5; retry++) + { + if (TryLaunchCompatTestProcess(whichTest, test)) + return; + + if (whichTest.StartsWith("server:")) + PrepareNextTest(); + } + + Assert.Fail("Problem after launching test process"); + } + void SendInput(string line) { _currentProcess.StandardInput.WriteLine(line); diff --git a/Capnp.Net.Runtime.Tests/TcpRpcStress.cs b/Capnp.Net.Runtime.Tests/TcpRpcStress.cs index 8409ace..fd41551 100644 --- a/Capnp.Net.Runtime.Tests/TcpRpcStress.cs +++ b/Capnp.Net.Runtime.Tests/TcpRpcStress.cs @@ -98,14 +98,7 @@ namespace Capnp.Net.Runtime.Tests using (var server = new TcpRpcServer(IPAddress.Any, TcpPort)) using (var client = new TcpRpcClient()) { - server.OnConnectionChanged += (_, e) => - { - if (e.Connection.State == ConnectionState.Initializing) - { - e.Connection.InjectMidlayer(s => new ScatteringStream(s, 7)); - } - }; - + server.InjectMidlayer(s => new ScatteringStream(s, 7)); client.InjectMidlayer(s => new ScatteringStream(s, 10)); client.Connect("localhost", TcpPort); client.WhenConnected.Wait(); diff --git a/Capnp.Net.Runtime.Tests/TestBase.cs b/Capnp.Net.Runtime.Tests/TestBase.cs index afdf1e0..3eb33fd 100644 --- a/Capnp.Net.Runtime.Tests/TestBase.cs +++ b/Capnp.Net.Runtime.Tests/TestBase.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; using System.Net; +using System.Net.Sockets; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -20,8 +21,39 @@ namespace Capnp.Net.Runtime.Tests protected ILogger Logger { get; set; } - protected TcpRpcClient SetupClient() => new TcpRpcClient("localhost", TcpPort); - protected TcpRpcServer SetupServer() => new TcpRpcServer(IPAddress.Any, TcpPort); + protected TcpRpcClient SetupClient() + { + var client = new TcpRpcClient(); + client.AddBuffering(); + client.Connect("localhost", TcpPort); + return client; + } + + protected TcpRpcServer SetupServer() + { + int attempt = 0; + + while (true) + { + try + { + var server = new TcpRpcServer(IPAddress.Any, TcpPort); + server.AddBuffering(); + return server; + } + catch (SocketException) + { + // If the TCP listening port is occupied by some other process, + // retry with a different one + + if (attempt == 5) + throw; + } + + IncrementTcpPort(); + ++attempt; + } + } protected (TcpRpcServer, TcpRpcClient) SetupClientServerPair() { diff --git a/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj b/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj index 3f0b362..af2c400 100644 --- a/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj +++ b/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj @@ -32,6 +32,11 @@ DebugCapabilityLifecycle + + portable + true + + diff --git a/Capnp.Net.Runtime/DeserializerState.cs b/Capnp.Net.Runtime/DeserializerState.cs index 026b29e..440df25 100644 --- a/Capnp.Net.Runtime/DeserializerState.cs +++ b/Capnp.Net.Runtime/DeserializerState.cs @@ -501,19 +501,38 @@ namespace Capnp /// state does not represent a list public ListDeserializer RequireList() { - return Kind switch + switch (Kind) { - ObjectKind.ListOfBits => new ListOfBitsDeserializer(ref this, false), - ObjectKind.ListOfBytes => new ListOfPrimitivesDeserializer(ref this, ListKind.ListOfBytes), - ObjectKind.ListOfEmpty => new ListOfEmptyDeserializer(ref this), - ObjectKind.ListOfInts => new ListOfPrimitivesDeserializer(ref this, ListKind.ListOfInts), - ObjectKind.ListOfLongs => new ListOfPrimitivesDeserializer(ref this, ListKind.ListOfLongs), - ObjectKind.ListOfPointers => new ListOfPointersDeserializer(ref this), - ObjectKind.ListOfShorts => new ListOfPrimitivesDeserializer(ref this, ListKind.ListOfShorts), - ObjectKind.ListOfStructs => new ListOfStructsDeserializer(ref this), - ObjectKind.Nil => new EmptyListDeserializer(), - _ => throw new DeserializationException("Cannot deserialize this object as list"), - }; + case ObjectKind.ListOfBits: + return new ListOfBitsDeserializer(this, false); + + case ObjectKind.ListOfBytes: + return new ListOfPrimitivesDeserializer(this, ListKind.ListOfBytes); + + case ObjectKind.ListOfEmpty: + return new ListOfEmptyDeserializer(this); + + case ObjectKind.ListOfInts: + return new ListOfPrimitivesDeserializer(this, ListKind.ListOfInts); + + case ObjectKind.ListOfLongs: + return new ListOfPrimitivesDeserializer(this, ListKind.ListOfLongs); + + case ObjectKind.ListOfPointers: + return new ListOfPointersDeserializer(this); + + case ObjectKind.ListOfShorts: + return new ListOfPrimitivesDeserializer(this, ListKind.ListOfShorts); + + case ObjectKind.ListOfStructs: + return new ListOfStructsDeserializer(this); + + case ObjectKind.Nil: + return new EmptyListDeserializer(); + + default: + throw new DeserializationException("Cannot deserialize this object as list"); + } } /// @@ -523,11 +542,14 @@ namespace Capnp /// state does not represent a list of pointers public ListOfCapsDeserializer RequireCapList() where T: class { - return Kind switch + switch (Kind) { - ObjectKind.ListOfPointers => new ListOfCapsDeserializer(ref this), - _ => throw new DeserializationException("Cannot deserialize this object as capability list"), - }; + case ObjectKind.ListOfPointers: + return new ListOfCapsDeserializer(this); + + default: + throw new DeserializationException("Cannot deserialize this object as capability list"); + } } /// diff --git a/Capnp.Net.Runtime/FramePump.cs b/Capnp.Net.Runtime/FramePump.cs index 0ff6791..e2ecabc 100644 --- a/Capnp.Net.Runtime/FramePump.cs +++ b/Capnp.Net.Runtime/FramePump.cs @@ -116,6 +116,8 @@ namespace Capnp #endif _writer.Write(bytes); } + + _writer.Flush(); } } diff --git a/Capnp.Net.Runtime/ListDeserializer.cs b/Capnp.Net.Runtime/ListDeserializer.cs index e8499a6..75a944b 100644 --- a/Capnp.Net.Runtime/ListDeserializer.cs +++ b/Capnp.Net.Runtime/ListDeserializer.cs @@ -37,7 +37,7 @@ namespace Capnp /// protected readonly DeserializerState State; - internal ListDeserializer(ref DeserializerState state) + internal ListDeserializer(in DeserializerState state) { State = state; } diff --git a/Capnp.Net.Runtime/ListOfBitsDeserializer.cs b/Capnp.Net.Runtime/ListOfBitsDeserializer.cs index 0e5ff07..226751c 100644 --- a/Capnp.Net.Runtime/ListOfBitsDeserializer.cs +++ b/Capnp.Net.Runtime/ListOfBitsDeserializer.cs @@ -11,8 +11,8 @@ namespace Capnp { readonly bool _defaultValue; - internal ListOfBitsDeserializer(ref DeserializerState context, bool defaultValue) : - base(ref context) + internal ListOfBitsDeserializer(in DeserializerState context, bool defaultValue) : + base(context) { _defaultValue = defaultValue; } diff --git a/Capnp.Net.Runtime/ListOfCapsDeserializer.cs b/Capnp.Net.Runtime/ListOfCapsDeserializer.cs index 22a9ac9..1339b2f 100644 --- a/Capnp.Net.Runtime/ListOfCapsDeserializer.cs +++ b/Capnp.Net.Runtime/ListOfCapsDeserializer.cs @@ -11,7 +11,7 @@ namespace Capnp public class ListOfCapsDeserializer : ListDeserializer, IReadOnlyList where T: class { - internal ListOfCapsDeserializer(ref DeserializerState state) : base(ref state) + internal ListOfCapsDeserializer(in DeserializerState state) : base(state) { Rpc.CapabilityReflection.ValidateCapabilityInterface(typeof(T)); } diff --git a/Capnp.Net.Runtime/ListOfEmptyDeserializer.cs b/Capnp.Net.Runtime/ListOfEmptyDeserializer.cs index 9b6ed42..b9b3d5a 100644 --- a/Capnp.Net.Runtime/ListOfEmptyDeserializer.cs +++ b/Capnp.Net.Runtime/ListOfEmptyDeserializer.cs @@ -10,8 +10,8 @@ namespace Capnp /// public class ListOfEmptyDeserializer : ListDeserializer, IReadOnlyList { - internal ListOfEmptyDeserializer(ref DeserializerState state) : - base(ref state) + internal ListOfEmptyDeserializer(in DeserializerState state) : + base(state) { } diff --git a/Capnp.Net.Runtime/ListOfPointersDeserializer.cs b/Capnp.Net.Runtime/ListOfPointersDeserializer.cs index 09aa786..1f63401 100644 --- a/Capnp.Net.Runtime/ListOfPointersDeserializer.cs +++ b/Capnp.Net.Runtime/ListOfPointersDeserializer.cs @@ -9,8 +9,8 @@ namespace Capnp /// public class ListOfPointersDeserializer: ListDeserializer, IReadOnlyList { - internal ListOfPointersDeserializer(ref DeserializerState state) : - base(ref state) + internal ListOfPointersDeserializer(in DeserializerState state) : + base(state) { } diff --git a/Capnp.Net.Runtime/ListOfPrimitivesDeserializer.cs b/Capnp.Net.Runtime/ListOfPrimitivesDeserializer.cs index aa47218..072d371 100644 --- a/Capnp.Net.Runtime/ListOfPrimitivesDeserializer.cs +++ b/Capnp.Net.Runtime/ListOfPrimitivesDeserializer.cs @@ -7,11 +7,11 @@ using System.Text; namespace Capnp { /// - /// ListDeserializer specialization for List(Int*), List(UInt*), List(Float*), and List(Enum). + /// ListDeserializer specialization for unmanaged primitive types (including enum). /// /// List element type public class ListOfPrimitivesDeserializer: ListDeserializer, IReadOnlyList - where T: struct + where T: unmanaged { class ListOfULongAsStructView : IReadOnlyList { @@ -73,8 +73,8 @@ namespace Capnp readonly ListKind _kind; - internal ListOfPrimitivesDeserializer(ref DeserializerState state, ListKind kind) : - base(ref state) + internal ListOfPrimitivesDeserializer(in DeserializerState state, ListKind kind) : + base(state) { _kind = kind; } @@ -84,7 +84,10 @@ namespace Capnp /// public override ListKind Kind => _kind; - ReadOnlySpan Data => MemoryMarshal.Cast(State.CurrentSegment.Slice(State.Offset)).Slice(0, Count); + /// + /// Retrieves the underlying memory span of this object + /// + public ReadOnlySpan Span => MemoryMarshal.Cast(State.CurrentSegment.Slice(State.Offset)).Slice(0, Count); /// /// Returns the element at given index. @@ -92,15 +95,14 @@ namespace Capnp /// Element index /// Element value /// is out of range. - public T this[int index] => Data[index]; + public T this[int index] => Span[index]; - ListOfPrimitivesDeserializer PrimitiveCast() where U: struct + ListOfPrimitivesDeserializer PrimitiveCast() where U: unmanaged { if (Marshal.SizeOf() != Marshal.SizeOf()) throw new NotSupportedException("Source and target types have different sizes, cannot cast"); - var stateCopy = State; - return new ListOfPrimitivesDeserializer(ref stateCopy, Kind); + return new ListOfPrimitivesDeserializer(State, Kind); } /// @@ -203,7 +205,7 @@ namespace Capnp /// Element size is different from 1 byte. public override string CastText() { - var utf8Bytes = PrimitiveCast().Data; + var utf8Bytes = PrimitiveCast().Span; if (utf8Bytes.Length == 0) return string.Empty; var utf8GytesNoZterm = utf8Bytes.Slice(0, utf8Bytes.Length - 1); return Encoding.UTF8.GetString(utf8GytesNoZterm.ToArray()); diff --git a/Capnp.Net.Runtime/ListOfPrimitivesSerializer.cs b/Capnp.Net.Runtime/ListOfPrimitivesSerializer.cs index 3ff7219..cd86024 100644 --- a/Capnp.Net.Runtime/ListOfPrimitivesSerializer.cs +++ b/Capnp.Net.Runtime/ListOfPrimitivesSerializer.cs @@ -6,13 +6,13 @@ using System.Runtime.InteropServices; namespace Capnp { /// - /// SerializerState specialization for List(Int*), List(UInt*), List(Float*), and List(Enum). + /// SerializerState specialization for unmanaged primitive types (including enum). /// /// List element type, must be primitive. Static constructor will throw if the type does not work. public class ListOfPrimitivesSerializer : SerializerState, IReadOnlyList - where T : struct + where T : unmanaged { static readonly int ElementSize; @@ -88,17 +88,32 @@ namespace Capnp } Init(items.Count); - - for (int i = 0; i < items.Count; i++) - { - this[i] = items[i]; - } - } - IEnumerable Enumerate() - { - for (int i = 0; i < Data.Length; i++) - yield return Data[i]; + switch (items) + { + case T[] array: + array.CopyTo(Span); + break; + + case ArraySegment segment: + segment.AsSpan().CopyTo(Span); + break; + + case ListOfPrimitivesDeserializer deser: + deser.Span.CopyTo(Span); + break; + + case ListOfPrimitivesSerializer ser: + ser.Span.CopyTo(Span); + break; + + default: + for (int i = 0; i < items.Count; i++) + { + this[i] = items[i]; + } + break; + } } /// @@ -107,6 +122,6 @@ namespace Capnp /// public IEnumerator GetEnumerator() => Enumerate().GetEnumerator(); - IEnumerator IEnumerable.GetEnumerator() => Data.ToArray().GetEnumerator(); + IEnumerator IEnumerable.GetEnumerator() => Span.ToArray().GetEnumerator(); } } \ No newline at end of file diff --git a/Capnp.Net.Runtime/ListOfStructsDeserializer.cs b/Capnp.Net.Runtime/ListOfStructsDeserializer.cs index 57794e3..03e6c91 100644 --- a/Capnp.Net.Runtime/ListOfStructsDeserializer.cs +++ b/Capnp.Net.Runtime/ListOfStructsDeserializer.cs @@ -9,8 +9,8 @@ namespace Capnp /// public class ListOfStructsDeserializer: ListDeserializer, IReadOnlyList { - internal ListOfStructsDeserializer(ref DeserializerState context): - base(ref context) + internal ListOfStructsDeserializer(in DeserializerState context): + base(context) { } diff --git a/Capnp.Net.Runtime/Rpc/IConnection.cs b/Capnp.Net.Runtime/Rpc/IConnection.cs index 28311f3..8ff2026 100644 --- a/Capnp.Net.Runtime/Rpc/IConnection.cs +++ b/Capnp.Net.Runtime/Rpc/IConnection.cs @@ -4,10 +4,11 @@ using System.IO; namespace Capnp.Rpc { + /// /// Models an RPC connection. /// - public interface IConnection + public interface IConnection: ISupportsMidlayers { /// /// Returns the state of this connection. @@ -52,15 +53,6 @@ namespace Capnp.Rpc /// Connection is not in state 'Initializing' void AttachTracer(IFrameTracer tracer); - /// - /// Installs a midlayer. A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream. - /// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received - /// by the TCP connection, respectively. This mechanism may be used for integrating various (de-)compression algorithms. - /// - /// Callback for wrapping the midlayer around its underlying stream - /// is null - void InjectMidlayer(Func createFunc); - /// /// Prematurely closes this connection. Note that there is usually no need to close a connection manually. The typical use case /// of this method is to refuse an incoming connection in the TcpRpcServer.OnConnectionChanged callback. diff --git a/Capnp.Net.Runtime/Rpc/ISupportsMidlayers.cs b/Capnp.Net.Runtime/Rpc/ISupportsMidlayers.cs new file mode 100644 index 0000000..5fb9399 --- /dev/null +++ b/Capnp.Net.Runtime/Rpc/ISupportsMidlayers.cs @@ -0,0 +1,21 @@ +using System; +using System.IO; + +namespace Capnp.Rpc +{ + /// + /// Common interface for classes supporting the installation of midlayers. + /// A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream. + /// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received + /// by the TCP connection, respectively. This mechanism can be used for buffering, various (de-)compression algorithms, and more. + /// + public interface ISupportsMidlayers + { + /// + /// Installs a midlayer + /// + /// Callback for wrapping the midlayer around its underlying stream + /// is null + void InjectMidlayer(Func createFunc); + } +} \ No newline at end of file diff --git a/Capnp.Net.Runtime/Rpc/MidlayerExtensions.cs b/Capnp.Net.Runtime/Rpc/MidlayerExtensions.cs new file mode 100644 index 0000000..eb6fbae --- /dev/null +++ b/Capnp.Net.Runtime/Rpc/MidlayerExtensions.cs @@ -0,0 +1,33 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Capnp.Rpc +{ + /// + /// Provides extension methods for installing midlayers to and ./>. + /// + public static class MidlayerExtensions + { + /// + /// Enables stream buffering on the given object. Stream buffering reduces the number of I/O operations, + /// hence may cause a significant performance boost. + /// + /// or + /// Buffer size (bytes). You should choose it according to the maximum expected raw capnp frame size + public static void AddBuffering(this ISupportsMidlayers obj, int bufferSize) + { + obj.InjectMidlayer(s => new Util.DuplexBufferedStream(s, bufferSize)); + } + + /// + /// Enables stream buffering on the given object. Stream buffering reduces the number of I/O operations, + /// hence may cause a significant performance boost. Some default buffer size will be chosen. + /// + /// or + public static void AddBuffering(this ISupportsMidlayers obj) + { + obj.InjectMidlayer(s => new Util.DuplexBufferedStream(s)); + } + } +} diff --git a/Capnp.Net.Runtime/Rpc/PendingAnswer.cs b/Capnp.Net.Runtime/Rpc/PendingAnswer.cs index 6660725..5959b7e 100644 --- a/Capnp.Net.Runtime/Rpc/PendingAnswer.cs +++ b/Capnp.Net.Runtime/Rpc/PendingAnswer.cs @@ -6,98 +6,41 @@ namespace Capnp.Rpc { class PendingAnswer: IDisposable { - readonly object _reentrancyBlocker = new object(); readonly CancellationTokenSource? _cts; - readonly TaskCompletionSource _whenCanceled; - Task _callTask; - Task? _initialTask; - Task? _chainedTask; - bool _disposed; + readonly TaskCompletionSource _cancelCompleter; + readonly Task _answerTask; public PendingAnswer(Task callTask, CancellationTokenSource? cts) { + async Task CancelableAwaitWhenReady() + { + return await await Task.WhenAny(callTask, _cancelCompleter.Task); + } + + if (callTask == null) + throw new ArgumentNullException(nameof(callTask)); + _cts = cts; - _callTask = callTask ?? throw new ArgumentNullException(nameof(callTask)); - _whenCanceled = new TaskCompletionSource(); + _cancelCompleter = new TaskCompletionSource(); + _answerTask = CancelableAwaitWhenReady(); } + public CancellationToken CancellationToken => _cts?.Token ?? CancellationToken.None; + public void Cancel() { _cts?.Cancel(); - _whenCanceled.SetResult(0); + _cancelCompleter.SetCanceled(); } - async Task InitialAwaitWhenReady() + public void Chain(Action> func) { - var which = await Task.WhenAny(_callTask, _whenCanceled.Task); - - if (which != _callTask) - { - throw new TaskCanceledException(); - } + func(_answerTask); } - async Task AwaitChainedTask(Task chainedTask, Func, Task> func) + public void Chain(PromisedAnswer.READER rd, Action> func) { - try - { - await chainedTask; - } - catch (System.Exception exception) - { - await func(Task.FromException(exception)); - throw; - } - - await func(_callTask); - } - - static async Task AwaitSeq(Task task1, Task task2) - { - await task1; - await task2; - } - - public void Chain(bool strictSync, Func, Task> func) - { - - lock (_reentrancyBlocker) - { - if (_disposed) - { - throw new ObjectDisposedException(nameof(PendingAnswer)); - } - - if (_initialTask == null) - { - _initialTask = InitialAwaitWhenReady(); - } - - Task followUpTask; - - if (strictSync) - { - followUpTask = AwaitChainedTask(_chainedTask ?? _initialTask, func); - } - else - { - followUpTask = AwaitChainedTask(_initialTask, func); - } - - if (_chainedTask != null) - { - _chainedTask = AwaitSeq(_chainedTask, followUpTask); - } - else - { - _chainedTask = followUpTask; - } - } - } - - public void Chain(bool strictSync, PromisedAnswer.READER rd, Func, Task> func) - { - Chain(strictSync, async t => + Chain(t => { async Task EvaluateProxy() { @@ -158,43 +101,13 @@ namespace Capnp.Rpc } } - await func(EvaluateProxy()); + func(EvaluateProxy()); }); } - public CancellationToken CancellationToken => _cts?.Token ?? CancellationToken.None; - - public async void Dispose() + public void Dispose() { - if (_cts != null) - { - Task? chainedTask; - - lock (_reentrancyBlocker) - { - if (_disposed) - { - return; - } - chainedTask = _chainedTask; - _disposed = true; - } - - if (chainedTask != null) - { - try - { - await chainedTask; - } - catch - { - } - finally - { - _cts.Dispose(); - } - } - } + _cts?.Dispose(); } } } \ No newline at end of file diff --git a/Capnp.Net.Runtime/Rpc/RpcEngine.cs b/Capnp.Net.Runtime/Rpc/RpcEngine.cs index a11677d..0e66b88 100644 --- a/Capnp.Net.Runtime/Rpc/RpcEngine.cs +++ b/Capnp.Net.Runtime/Rpc/RpcEngine.cs @@ -411,7 +411,7 @@ namespace Capnp.Rpc switch (req.SendResultsTo.which) { case Call.sendResultsTo.WHICH.Caller: - pendingAnswer.Chain(false, async t => + pendingAnswer.Chain(async t => { try { @@ -479,7 +479,7 @@ namespace Capnp.Rpc break; case Call.sendResultsTo.WHICH.Yourself: - pendingAnswer.Chain(false, async t => + pendingAnswer.Chain(async t => { try { @@ -587,7 +587,6 @@ namespace Capnp.Rpc if (exists) { previousAnswer!.Chain( - false, req.Target.PromisedAnswer, async t => { @@ -690,7 +689,7 @@ namespace Capnp.Rpc if (exists) { - pendingAnswer!.Chain(false, async t => + pendingAnswer!.Chain(async t => { try { @@ -820,7 +819,7 @@ namespace Capnp.Rpc if (_answerTable.TryGetValue(promisedAnswer.QuestionId, out var previousAnswer)) { - previousAnswer.Chain(true, + previousAnswer.Chain( disembargo.Target.PromisedAnswer, async t => { @@ -929,9 +928,9 @@ namespace Capnp.Rpc void ReleaseResultCaps(PendingAnswer answer) { - try + answer.Chain(async t => { - answer.Chain(false, async t => + try { var aorcq = await t; var results = aorcq.Answer; @@ -943,11 +942,11 @@ namespace Capnp.Rpc cap?.Release(); } } - }); - } - catch - { - } + } + catch + { + } + }); } void ProcessFinish(Finish.READER finish) @@ -1258,7 +1257,7 @@ namespace Capnp.Rpc { var tcs = new TaskCompletionSource(); - pendingAnswer.Chain(false, + pendingAnswer.Chain( capDesc.ReceiverAnswer, async t => { diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs index 75ad222..2012f74 100644 --- a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs +++ b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs @@ -32,7 +32,7 @@ namespace Capnp.Rpc /// /// Cap'n Proto RPC TCP server. /// - public class TcpRpcServer: IDisposable + public class TcpRpcServer: ISupportsMidlayers, IDisposable { ILogger Logger { get; } = Logging.CreateLogger(); @@ -161,17 +161,17 @@ namespace Capnp.Rpc } readonly RpcEngine _rpcEngine; - readonly TcpListener _listener; readonly object _reentrancyBlocker = new object(); - readonly Thread _acceptorThread; readonly List _connections = new List(); + Thread? _acceptorThread; + TcpListener? _listener; /// /// Gets the number of currently active inbound TCP connections. /// public int ConnectionCount { get; private set; } - void AcceptClients() + void AcceptClients(TcpListener listener) { try { @@ -180,7 +180,7 @@ namespace Capnp.Rpc while (true) { - var client = _listener.AcceptTcpClient(); + var client = listener.AcceptTcpClient(); var connection = new Connection(this, client); lock (_reentrancyBlocker) @@ -244,7 +244,10 @@ namespace Capnp.Rpc /// public void Dispose() { - StopListening(); + if (_listener != null) + { + StopListening(); + } var connections = new List(); @@ -260,8 +263,6 @@ namespace Capnp.Rpc SafeJoin(connection.PumpRunner); } - SafeJoin(_acceptorThread); - GC.SuppressFinalize(this); } @@ -270,6 +271,9 @@ namespace Capnp.Rpc /// public void StopListening() { + if (_listener == null) + throw new InvalidOperationException("Listening was never started"); + try { _listener.Stop(); @@ -277,47 +281,109 @@ namespace Capnp.Rpc catch (SocketException) { } + finally + { + _listener = null; + SafeJoin(_acceptorThread); + _acceptorThread = null; + } + } + + /// + /// Installs a midlayer. + /// A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream. + /// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received + /// by the TCP connection, respectively. This mechanism can be used for buffering, various (de-)compression algorithms, and more. + /// + /// + public void InjectMidlayer(Func createFunc) + { + OnConnectionChanged += (_, e) => + { + if (e.Connection.State == ConnectionState.Initializing) + { + e.Connection.InjectMidlayer(createFunc); + } + }; } /// /// Constructs an instance. /// + public TcpRpcServer() + { + _rpcEngine = new RpcEngine(); + + } + + /// + /// Constructs an instance, starts listening to the specified TCP/IP endpoint and accepting clients. + /// If you intend configuring a midlayer or consuming the event, + /// you should not use this constructor, since it may lead to an early-client race condition. + /// Instead, use the parameterless constructor, configure, then call . + /// /// An System.Net.IPAddress that represents the local IP address. /// The port on which to listen for incoming connection attempts. /// is null. /// is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort. - public TcpRpcServer(IPAddress localAddr, int port) + /// The underlying detected an error condition, such as the desired endpoint is already occupied. + public TcpRpcServer(IPAddress localAddr, int port): this() { - _rpcEngine = new RpcEngine(); - _listener = new TcpListener(localAddr, port); - _listener.ExclusiveAddressUse = false; + StartAccepting(localAddr, port); + } - for (int retry = 0; retry < 5; retry++) + /// + /// Starts listening to the specified TCP/IP endpoint and accepting clients. + /// + /// An System.Net.IPAddress that represents the local IP address. + /// The port on which to listen for incoming connection attempts. + /// is null. + /// is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort. + /// Listening activity was already started + /// The underlying detected an error condition, such as the desired endpoint is already occupied. + public void StartAccepting(IPAddress localAddr, int port) + { + if (_listener != null) + throw new InvalidOperationException("Listening activity was already started"); + + var listener = new TcpListener(localAddr, port) + { + ExclusiveAddressUse = false + }; + + int attempt = 0; + + while (true) { try { - _listener.Start(); + listener.Start(); break; } catch (SocketException socketException) { - Logger.LogWarning($"Failed to listen on port {port}, attempt {retry}: {socketException}"); - Thread.Sleep(10); + if (attempt == 5) + throw; + + Logger.LogWarning($"Failed to listen on port {port}, attempt {attempt}: {socketException}"); } + + ++attempt; + Thread.Sleep(10); } - _acceptorThread = new Thread(AcceptClients); - + _acceptorThread = new Thread(() => AcceptClients(listener)); + _listener = listener; _acceptorThread.Start(); } /// /// Whether the thread which is responsible for acception incoming attempts is still alive. - /// The thread will die upon disposal, but also in case of a socket error condition. + /// The thread will die after calling , upon disposal, but also in case of a socket error condition. /// Errors which occur on a particular connection will just close that connection and won't interfere /// with the acceptor thread. /// - public bool IsAlive => _acceptorThread.IsAlive; + public bool IsAlive => _acceptorThread?.IsAlive ?? false; /// /// Sets the bootstrap capability. It must be an object which implements a valid capability interface diff --git a/Capnp.Net.Runtime/SegmentAllocator.cs b/Capnp.Net.Runtime/SegmentAllocator.cs index eb591f5..1af1a04 100644 --- a/Capnp.Net.Runtime/SegmentAllocator.cs +++ b/Capnp.Net.Runtime/SegmentAllocator.cs @@ -49,7 +49,7 @@ namespace Capnp /// Default size (in words) of a newly allocated segment. If a single allocation requires /// a bigger size, a bigger dedicated segment will be allocated. On the wire, segments will be truncated to their actual /// occupancies. - public SegmentAllocator(int defaultSegmentSize = 128) + public SegmentAllocator(int defaultSegmentSize = 64) { _defaultSegmentSize = defaultSegmentSize; } diff --git a/Capnp.Net.Runtime/Util/DuplexBufferedStream.cs b/Capnp.Net.Runtime/Util/DuplexBufferedStream.cs new file mode 100644 index 0000000..2bf3001 --- /dev/null +++ b/Capnp.Net.Runtime/Util/DuplexBufferedStream.cs @@ -0,0 +1,96 @@ +using System; +using System.IO; + +namespace Capnp.Util +{ + internal class DuplexBufferedStream : Stream + { + // A buffer size of 1024 bytes seems to be a good comprise, giving good performance + // in TCP/IP-over-localhost scenarios for small to medium (200kiB) frame sizes. + const int DefaultBufferSize = 1024; + + readonly BufferedStream _readStream; + readonly BufferedStream _writeStream; + readonly int _bufferSize; + readonly object _reentrancyBlocker = new object(); + + public DuplexBufferedStream(Stream stream, int bufferSize) + { + _readStream = new BufferedStream(stream, bufferSize); + _writeStream = new BufferedStream(stream, bufferSize); + _bufferSize = bufferSize; + } + + public DuplexBufferedStream(Stream stream): this(stream, DefaultBufferSize) + { + } + + public override bool CanRead => true; + + public override bool CanSeek => false; + + public override bool CanWrite => true; + + public override long Length => 0; + + public override long Position + { + get => 0; + set => throw new NotSupportedException(); + } + + public override void Flush() + { + _writeStream.Flush(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + return _readStream.Read(buffer, offset, count); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + if (buffer.Length > _bufferSize) // avoid moiré-like timing effects + _writeStream.Flush(); + + _writeStream.Write(buffer, offset, count); + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + lock (_reentrancyBlocker) + { + try + { + _readStream.Dispose(); + } + catch + { + } + try + { + _writeStream.Dispose(); + } + catch + { + } + } + } + + base.Dispose(disposing); + } + } +}