fwd integration from master

This commit is contained in:
Christian Köllner 2020-02-01 13:56:23 +01:00
commit d07a9c51eb
9 changed files with 190 additions and 16 deletions

View File

@ -22,6 +22,7 @@
<Compile Include="..\Capnp.Net.Runtime.Tests\ProvidedCapabilityMock.cs" Link="ProvidedCapabilityMock.cs" /> <Compile Include="..\Capnp.Net.Runtime.Tests\ProvidedCapabilityMock.cs" Link="ProvidedCapabilityMock.cs" />
<Compile Include="..\Capnp.Net.Runtime.Tests\ProvidedCapabilityMultiCallMock.cs" Link="ProvidedCapabilityMultiCallMock.cs" /> <Compile Include="..\Capnp.Net.Runtime.Tests\ProvidedCapabilityMultiCallMock.cs" Link="ProvidedCapabilityMultiCallMock.cs" />
<Compile Include="..\Capnp.Net.Runtime.Tests\RpcSchemaTests.cs" Link="RpcSchemaTests.cs" /> <Compile Include="..\Capnp.Net.Runtime.Tests\RpcSchemaTests.cs" Link="RpcSchemaTests.cs" />
<Compile Include="..\Capnp.Net.Runtime.Tests\ScatteringStream.cs" Link="ScatteringStream.cs" />
<Compile Include="..\Capnp.Net.Runtime.Tests\SegmentAllocatorTests.cs" Link="SegmentAllocatorTests.cs" /> <Compile Include="..\Capnp.Net.Runtime.Tests\SegmentAllocatorTests.cs" Link="SegmentAllocatorTests.cs" />
<Compile Include="..\Capnp.Net.Runtime.Tests\TcpRpc.cs" Link="TcpRpc.cs" /> <Compile Include="..\Capnp.Net.Runtime.Tests\TcpRpc.cs" Link="TcpRpc.cs" />
<Compile Include="..\Capnp.Net.Runtime.Tests\TcpRpcAdvancedStuff.cs" Link="TcpRpcAdvancedStuff.cs" /> <Compile Include="..\Capnp.Net.Runtime.Tests\TcpRpcAdvancedStuff.cs" Link="TcpRpcAdvancedStuff.cs" />

View File

@ -0,0 +1,58 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Capnp.Net.Runtime.Tests
{
/// <summary>
/// Imitates the behavior of a TCP connection by real hardware, which splits data transfer into multiple packets.
/// </summary>
class ScatteringStream : Stream
{
readonly Stream _baseStream;
readonly int _mtu;
public ScatteringStream(Stream baseStream, int mtu)
{
_baseStream = baseStream;
_mtu = mtu;
}
public override bool CanRead => _baseStream.CanRead;
public override bool CanSeek => false;
public override bool CanWrite => _baseStream.CanWrite;
public override long Length => _baseStream.Length;
public override long Position
{
get => _baseStream.Position;
set => throw new NotImplementedException();
}
public override void Flush() => _baseStream.Flush();
public override int Read(byte[] buffer, int offset, int count) => _baseStream.Read(buffer, offset, count);
public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException();
public override void SetLength(long value) => throw new NotImplementedException();
public override void Write(byte[] buffer, int offset, int count)
{
while (count > 0)
{
int amount = Math.Min(count, _mtu);
_baseStream.Write(buffer, offset, amount);
_baseStream.Flush();
offset += amount;
count -= amount;
}
}
}
}

View File

@ -5,6 +5,7 @@ using Microsoft.Extensions.Logging;
using Microsoft.VisualStudio.TestTools.UnitTesting; using Microsoft.VisualStudio.TestTools.UnitTesting;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Net;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
@ -89,5 +90,48 @@ namespace Capnp.Net.Runtime.Tests
var t = new TcpRpc(); var t = new TcpRpc();
Repeat(100, t.PipelineAfterReturn); Repeat(100, t.PipelineAfterReturn);
} }
[TestMethod]
public void ScatteredTransfer()
{
using (var server = new TcpRpcServer(IPAddress.Any, TcpPort))
using (var client = new TcpRpcClient())
{
server.OnConnectionChanged += (_, e) =>
{
if (e.Connection.State == ConnectionState.Initializing)
{
e.Connection.InjectMidlayer(s => new ScatteringStream(s, 7));
}
};
client.InjectMidlayer(s => new ScatteringStream(s, 10));
client.Connect("localhost", TcpPort);
client.WhenConnected.Wait();
var counters = new Counters();
server.Main = new TestInterfaceImpl(counters);
using (var main = client.GetMain<ITestInterface>())
{
for (int i = 0; i < 100; i++)
{
var request1 = main.Foo(123, true, default);
var request3 = Assert.ThrowsExceptionAsync<RpcException>(() => main.Bar(default));
var s = new TestAllTypes();
Common.InitTestMessage(s);
var request2 = main.Baz(s, default);
Assert.IsTrue(request1.Wait(MediumNonDbgTimeout));
Assert.IsTrue(request2.Wait(MediumNonDbgTimeout));
Assert.IsTrue(request3.Wait(MediumNonDbgTimeout));
Assert.AreEqual("foo", request1.Result);
Assert.AreEqual(2, counters.CallCount);
counters.CallCount = 0;
}
}
}
}
} }
} }

View File

@ -79,7 +79,10 @@ namespace Capnp
return new WireFrame(buffers); return new WireFrame(buffers);
} }
static InvalidDataException StreamClosed()
=> new InvalidDataException("Prematurely reached end of stream. Expected more bytes according to framing header.");
static void FillBuffersFromFrames(Memory<ulong>[] buffers, uint segmentCount, BinaryReader reader) static void FillBuffersFromFrames(Memory<ulong>[] buffers, uint segmentCount, BinaryReader reader)
{ {
for (uint i = 0; i < segmentCount; i++) for (uint i = 0; i < segmentCount; i++)
@ -90,7 +93,12 @@ namespace Capnp
if (tmpBuffer.Length != buffer.Length) if (tmpBuffer.Length != buffer.Length)
{ {
throw new InvalidDataException("Expected more bytes according to framing header"); // Note w.r.t. issue #37: If there are temporarily less bytes available,
// this will NOT cause ReadBytes to return a shorter buffer.
// Only if the end of the stream is reached will we enter this branch. And this will be an error condition,
// since it would mean that the connection was closed in the middle of a frame transfer.
throw StreamClosed();
} }
// Fastest way to do this without /unsafe // Fastest way to do this without /unsafe
@ -101,7 +109,15 @@ namespace Capnp
} }
#else #else
var buffer = MemoryMarshal.Cast<ulong, byte>(buffers[i].Span); var buffer = MemoryMarshal.Cast<ulong, byte>(buffers[i].Span);
reader.Read(buffer);
do
{
int obtained = reader.Read(buffer);
if (obtained == 0)
throw StreamClosed();
buffer = buffer.Slice(obtained);
}
while (buffer.Length > 0);
#endif #endif
} }
} }

View File

@ -1,5 +1,6 @@
using Capnp.FrameTracing; using Capnp.FrameTracing;
using System; using System;
using System.IO;
namespace Capnp.Rpc namespace Capnp.Rpc
{ {
@ -51,6 +52,15 @@ namespace Capnp.Rpc
/// <exception cref="InvalidOperationException">Connection is not in state 'Initializing'</exception> /// <exception cref="InvalidOperationException">Connection is not in state 'Initializing'</exception>
void AttachTracer(IFrameTracer tracer); void AttachTracer(IFrameTracer tracer);
/// <summary>
/// Installs a midlayer. A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream.
/// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received
/// by the TCP connection, respectively. This mechanism may be used for integrating various (de-)compression algorithms.
/// </summary>
/// <param name="createFunc">Callback for wrapping the midlayer around its underlying stream</param>
/// <exception cref="ArgumentNullException"><paramref name="createFunc"/> is null</exception>
void InjectMidlayer(Func<Stream, Stream> createFunc);
/// <summary> /// <summary>
/// Prematurely closes this connection. Note that there is usually no need to close a connection manually. The typical use case /// Prematurely closes this connection. Note that there is usually no need to close a connection manually. The typical use case
/// of this method is to refuse an incoming connection in the <code>TcpRpcServer.OnConnectionChanged</code> callback. /// of this method is to refuse an incoming connection in the <code>TcpRpcServer.OnConnectionChanged</code> callback.

View File

@ -3,6 +3,7 @@ using Microsoft.Extensions.Logging;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.IO;
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
@ -44,6 +45,7 @@ namespace Capnp.Rpc
readonly RpcEngine _rpcEngine; readonly RpcEngine _rpcEngine;
readonly TcpClient _client; readonly TcpClient _client;
Func<Stream, Stream> _createLayers = _ => _;
RpcEngine.RpcEndpoint _inboundEndpoint; RpcEngine.RpcEndpoint _inboundEndpoint;
OutboundTcpEndpoint _outboundEndpoint; OutboundTcpEndpoint _outboundEndpoint;
FramePump _pump; FramePump _pump;
@ -82,7 +84,9 @@ namespace Capnp.Rpc
await ConnectAsync(host, port); await ConnectAsync(host, port);
State = ConnectionState.Active; State = ConnectionState.Active;
_pump = new FramePump(_client.GetStream());
var stream = _createLayers(_client.GetStream());
_pump = new FramePump(stream);
_attachTracerAction?.Invoke(); _attachTracerAction?.Invoke();
_outboundEndpoint = new OutboundTcpEndpoint(this, _pump); _outboundEndpoint = new OutboundTcpEndpoint(this, _pump);
_inboundEndpoint = _rpcEngine.AddEndpoint(_outboundEndpoint); _inboundEndpoint = _rpcEngine.AddEndpoint(_outboundEndpoint);
@ -218,6 +222,25 @@ namespace Capnp.Rpc
}; };
} }
/// <summary>
/// Installs a midlayer. A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream.
/// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received
/// by the TCP connection, respectively. This mechanism may be used for integrating various (de-)compression algorithms.
/// </summary>
/// <param name="createFunc">Callback for wrapping the midlayer around its underlying stream</param>
/// <exception cref="ArgumentNullException"><paramref name="createFunc"/> is null</exception>
public void InjectMidlayer(Func<Stream, Stream> createFunc)
{
if (createFunc == null)
throw new ArgumentNullException(nameof(createFunc));
if (State != ConnectionState.Initializing)
throw new InvalidOperationException("Connection is not in state 'Initializing'");
var last = _createLayers;
_createLayers = _ => createFunc(last(_));
}
/// <summary> /// <summary>
/// Prematurely closes this connection. Note that there is usually no need to close a connection manually. /// Prematurely closes this connection. Note that there is usually no need to close a connection manually.
/// </summary> /// </summary>

View File

@ -2,6 +2,7 @@
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO;
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using System.Threading; using System.Threading;
@ -60,24 +61,29 @@ namespace Capnp.Rpc
class Connection: IConnection class Connection: IConnection
{ {
readonly TcpRpcServer _server; readonly TcpRpcServer _server;
Stream _stream;
public Connection(TcpRpcServer server, TcpClient client, FramePump pump, OutboundTcpEndpoint outboundEp, RpcEngine.RpcEndpoint inboundEp) public Connection(TcpRpcServer server, TcpClient client)
{ {
_server = server; _server = server;
Client = client; Client = client;
Pump = pump; _stream = client.GetStream();
OutboundEp = outboundEp;
InboundEp = inboundEp;
} }
public void Start() public void Start()
{ {
Pump = new FramePump(_stream);
OutboundEp = new OutboundTcpEndpoint(_server, Pump);
InboundEp = _server._rpcEngine.AddEndpoint(OutboundEp);
Pump.FrameReceived += InboundEp.Forward;
State = ConnectionState.Active;
PumpRunner = new Thread(o => PumpRunner = new Thread(o =>
{ {
try try
{ {
Thread.CurrentThread.Name = $"TCP RPC Server Thread {Thread.CurrentThread.ManagedThreadId}"; Thread.CurrentThread.Name = $"TCP RPC Server Thread {Thread.CurrentThread.ManagedThreadId}";
State = ConnectionState.Active;
Pump.Run(); Pump.Run();
} }
@ -122,6 +128,24 @@ namespace Capnp.Rpc
Pump.AttachTracer(tracer); Pump.AttachTracer(tracer);
} }
/// <summary>
/// Installs a midlayer. A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream.
/// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received
/// by the TCP connection, respectively. This mechanism may be used for integrating various (de-)compression algorithms.
/// </summary>
/// <param name="createFunc">Callback for wrapping the midlayer around its underlying stream</param>
/// <exception cref="ArgumentNullException"><paramref name="createFunc"/> is null</exception>
public void InjectMidlayer(Func<Stream, Stream> createFunc)
{
if (createFunc == null)
throw new ArgumentNullException(nameof(createFunc));
if (State != ConnectionState.Initializing)
throw new InvalidOperationException("Connection is not in state 'Initializing'");
_stream = createFunc(_stream);
}
public void Close() public void Close()
{ {
Client.Dispose(); Client.Dispose();
@ -149,12 +173,7 @@ namespace Capnp.Rpc
while (true) while (true)
{ {
var client = _listener.AcceptTcpClient(); var client = _listener.AcceptTcpClient();
var pump = new FramePump(client.GetStream()); var connection = new Connection(this, client);
var outboundEndpoint = new OutboundTcpEndpoint(this, pump);
var inboundEndpoint = _rpcEngine.AddEndpoint(outboundEndpoint);
pump.FrameReceived += inboundEndpoint.Forward;
var connection = new Connection(this, client, pump, outboundEndpoint, inboundEndpoint);
lock (_reentrancyBlocker) lock (_reentrancyBlocker)
{ {

View File

@ -3,7 +3,7 @@
<PropertyGroup> <PropertyGroup>
<OutputType>Exe</OutputType> <OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.2</TargetFramework> <TargetFramework>netcoreapp2.2</TargetFramework>
<Version>1.3-local</Version> <Version>1.2-local</Version>
<Configurations>Debug;Release</Configurations> <Configurations>Debug;Release</Configurations>
<PackageReferenceVersion Condition="'$(PackageReferenceVersion)'==''">$(Version)*</PackageReferenceVersion> <PackageReferenceVersion Condition="'$(PackageReferenceVersion)'==''">$(Version)*</PackageReferenceVersion>
</PropertyGroup> </PropertyGroup>

View File

@ -43,6 +43,9 @@ choco install capnpc-csharp-win-x86
Both versions will also download and install the [Cap'n Proto tool set Chocolatey package](https://www.chocolatey.org/packages/capnproto). Note that the author does not maintain this package and has no influence on its contents. Both versions will also download and install the [Cap'n Proto tool set Chocolatey package](https://www.chocolatey.org/packages/capnproto). Note that the author does not maintain this package and has no influence on its contents.
*Announcement: There is currently an experimental branch for packaging the code generator back end as .NET Core CLI tool. If this approach turns out to be viable, it will probably be superior to the Chocolatey deployment. In that case, the CLI tool capnpc-csharp will be deployed on NuGet.org and the former Chocolatey package will be deprecated. capnpc-csharp-win-x86 will probably be kept (and also maintained).*
### Code generator back end: Other OSes ### Code generator back end: Other OSes
Currently, you are on yourself. Compile the `capnpc-csharp` VS project and install the resulting .NET application manually on your system. This should not be that complicated, see also the [Wiki](https://github.com/c80k/capnproto-dotnetcore/wiki). It would be great to support other package managers, especially [APT](https://wiki.debian.org/Apt). Consider contributing? Author would be happy! Currently, you are on yourself. Compile the `capnpc-csharp` VS project and install the resulting .NET application manually on your system. This should not be that complicated, see also the [Wiki](https://github.com/c80k/capnproto-dotnetcore/wiki). It would be great to support other package managers, especially [APT](https://wiki.debian.org/Apt). Consider contributing? Author would be happy!