From a2fc5c01f7f192e17e37e27a4d3e19bf8c6d9b5e Mon Sep 17 00:00:00 2001 From: c80k Date: Mon, 30 Dec 2019 19:44:07 +0100 Subject: [PATCH 1/4] Update README.md --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index c9ecef5..0abcdcc 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,9 @@ choco install capnpc-csharp-win-x86 Both versions will also download and install the [Cap'n Proto tool set Chocolatey package](https://www.chocolatey.org/packages/capnproto). Note that the author does not maintain this package and has no influence on its contents. +*Announcement: There is currently an experimental branch for packaging the code generator back end as .NET Core CLI tool. If this approach turns out to be viable, it will probably be superior to the Chocolatey deployment. In that case, the CLI tool capnpc-csharp will be deployed on NuGet.org and the former Chocolatey package will be deprecated. capnpc-csharp-win-x86 will probably be kept (and also maintained).* + + ### Code generator back end: Other OSes Currently, you are on yourself. Compile the `capnpc-csharp` VS project and install the resulting .NET application manually on your system. This should not be that complicated, see also the [Wiki](https://github.com/c80k/capnproto-dotnetcore/wiki). It would be great to support other package managers, especially [APT](https://wiki.debian.org/Apt). Consider contributing? Author would be happy! From 4bcc97a69fc67ddbbca3ae8c5b6f3c3d7ecd46c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Fri, 31 Jan 2020 21:40:25 +0100 Subject: [PATCH 2/4] Fix + test case for issue #37 --- .../Capnp.Net.Runtime.Tests.Core21.csproj | 1 + Capnp.Net.Runtime.Tests/ScatteringStream.cs | 58 +++++++++++++++++++ Capnp.Net.Runtime.Tests/TcpRpcStress.cs | 35 +++++++++++ Capnp.Net.Runtime/Framing.cs | 21 ++++++- Capnp.Net.Runtime/Rpc/IConnection.cs | 3 + Capnp.Net.Runtime/Rpc/TcpRpcClient.cs | 18 +++++- Capnp.Net.Runtime/Rpc/TcpRpcServer.cs | 34 +++++++---- 7 files changed, 155 insertions(+), 15 deletions(-) create mode 100644 Capnp.Net.Runtime.Tests/ScatteringStream.cs diff --git a/Capnp.Net.Runtime.Tests.Core21/Capnp.Net.Runtime.Tests.Core21.csproj b/Capnp.Net.Runtime.Tests.Core21/Capnp.Net.Runtime.Tests.Core21.csproj index 1541a85..30d40e2 100644 --- a/Capnp.Net.Runtime.Tests.Core21/Capnp.Net.Runtime.Tests.Core21.csproj +++ b/Capnp.Net.Runtime.Tests.Core21/Capnp.Net.Runtime.Tests.Core21.csproj @@ -22,6 +22,7 @@ + diff --git a/Capnp.Net.Runtime.Tests/ScatteringStream.cs b/Capnp.Net.Runtime.Tests/ScatteringStream.cs new file mode 100644 index 0000000..e6b44df --- /dev/null +++ b/Capnp.Net.Runtime.Tests/ScatteringStream.cs @@ -0,0 +1,58 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Capnp.Net.Runtime.Tests +{ + /// + /// Imitates the behavior of a TCP connection by real hardware, which splits data transfer into multiple packets. + /// + class ScatteringStream : Stream + { + readonly Stream _baseStream; + readonly int _mtu; + + public ScatteringStream(Stream baseStream, int mtu) + { + _baseStream = baseStream; + _mtu = mtu; + } + + public override bool CanRead => _baseStream.CanRead; + + public override bool CanSeek => false; + + public override bool CanWrite => _baseStream.CanWrite; + + public override long Length => _baseStream.Length; + + public override long Position + { + get => _baseStream.Position; + set => throw new NotImplementedException(); + } + + public override void Flush() => _baseStream.Flush(); + + public override int Read(byte[] buffer, int offset, int count) => _baseStream.Read(buffer, offset, count); + + public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException(); + + public override void SetLength(long value) => throw new NotImplementedException(); + + public override void Write(byte[] buffer, int offset, int count) + { + while (count > 0) + { + int amount = Math.Min(count, _mtu); + _baseStream.Write(buffer, offset, amount); + _baseStream.Flush(); + offset += amount; + count -= amount; + } + } + } +} diff --git a/Capnp.Net.Runtime.Tests/TcpRpcStress.cs b/Capnp.Net.Runtime.Tests/TcpRpcStress.cs index d45c45c..6ca2c14 100644 --- a/Capnp.Net.Runtime.Tests/TcpRpcStress.cs +++ b/Capnp.Net.Runtime.Tests/TcpRpcStress.cs @@ -89,5 +89,40 @@ namespace Capnp.Net.Runtime.Tests var t = new TcpRpc(); Repeat(100, t.PipelineAfterReturn); } + + [TestMethod] + public void ScatteredTransfer() + { + + using (var server = SetupServer()) + using (var client = new TcpRpcClient()) + { + client.InjectMidlayer(s => new ScatteringStream(s, 10)); + client.Connect("localhost", TcpPort); + client.WhenConnected.Wait(); + + var counters = new Counters(); + server.Main = new TestInterfaceImpl(counters); + using (var main = client.GetMain()) + { + for (int i = 0; i < 100; i++) + { + var request1 = main.Foo(123, true, default); + var request3 = Assert.ThrowsExceptionAsync(() => main.Bar(default)); + var s = new TestAllTypes(); + Common.InitTestMessage(s); + var request2 = main.Baz(s, default); + + Assert.IsTrue(request1.Wait(MediumNonDbgTimeout)); + Assert.IsTrue(request2.Wait(MediumNonDbgTimeout)); + Assert.IsTrue(request3.Wait(MediumNonDbgTimeout)); + + Assert.AreEqual("foo", request1.Result); + Assert.AreEqual(2, counters.CallCount); + counters.CallCount = 0; + } + } + } + } } } diff --git a/Capnp.Net.Runtime/Framing.cs b/Capnp.Net.Runtime/Framing.cs index e9707ce..bba2875 100644 --- a/Capnp.Net.Runtime/Framing.cs +++ b/Capnp.Net.Runtime/Framing.cs @@ -79,7 +79,10 @@ namespace Capnp return new WireFrame(buffers); } - + + static InvalidDataException StreamClosed() + => new InvalidDataException("Prematurely reached end of stream. Expected more bytes according to framing header."); + static void FillBuffersFromFrames(Memory[] buffers, uint segmentCount, BinaryReader reader) { for (uint i = 0; i < segmentCount; i++) @@ -90,7 +93,12 @@ namespace Capnp if (tmpBuffer.Length != buffer.Length) { - throw new InvalidDataException("Expected more bytes according to framing header"); + // Note w.r.t. issue #37: If there are temporarily less bytes available, + // this will NOT cause ReadBytes to return a shorter buffer. + // Only if the end of the stream is reached will we enter this branch. And this will be an error condition, + // since it would mean that the connection was closed in the middle of a frame transfer. + + throw StreamClosed(); } // Fastest way to do this without /unsafe @@ -101,7 +109,14 @@ namespace Capnp } #else var buffer = MemoryMarshal.Cast(buffers[i].Span); - reader.Read(buffer); + + while (buffer.Length > 0) + { + int obtained = reader.Read(buffer); + if (obtained == 0) + throw StreamClosed(); + buffer = buffer.Slice(obtained); + } #endif } } diff --git a/Capnp.Net.Runtime/Rpc/IConnection.cs b/Capnp.Net.Runtime/Rpc/IConnection.cs index 582b17e..3a15f6f 100644 --- a/Capnp.Net.Runtime/Rpc/IConnection.cs +++ b/Capnp.Net.Runtime/Rpc/IConnection.cs @@ -1,5 +1,6 @@ using Capnp.FrameTracing; using System; +using System.IO; namespace Capnp.Rpc { @@ -51,6 +52,8 @@ namespace Capnp.Rpc /// Connection is not in state 'Initializing' void AttachTracer(IFrameTracer tracer); + void InjectMidlayer(Func createFunc); + /// /// Prematurely closes this connection. Note that there is usually no need to close a connection manually. The typical use case /// of this method is to refuse an incoming connection in the TcpRpcServer.OnConnectionChanged callback. diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs b/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs index 0f3e1a6..293f9a5 100644 --- a/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs +++ b/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs @@ -3,6 +3,7 @@ using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.Diagnostics; +using System.IO; using System.Net; using System.Net.Sockets; using System.Runtime.CompilerServices; @@ -44,6 +45,7 @@ namespace Capnp.Rpc readonly RpcEngine _rpcEngine; readonly TcpClient _client; + Func _createLayers = _ => _; RpcEngine.RpcEndpoint _inboundEndpoint; OutboundTcpEndpoint _outboundEndpoint; FramePump _pump; @@ -82,7 +84,9 @@ namespace Capnp.Rpc await ConnectAsync(host, port); State = ConnectionState.Active; - _pump = new FramePump(_client.GetStream()); + + var stream = _createLayers(_client.GetStream()); + _pump = new FramePump(stream); _attachTracerAction?.Invoke(); _outboundEndpoint = new OutboundTcpEndpoint(this, _pump); _inboundEndpoint = _rpcEngine.AddEndpoint(_outboundEndpoint); @@ -218,6 +222,18 @@ namespace Capnp.Rpc }; } + public void InjectMidlayer(Func createFunc) + { + if (createFunc == null) + throw new ArgumentNullException(nameof(createFunc)); + + if (State != ConnectionState.Initializing) + throw new InvalidOperationException("Connection is not in state 'Initializing'"); + + var last = _createLayers; + _createLayers = _ => createFunc(last(_)); + } + /// /// Prematurely closes this connection. Note that there is usually no need to close a connection manually. /// diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs index 6ac5cb1..f3c0553 100644 --- a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs +++ b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs @@ -2,6 +2,7 @@ using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; +using System.IO; using System.Net; using System.Net.Sockets; using System.Threading; @@ -60,24 +61,29 @@ namespace Capnp.Rpc class Connection: IConnection { readonly TcpRpcServer _server; + Stream _stream; - public Connection(TcpRpcServer server, TcpClient client, FramePump pump, OutboundTcpEndpoint outboundEp, RpcEngine.RpcEndpoint inboundEp) + public Connection(TcpRpcServer server, TcpClient client) { _server = server; Client = client; - Pump = pump; - OutboundEp = outboundEp; - InboundEp = inboundEp; + _stream = client.GetStream(); } public void Start() { + Pump = new FramePump(_stream); + OutboundEp = new OutboundTcpEndpoint(_server, Pump); + InboundEp = _server._rpcEngine.AddEndpoint(OutboundEp); + Pump.FrameReceived += InboundEp.Forward; + + State = ConnectionState.Active; + PumpRunner = new Thread(o => { try { Thread.CurrentThread.Name = $"TCP RPC Server Thread {Thread.CurrentThread.ManagedThreadId}"; - State = ConnectionState.Active; Pump.Run(); } @@ -122,6 +128,17 @@ namespace Capnp.Rpc Pump.AttachTracer(tracer); } + public void InjectMidlayer(Func createFunc) + { + if (createFunc == null) + throw new ArgumentNullException(nameof(createFunc)); + + if (State != ConnectionState.Initializing) + throw new InvalidOperationException("Connection is not in state 'Initializing'"); + + _stream = createFunc(_stream); + } + public void Close() { Client.Dispose(); @@ -149,12 +166,7 @@ namespace Capnp.Rpc while (true) { var client = _listener.AcceptTcpClient(); - var pump = new FramePump(client.GetStream()); - var outboundEndpoint = new OutboundTcpEndpoint(this, pump); - var inboundEndpoint = _rpcEngine.AddEndpoint(outboundEndpoint); - pump.FrameReceived += inboundEndpoint.Forward; - - var connection = new Connection(this, client, pump, outboundEndpoint, inboundEndpoint); + var connection = new Connection(this, client); lock (_reentrancyBlocker) { From 72308d38d5d4911be062d327aa966b1b4fd8d921 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Fri, 31 Jan 2020 22:13:41 +0100 Subject: [PATCH 3/4] rollback version to 1.2 (critical fix #37) --- Capnp.Net.Runtime/Capnp.Net.Runtime.csproj | 2 +- .../CapnpC.CSharp.MsBuild.Generation.csproj | 6 +++--- MsBuildGenerationTest/MsBuildGenerationTest.csproj | 2 +- appveyor.yml | 2 +- capnpc-csharp/capnpc-csharp.csproj | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj b/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj index 2d6ac0e..bb63b16 100644 --- a/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj +++ b/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj @@ -18,7 +18,7 @@ MIT Git capnp "Cap'n Proto" RPC serialization cerealization - 1.3-local$([System.DateTime]::UtcNow.ToString(yyMMddHHmm)) + 1.2-local$([System.DateTime]::UtcNow.ToString(yyMMddHHmm)) Debug;Release true diff --git a/CapnpC.CSharp.MsBuild.Generation/CapnpC.CSharp.MsBuild.Generation.csproj b/CapnpC.CSharp.MsBuild.Generation/CapnpC.CSharp.MsBuild.Generation.csproj index c2aa0e7..3f7d27f 100644 --- a/CapnpC.CSharp.MsBuild.Generation/CapnpC.CSharp.MsBuild.Generation.csproj +++ b/CapnpC.CSharp.MsBuild.Generation/CapnpC.CSharp.MsBuild.Generation.csproj @@ -9,9 +9,9 @@ true true $(AllowedOutputExtensionsInPackageBuildOutputFolder);.pdb - 1.3.0.0 - 1.3.0.0 - 1.3-local$([System.DateTime]::UtcNow.ToString(yyMMddHHmm)) + 1.2.0.0 + 1.2.0.0 + 1.2-local$([System.DateTime]::UtcNow.ToString(yyMMddHHmm)) $(MSBuildThisFileDirectory)CapnpC.CSharp.MsBuild.Generation.nuspec version=$(Version);configuration=$(Configuration) diff --git a/MsBuildGenerationTest/MsBuildGenerationTest.csproj b/MsBuildGenerationTest/MsBuildGenerationTest.csproj index c415422..3c7c246 100644 --- a/MsBuildGenerationTest/MsBuildGenerationTest.csproj +++ b/MsBuildGenerationTest/MsBuildGenerationTest.csproj @@ -3,7 +3,7 @@ Exe netcoreapp2.2 - 1.3-local + 1.2-local Debug;Release $(Version)* diff --git a/appveyor.yml b/appveyor.yml index bfbe4fb..b18d96d 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -1,4 +1,4 @@ -version: '1.3.{build}' +version: '1.2.{build}' image: Visual Studio 2019 # branches: # only: diff --git a/capnpc-csharp/capnpc-csharp.csproj b/capnpc-csharp/capnpc-csharp.csproj index f5088f0..373c3a3 100644 --- a/capnpc-csharp/capnpc-csharp.csproj +++ b/capnpc-csharp/capnpc-csharp.csproj @@ -13,7 +13,7 @@ https://github.com/c80k/capnproto-dotnetcore Git capnp capnpc RPC serialization cerealization - 1.3.0 + 1.2.0 Debug;Release From 1ec5b0e955df435905b5397e5c08920af55aa98e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Sat, 1 Feb 2020 13:49:45 +0100 Subject: [PATCH 4/4] Improvements w.r.t. issue #37 --- Capnp.Net.Runtime.Tests/TcpRpcStress.cs | 11 ++++++++++- Capnp.Net.Runtime/Framing.cs | 3 ++- Capnp.Net.Runtime/Rpc/IConnection.cs | 7 +++++++ Capnp.Net.Runtime/Rpc/TcpRpcClient.cs | 7 +++++++ Capnp.Net.Runtime/Rpc/TcpRpcServer.cs | 7 +++++++ 5 files changed, 33 insertions(+), 2 deletions(-) diff --git a/Capnp.Net.Runtime.Tests/TcpRpcStress.cs b/Capnp.Net.Runtime.Tests/TcpRpcStress.cs index 6ca2c14..8409ace 100644 --- a/Capnp.Net.Runtime.Tests/TcpRpcStress.cs +++ b/Capnp.Net.Runtime.Tests/TcpRpcStress.cs @@ -5,6 +5,7 @@ using Microsoft.Extensions.Logging; using Microsoft.VisualStudio.TestTools.UnitTesting; using System; using System.Collections.Generic; +using System.Net; using System.Text; using System.Threading; @@ -94,9 +95,17 @@ namespace Capnp.Net.Runtime.Tests public void ScatteredTransfer() { - using (var server = SetupServer()) + using (var server = new TcpRpcServer(IPAddress.Any, TcpPort)) using (var client = new TcpRpcClient()) { + server.OnConnectionChanged += (_, e) => + { + if (e.Connection.State == ConnectionState.Initializing) + { + e.Connection.InjectMidlayer(s => new ScatteringStream(s, 7)); + } + }; + client.InjectMidlayer(s => new ScatteringStream(s, 10)); client.Connect("localhost", TcpPort); client.WhenConnected.Wait(); diff --git a/Capnp.Net.Runtime/Framing.cs b/Capnp.Net.Runtime/Framing.cs index bba2875..8e39cc5 100644 --- a/Capnp.Net.Runtime/Framing.cs +++ b/Capnp.Net.Runtime/Framing.cs @@ -110,13 +110,14 @@ namespace Capnp #else var buffer = MemoryMarshal.Cast(buffers[i].Span); - while (buffer.Length > 0) + do { int obtained = reader.Read(buffer); if (obtained == 0) throw StreamClosed(); buffer = buffer.Slice(obtained); } + while (buffer.Length > 0); #endif } } diff --git a/Capnp.Net.Runtime/Rpc/IConnection.cs b/Capnp.Net.Runtime/Rpc/IConnection.cs index 3a15f6f..b18fc5c 100644 --- a/Capnp.Net.Runtime/Rpc/IConnection.cs +++ b/Capnp.Net.Runtime/Rpc/IConnection.cs @@ -52,6 +52,13 @@ namespace Capnp.Rpc /// Connection is not in state 'Initializing' void AttachTracer(IFrameTracer tracer); + /// + /// Installs a midlayer. A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream. + /// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received + /// by the TCP connection, respectively. This mechanism may be used for integrating various (de-)compression algorithms. + /// + /// Callback for wrapping the midlayer around its underlying stream + /// is null void InjectMidlayer(Func createFunc); /// diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs b/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs index 293f9a5..def2f78 100644 --- a/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs +++ b/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs @@ -222,6 +222,13 @@ namespace Capnp.Rpc }; } + /// + /// Installs a midlayer. A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream. + /// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received + /// by the TCP connection, respectively. This mechanism may be used for integrating various (de-)compression algorithms. + /// + /// Callback for wrapping the midlayer around its underlying stream + /// is null public void InjectMidlayer(Func createFunc) { if (createFunc == null) diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs index f3c0553..131f0ce 100644 --- a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs +++ b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs @@ -128,6 +128,13 @@ namespace Capnp.Rpc Pump.AttachTracer(tracer); } + /// + /// Installs a midlayer. A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream. + /// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received + /// by the TCP connection, respectively. This mechanism may be used for integrating various (de-)compression algorithms. + /// + /// Callback for wrapping the midlayer around its underlying stream + /// is null public void InjectMidlayer(Func createFunc) { if (createFunc == null)