diff --git a/Capnp.Net.Runtime.Tests.Core21/Capnp.Net.Runtime.Tests.Core21.csproj b/Capnp.Net.Runtime.Tests.Core21/Capnp.Net.Runtime.Tests.Core21.csproj
index 1541a85..30d40e2 100644
--- a/Capnp.Net.Runtime.Tests.Core21/Capnp.Net.Runtime.Tests.Core21.csproj
+++ b/Capnp.Net.Runtime.Tests.Core21/Capnp.Net.Runtime.Tests.Core21.csproj
@@ -22,6 +22,7 @@
+
diff --git a/Capnp.Net.Runtime.Tests/ScatteringStream.cs b/Capnp.Net.Runtime.Tests/ScatteringStream.cs
new file mode 100644
index 0000000..e6b44df
--- /dev/null
+++ b/Capnp.Net.Runtime.Tests/ScatteringStream.cs
@@ -0,0 +1,58 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Capnp.Net.Runtime.Tests
+{
+ ///
+ /// Imitates the behavior of a TCP connection by real hardware, which splits data transfer into multiple packets.
+ ///
+ class ScatteringStream : Stream
+ {
+ readonly Stream _baseStream;
+ readonly int _mtu;
+
+ public ScatteringStream(Stream baseStream, int mtu)
+ {
+ _baseStream = baseStream;
+ _mtu = mtu;
+ }
+
+ public override bool CanRead => _baseStream.CanRead;
+
+ public override bool CanSeek => false;
+
+ public override bool CanWrite => _baseStream.CanWrite;
+
+ public override long Length => _baseStream.Length;
+
+ public override long Position
+ {
+ get => _baseStream.Position;
+ set => throw new NotImplementedException();
+ }
+
+ public override void Flush() => _baseStream.Flush();
+
+ public override int Read(byte[] buffer, int offset, int count) => _baseStream.Read(buffer, offset, count);
+
+ public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException();
+
+ public override void SetLength(long value) => throw new NotImplementedException();
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ while (count > 0)
+ {
+ int amount = Math.Min(count, _mtu);
+ _baseStream.Write(buffer, offset, amount);
+ _baseStream.Flush();
+ offset += amount;
+ count -= amount;
+ }
+ }
+ }
+}
diff --git a/Capnp.Net.Runtime.Tests/TcpRpcStress.cs b/Capnp.Net.Runtime.Tests/TcpRpcStress.cs
index d45c45c..6ca2c14 100644
--- a/Capnp.Net.Runtime.Tests/TcpRpcStress.cs
+++ b/Capnp.Net.Runtime.Tests/TcpRpcStress.cs
@@ -89,5 +89,40 @@ namespace Capnp.Net.Runtime.Tests
var t = new TcpRpc();
Repeat(100, t.PipelineAfterReturn);
}
+
+ [TestMethod]
+ public void ScatteredTransfer()
+ {
+
+ using (var server = SetupServer())
+ using (var client = new TcpRpcClient())
+ {
+ client.InjectMidlayer(s => new ScatteringStream(s, 10));
+ client.Connect("localhost", TcpPort);
+ client.WhenConnected.Wait();
+
+ var counters = new Counters();
+ server.Main = new TestInterfaceImpl(counters);
+ using (var main = client.GetMain())
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ var request1 = main.Foo(123, true, default);
+ var request3 = Assert.ThrowsExceptionAsync(() => main.Bar(default));
+ var s = new TestAllTypes();
+ Common.InitTestMessage(s);
+ var request2 = main.Baz(s, default);
+
+ Assert.IsTrue(request1.Wait(MediumNonDbgTimeout));
+ Assert.IsTrue(request2.Wait(MediumNonDbgTimeout));
+ Assert.IsTrue(request3.Wait(MediumNonDbgTimeout));
+
+ Assert.AreEqual("foo", request1.Result);
+ Assert.AreEqual(2, counters.CallCount);
+ counters.CallCount = 0;
+ }
+ }
+ }
+ }
}
}
diff --git a/Capnp.Net.Runtime/Framing.cs b/Capnp.Net.Runtime/Framing.cs
index e9707ce..bba2875 100644
--- a/Capnp.Net.Runtime/Framing.cs
+++ b/Capnp.Net.Runtime/Framing.cs
@@ -79,7 +79,10 @@ namespace Capnp
return new WireFrame(buffers);
}
-
+
+ static InvalidDataException StreamClosed()
+ => new InvalidDataException("Prematurely reached end of stream. Expected more bytes according to framing header.");
+
static void FillBuffersFromFrames(Memory[] buffers, uint segmentCount, BinaryReader reader)
{
for (uint i = 0; i < segmentCount; i++)
@@ -90,7 +93,12 @@ namespace Capnp
if (tmpBuffer.Length != buffer.Length)
{
- throw new InvalidDataException("Expected more bytes according to framing header");
+ // Note w.r.t. issue #37: If there are temporarily less bytes available,
+ // this will NOT cause ReadBytes to return a shorter buffer.
+ // Only if the end of the stream is reached will we enter this branch. And this will be an error condition,
+ // since it would mean that the connection was closed in the middle of a frame transfer.
+
+ throw StreamClosed();
}
// Fastest way to do this without /unsafe
@@ -101,7 +109,14 @@ namespace Capnp
}
#else
var buffer = MemoryMarshal.Cast(buffers[i].Span);
- reader.Read(buffer);
+
+ while (buffer.Length > 0)
+ {
+ int obtained = reader.Read(buffer);
+ if (obtained == 0)
+ throw StreamClosed();
+ buffer = buffer.Slice(obtained);
+ }
#endif
}
}
diff --git a/Capnp.Net.Runtime/Rpc/IConnection.cs b/Capnp.Net.Runtime/Rpc/IConnection.cs
index 582b17e..3a15f6f 100644
--- a/Capnp.Net.Runtime/Rpc/IConnection.cs
+++ b/Capnp.Net.Runtime/Rpc/IConnection.cs
@@ -1,5 +1,6 @@
using Capnp.FrameTracing;
using System;
+using System.IO;
namespace Capnp.Rpc
{
@@ -51,6 +52,8 @@ namespace Capnp.Rpc
/// Connection is not in state 'Initializing'
void AttachTracer(IFrameTracer tracer);
+ void InjectMidlayer(Func createFunc);
+
///
/// Prematurely closes this connection. Note that there is usually no need to close a connection manually. The typical use case
/// of this method is to refuse an incoming connection in the TcpRpcServer.OnConnectionChanged
callback.
diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs b/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs
index 0f3e1a6..293f9a5 100644
--- a/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs
+++ b/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs
@@ -3,6 +3,7 @@ using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
+using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
@@ -44,6 +45,7 @@ namespace Capnp.Rpc
readonly RpcEngine _rpcEngine;
readonly TcpClient _client;
+ Func _createLayers = _ => _;
RpcEngine.RpcEndpoint _inboundEndpoint;
OutboundTcpEndpoint _outboundEndpoint;
FramePump _pump;
@@ -82,7 +84,9 @@ namespace Capnp.Rpc
await ConnectAsync(host, port);
State = ConnectionState.Active;
- _pump = new FramePump(_client.GetStream());
+
+ var stream = _createLayers(_client.GetStream());
+ _pump = new FramePump(stream);
_attachTracerAction?.Invoke();
_outboundEndpoint = new OutboundTcpEndpoint(this, _pump);
_inboundEndpoint = _rpcEngine.AddEndpoint(_outboundEndpoint);
@@ -218,6 +222,18 @@ namespace Capnp.Rpc
};
}
+ public void InjectMidlayer(Func createFunc)
+ {
+ if (createFunc == null)
+ throw new ArgumentNullException(nameof(createFunc));
+
+ if (State != ConnectionState.Initializing)
+ throw new InvalidOperationException("Connection is not in state 'Initializing'");
+
+ var last = _createLayers;
+ _createLayers = _ => createFunc(last(_));
+ }
+
///
/// Prematurely closes this connection. Note that there is usually no need to close a connection manually.
///
diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs
index 6ac5cb1..f3c0553 100644
--- a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs
+++ b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs
@@ -2,6 +2,7 @@
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
+using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
@@ -60,24 +61,29 @@ namespace Capnp.Rpc
class Connection: IConnection
{
readonly TcpRpcServer _server;
+ Stream _stream;
- public Connection(TcpRpcServer server, TcpClient client, FramePump pump, OutboundTcpEndpoint outboundEp, RpcEngine.RpcEndpoint inboundEp)
+ public Connection(TcpRpcServer server, TcpClient client)
{
_server = server;
Client = client;
- Pump = pump;
- OutboundEp = outboundEp;
- InboundEp = inboundEp;
+ _stream = client.GetStream();
}
public void Start()
{
+ Pump = new FramePump(_stream);
+ OutboundEp = new OutboundTcpEndpoint(_server, Pump);
+ InboundEp = _server._rpcEngine.AddEndpoint(OutboundEp);
+ Pump.FrameReceived += InboundEp.Forward;
+
+ State = ConnectionState.Active;
+
PumpRunner = new Thread(o =>
{
try
{
Thread.CurrentThread.Name = $"TCP RPC Server Thread {Thread.CurrentThread.ManagedThreadId}";
- State = ConnectionState.Active;
Pump.Run();
}
@@ -122,6 +128,17 @@ namespace Capnp.Rpc
Pump.AttachTracer(tracer);
}
+ public void InjectMidlayer(Func createFunc)
+ {
+ if (createFunc == null)
+ throw new ArgumentNullException(nameof(createFunc));
+
+ if (State != ConnectionState.Initializing)
+ throw new InvalidOperationException("Connection is not in state 'Initializing'");
+
+ _stream = createFunc(_stream);
+ }
+
public void Close()
{
Client.Dispose();
@@ -149,12 +166,7 @@ namespace Capnp.Rpc
while (true)
{
var client = _listener.AcceptTcpClient();
- var pump = new FramePump(client.GetStream());
- var outboundEndpoint = new OutboundTcpEndpoint(this, pump);
- var inboundEndpoint = _rpcEngine.AddEndpoint(outboundEndpoint);
- pump.FrameReceived += inboundEndpoint.Forward;
-
- var connection = new Connection(this, client, pump, outboundEndpoint, inboundEndpoint);
+ var connection = new Connection(this, client);
lock (_reentrancyBlocker)
{