From 4bcc97a69fc67ddbbca3ae8c5b6f3c3d7ecd46c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Fri, 31 Jan 2020 21:40:25 +0100 Subject: [PATCH] Fix + test case for issue #37 --- .../Capnp.Net.Runtime.Tests.Core21.csproj | 1 + Capnp.Net.Runtime.Tests/ScatteringStream.cs | 58 +++++++++++++++++++ Capnp.Net.Runtime.Tests/TcpRpcStress.cs | 35 +++++++++++ Capnp.Net.Runtime/Framing.cs | 21 ++++++- Capnp.Net.Runtime/Rpc/IConnection.cs | 3 + Capnp.Net.Runtime/Rpc/TcpRpcClient.cs | 18 +++++- Capnp.Net.Runtime/Rpc/TcpRpcServer.cs | 34 +++++++---- 7 files changed, 155 insertions(+), 15 deletions(-) create mode 100644 Capnp.Net.Runtime.Tests/ScatteringStream.cs diff --git a/Capnp.Net.Runtime.Tests.Core21/Capnp.Net.Runtime.Tests.Core21.csproj b/Capnp.Net.Runtime.Tests.Core21/Capnp.Net.Runtime.Tests.Core21.csproj index 1541a85..30d40e2 100644 --- a/Capnp.Net.Runtime.Tests.Core21/Capnp.Net.Runtime.Tests.Core21.csproj +++ b/Capnp.Net.Runtime.Tests.Core21/Capnp.Net.Runtime.Tests.Core21.csproj @@ -22,6 +22,7 @@ + diff --git a/Capnp.Net.Runtime.Tests/ScatteringStream.cs b/Capnp.Net.Runtime.Tests/ScatteringStream.cs new file mode 100644 index 0000000..e6b44df --- /dev/null +++ b/Capnp.Net.Runtime.Tests/ScatteringStream.cs @@ -0,0 +1,58 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Capnp.Net.Runtime.Tests +{ + /// + /// Imitates the behavior of a TCP connection by real hardware, which splits data transfer into multiple packets. + /// + class ScatteringStream : Stream + { + readonly Stream _baseStream; + readonly int _mtu; + + public ScatteringStream(Stream baseStream, int mtu) + { + _baseStream = baseStream; + _mtu = mtu; + } + + public override bool CanRead => _baseStream.CanRead; + + public override bool CanSeek => false; + + public override bool CanWrite => _baseStream.CanWrite; + + public override long Length => _baseStream.Length; + + public override long Position + { + get => _baseStream.Position; + set => throw new NotImplementedException(); + } + + public override void Flush() => _baseStream.Flush(); + + public override int Read(byte[] buffer, int offset, int count) => _baseStream.Read(buffer, offset, count); + + public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException(); + + public override void SetLength(long value) => throw new NotImplementedException(); + + public override void Write(byte[] buffer, int offset, int count) + { + while (count > 0) + { + int amount = Math.Min(count, _mtu); + _baseStream.Write(buffer, offset, amount); + _baseStream.Flush(); + offset += amount; + count -= amount; + } + } + } +} diff --git a/Capnp.Net.Runtime.Tests/TcpRpcStress.cs b/Capnp.Net.Runtime.Tests/TcpRpcStress.cs index d45c45c..6ca2c14 100644 --- a/Capnp.Net.Runtime.Tests/TcpRpcStress.cs +++ b/Capnp.Net.Runtime.Tests/TcpRpcStress.cs @@ -89,5 +89,40 @@ namespace Capnp.Net.Runtime.Tests var t = new TcpRpc(); Repeat(100, t.PipelineAfterReturn); } + + [TestMethod] + public void ScatteredTransfer() + { + + using (var server = SetupServer()) + using (var client = new TcpRpcClient()) + { + client.InjectMidlayer(s => new ScatteringStream(s, 10)); + client.Connect("localhost", TcpPort); + client.WhenConnected.Wait(); + + var counters = new Counters(); + server.Main = new TestInterfaceImpl(counters); + using (var main = client.GetMain()) + { + for (int i = 0; i < 100; i++) + { + var request1 = main.Foo(123, true, default); + var request3 = Assert.ThrowsExceptionAsync(() => main.Bar(default)); + var s = new TestAllTypes(); + Common.InitTestMessage(s); + var request2 = main.Baz(s, default); + + Assert.IsTrue(request1.Wait(MediumNonDbgTimeout)); + Assert.IsTrue(request2.Wait(MediumNonDbgTimeout)); + Assert.IsTrue(request3.Wait(MediumNonDbgTimeout)); + + Assert.AreEqual("foo", request1.Result); + Assert.AreEqual(2, counters.CallCount); + counters.CallCount = 0; + } + } + } + } } } diff --git a/Capnp.Net.Runtime/Framing.cs b/Capnp.Net.Runtime/Framing.cs index e9707ce..bba2875 100644 --- a/Capnp.Net.Runtime/Framing.cs +++ b/Capnp.Net.Runtime/Framing.cs @@ -79,7 +79,10 @@ namespace Capnp return new WireFrame(buffers); } - + + static InvalidDataException StreamClosed() + => new InvalidDataException("Prematurely reached end of stream. Expected more bytes according to framing header."); + static void FillBuffersFromFrames(Memory[] buffers, uint segmentCount, BinaryReader reader) { for (uint i = 0; i < segmentCount; i++) @@ -90,7 +93,12 @@ namespace Capnp if (tmpBuffer.Length != buffer.Length) { - throw new InvalidDataException("Expected more bytes according to framing header"); + // Note w.r.t. issue #37: If there are temporarily less bytes available, + // this will NOT cause ReadBytes to return a shorter buffer. + // Only if the end of the stream is reached will we enter this branch. And this will be an error condition, + // since it would mean that the connection was closed in the middle of a frame transfer. + + throw StreamClosed(); } // Fastest way to do this without /unsafe @@ -101,7 +109,14 @@ namespace Capnp } #else var buffer = MemoryMarshal.Cast(buffers[i].Span); - reader.Read(buffer); + + while (buffer.Length > 0) + { + int obtained = reader.Read(buffer); + if (obtained == 0) + throw StreamClosed(); + buffer = buffer.Slice(obtained); + } #endif } } diff --git a/Capnp.Net.Runtime/Rpc/IConnection.cs b/Capnp.Net.Runtime/Rpc/IConnection.cs index 582b17e..3a15f6f 100644 --- a/Capnp.Net.Runtime/Rpc/IConnection.cs +++ b/Capnp.Net.Runtime/Rpc/IConnection.cs @@ -1,5 +1,6 @@ using Capnp.FrameTracing; using System; +using System.IO; namespace Capnp.Rpc { @@ -51,6 +52,8 @@ namespace Capnp.Rpc /// Connection is not in state 'Initializing' void AttachTracer(IFrameTracer tracer); + void InjectMidlayer(Func createFunc); + /// /// Prematurely closes this connection. Note that there is usually no need to close a connection manually. The typical use case /// of this method is to refuse an incoming connection in the TcpRpcServer.OnConnectionChanged callback. diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs b/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs index 0f3e1a6..293f9a5 100644 --- a/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs +++ b/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs @@ -3,6 +3,7 @@ using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.Diagnostics; +using System.IO; using System.Net; using System.Net.Sockets; using System.Runtime.CompilerServices; @@ -44,6 +45,7 @@ namespace Capnp.Rpc readonly RpcEngine _rpcEngine; readonly TcpClient _client; + Func _createLayers = _ => _; RpcEngine.RpcEndpoint _inboundEndpoint; OutboundTcpEndpoint _outboundEndpoint; FramePump _pump; @@ -82,7 +84,9 @@ namespace Capnp.Rpc await ConnectAsync(host, port); State = ConnectionState.Active; - _pump = new FramePump(_client.GetStream()); + + var stream = _createLayers(_client.GetStream()); + _pump = new FramePump(stream); _attachTracerAction?.Invoke(); _outboundEndpoint = new OutboundTcpEndpoint(this, _pump); _inboundEndpoint = _rpcEngine.AddEndpoint(_outboundEndpoint); @@ -218,6 +222,18 @@ namespace Capnp.Rpc }; } + public void InjectMidlayer(Func createFunc) + { + if (createFunc == null) + throw new ArgumentNullException(nameof(createFunc)); + + if (State != ConnectionState.Initializing) + throw new InvalidOperationException("Connection is not in state 'Initializing'"); + + var last = _createLayers; + _createLayers = _ => createFunc(last(_)); + } + /// /// Prematurely closes this connection. Note that there is usually no need to close a connection manually. /// diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs index 6ac5cb1..f3c0553 100644 --- a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs +++ b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs @@ -2,6 +2,7 @@ using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; +using System.IO; using System.Net; using System.Net.Sockets; using System.Threading; @@ -60,24 +61,29 @@ namespace Capnp.Rpc class Connection: IConnection { readonly TcpRpcServer _server; + Stream _stream; - public Connection(TcpRpcServer server, TcpClient client, FramePump pump, OutboundTcpEndpoint outboundEp, RpcEngine.RpcEndpoint inboundEp) + public Connection(TcpRpcServer server, TcpClient client) { _server = server; Client = client; - Pump = pump; - OutboundEp = outboundEp; - InboundEp = inboundEp; + _stream = client.GetStream(); } public void Start() { + Pump = new FramePump(_stream); + OutboundEp = new OutboundTcpEndpoint(_server, Pump); + InboundEp = _server._rpcEngine.AddEndpoint(OutboundEp); + Pump.FrameReceived += InboundEp.Forward; + + State = ConnectionState.Active; + PumpRunner = new Thread(o => { try { Thread.CurrentThread.Name = $"TCP RPC Server Thread {Thread.CurrentThread.ManagedThreadId}"; - State = ConnectionState.Active; Pump.Run(); } @@ -122,6 +128,17 @@ namespace Capnp.Rpc Pump.AttachTracer(tracer); } + public void InjectMidlayer(Func createFunc) + { + if (createFunc == null) + throw new ArgumentNullException(nameof(createFunc)); + + if (State != ConnectionState.Initializing) + throw new InvalidOperationException("Connection is not in state 'Initializing'"); + + _stream = createFunc(_stream); + } + public void Close() { Client.Dispose(); @@ -149,12 +166,7 @@ namespace Capnp.Rpc while (true) { var client = _listener.AcceptTcpClient(); - var pump = new FramePump(client.GetStream()); - var outboundEndpoint = new OutboundTcpEndpoint(this, pump); - var inboundEndpoint = _rpcEngine.AddEndpoint(outboundEndpoint); - pump.FrameReceived += inboundEndpoint.Forward; - - var connection = new Connection(this, client, pump, outboundEndpoint, inboundEndpoint); + var connection = new Connection(this, client); lock (_reentrancyBlocker) {