This commit is contained in:
Christian Köllner 2020-03-10 21:55:29 +01:00
commit 281a1c868f
32 changed files with 542 additions and 217 deletions

View File

@ -7,7 +7,7 @@
<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.12.0" />
<PackageReference Include="Capnp.Net.Runtime" Version="1.2.189" />
<PackageReference Include="Capnp.Net.Runtime" Version="1.3.34-g409e517587" />
<PackageReference Include="CapnpC.CSharp.MsBuild.Generation" Version="1.2.138" />
<PackageReference Include="Google.Protobuf" Version="3.11.3" />
<PackageReference Include="Grpc.Net.Client" Version="2.27.0" />

View File

@ -13,6 +13,9 @@ namespace Benchmark
[Params(20, 200, 2000, 20000, 200000, 2000000)]
public int PayloadBytes;
[Params(0, 256, 1024, 4096)]
public int BufferSize;
TcpRpcClient _client;
IEchoer _echoer;
byte[] _payload;
@ -21,6 +24,8 @@ namespace Benchmark
public void Setup()
{
_client = new TcpRpcClient("localhost", 5002);
if (BufferSize > 0)
_client.AddBuffering(BufferSize);
_client.WhenConnected.Wait();
_echoer = _client.GetMain<IEchoer>();
_payload = new byte[PayloadBytes];

View File

@ -0,0 +1,25 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.29728.190
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CapnpProfile", "CapnpProfile\CapnpProfile.csproj", "{D2D3AE23-C19E-47C7-B758-E2259DC01B5A}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{D2D3AE23-C19E-47C7-B758-E2259DC01B5A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D2D3AE23-C19E-47C7-B758-E2259DC01B5A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D2D3AE23-C19E-47C7-B758-E2259DC01B5A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D2D3AE23-C19E-47C7-B758-E2259DC01B5A}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {F5DDDA09-394B-4A71-B6BE-C7103BCEF3A2}
EndGlobalSection
EndGlobal

View File

@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
<DebugType>full</DebugType>
<DebugSymbols>true</DebugSymbols>
<PlatformTarget>x64</PlatformTarget>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Capnp.Net.Runtime" Version="1.3.34-g409e517587" />
<PackageReference Include="CapnpC.CSharp.MsBuild.Generation" Version="1.3.29-g6d711b8579" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,32 @@
using Capnp.Rpc;
using CapnpGen;
using CapnpProfile.Services;
using System;
using System.Net;
using System.Threading.Tasks;
namespace CapnpProfile
{
class Program
{
static async Task Main(string[] args)
{
using var server = new TcpRpcServer();
server.Main = new CapnpEchoService();
server.AddBuffering();
server.StartAccepting(IPAddress.Any, 5002);
using var client = new TcpRpcClient("localhost", 5002);
await client.WhenConnected;
using var echoer = client.GetMain<IEchoer>();
var payload = new byte[20];
new Random().NextBytes(payload);
while (true)
{
var result = await echoer.Echo(payload);
if (result.Count != payload.Length)
throw new InvalidOperationException("Echo server malfunction");
}
}
}
}

View File

@ -0,0 +1,5 @@
@0x8c309c720de8cf7c;
interface Echoer {
echo @0 (input : Data) -> (output : Data);
}

View File

@ -0,0 +1,20 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace CapnpProfile.Services
{
public class CapnpEchoService : CapnpGen.IEchoer
{
public void Dispose()
{
}
public Task<IReadOnlyList<byte>> Echo(IReadOnlyList<byte> input, CancellationToken cancellationToken_ = default)
{
return Task.FromResult(input);
}
}
}

View File

@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
@ -6,7 +6,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Capnp.Net.Runtime" Version="1.2.189" />
<PackageReference Include="Capnp.Net.Runtime" Version="1.3.34-g409e517587" />
<PackageReference Include="CapnpC.CSharp.MsBuild.Generation" Version="1.2.138" />
</ItemGroup>

View File

@ -9,9 +9,11 @@ namespace EchoServiceCapnp
{
static void Main(string[] args)
{
using (var server = new TcpRpcServer(IPAddress.Any, 5002))
using (var server = new TcpRpcServer())
{
server.AddBuffering();
server.Main = new CapnpEchoService();
server.StartAccepting(IPAddress.Any, 5002);
Console.WriteLine("Press RETURN to stop listening");
Console.ReadLine();
}

View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<packageSources>
<add key="nuget.org" value="https://api.nuget.org/v3/index.json" protocolVersion="3" />
</packageSources>
</configuration>

View File

@ -36,7 +36,7 @@ namespace Capnp.Net.Runtime.Tests
Process _currentProcess;
void LaunchCompatTestProcess(string whichTest, Action<StreamReader> test)
bool TryLaunchCompatTestProcess(string whichTest, Action<StreamReader> test)
{
string myPath = Path.GetDirectoryName(typeof(TcpRpcInterop).Assembly.Location);
string config;
@ -71,6 +71,12 @@ namespace Capnp.Net.Runtime.Tests
"Problem after launching test process");
test(_currentProcess.StandardOutput);
return true;
}
catch (AssertFailedException)
{
return false;
}
finally
{
@ -85,6 +91,20 @@ namespace Capnp.Net.Runtime.Tests
}
}
void LaunchCompatTestProcess(string whichTest, Action<StreamReader> test)
{
for (int retry = 0; retry < 5; retry++)
{
if (TryLaunchCompatTestProcess(whichTest, test))
return;
if (whichTest.StartsWith("server:"))
PrepareNextTest();
}
Assert.Fail("Problem after launching test process");
}
void SendInput(string line)
{
_currentProcess.StandardInput.WriteLine(line);

View File

@ -98,14 +98,7 @@ namespace Capnp.Net.Runtime.Tests
using (var server = new TcpRpcServer(IPAddress.Any, TcpPort))
using (var client = new TcpRpcClient())
{
server.OnConnectionChanged += (_, e) =>
{
if (e.Connection.State == ConnectionState.Initializing)
{
e.Connection.InjectMidlayer(s => new ScatteringStream(s, 7));
}
};
server.InjectMidlayer(s => new ScatteringStream(s, 7));
client.InjectMidlayer(s => new ScatteringStream(s, 10));
client.Connect("localhost", TcpPort);
client.WhenConnected.Wait();

View File

@ -5,6 +5,7 @@ using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@ -20,8 +21,39 @@ namespace Capnp.Net.Runtime.Tests
protected ILogger Logger { get; set; }
protected TcpRpcClient SetupClient() => new TcpRpcClient("localhost", TcpPort);
protected TcpRpcServer SetupServer() => new TcpRpcServer(IPAddress.Any, TcpPort);
protected TcpRpcClient SetupClient()
{
var client = new TcpRpcClient();
client.AddBuffering();
client.Connect("localhost", TcpPort);
return client;
}
protected TcpRpcServer SetupServer()
{
int attempt = 0;
while (true)
{
try
{
var server = new TcpRpcServer(IPAddress.Any, TcpPort);
server.AddBuffering();
return server;
}
catch (SocketException)
{
// If the TCP listening port is occupied by some other process,
// retry with a different one
if (attempt == 5)
throw;
}
IncrementTcpPort();
++attempt;
}
}
protected (TcpRpcServer, TcpRpcClient) SetupClientServerPair()
{

View File

@ -32,6 +32,11 @@
<DefineConstants>DebugCapabilityLifecycle</DefineConstants>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(TargetFramework)|$(Platform)'=='Release|netstandard2.0|AnyCPU'">
<DebugType>portable</DebugType>
<DebugSymbols>true</DebugSymbols>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.2.0" />
</ItemGroup>

View File

@ -501,19 +501,38 @@ namespace Capnp
/// <exception cref="DeserializationException">state does not represent a list</exception>
public ListDeserializer RequireList()
{
return Kind switch
switch (Kind)
{
ObjectKind.ListOfBits => new ListOfBitsDeserializer(ref this, false),
ObjectKind.ListOfBytes => new ListOfPrimitivesDeserializer<byte>(ref this, ListKind.ListOfBytes),
ObjectKind.ListOfEmpty => new ListOfEmptyDeserializer(ref this),
ObjectKind.ListOfInts => new ListOfPrimitivesDeserializer<int>(ref this, ListKind.ListOfInts),
ObjectKind.ListOfLongs => new ListOfPrimitivesDeserializer<long>(ref this, ListKind.ListOfLongs),
ObjectKind.ListOfPointers => new ListOfPointersDeserializer(ref this),
ObjectKind.ListOfShorts => new ListOfPrimitivesDeserializer<short>(ref this, ListKind.ListOfShorts),
ObjectKind.ListOfStructs => new ListOfStructsDeserializer(ref this),
ObjectKind.Nil => new EmptyListDeserializer(),
_ => throw new DeserializationException("Cannot deserialize this object as list"),
};
case ObjectKind.ListOfBits:
return new ListOfBitsDeserializer(this, false);
case ObjectKind.ListOfBytes:
return new ListOfPrimitivesDeserializer<byte>(this, ListKind.ListOfBytes);
case ObjectKind.ListOfEmpty:
return new ListOfEmptyDeserializer(this);
case ObjectKind.ListOfInts:
return new ListOfPrimitivesDeserializer<int>(this, ListKind.ListOfInts);
case ObjectKind.ListOfLongs:
return new ListOfPrimitivesDeserializer<long>(this, ListKind.ListOfLongs);
case ObjectKind.ListOfPointers:
return new ListOfPointersDeserializer(this);
case ObjectKind.ListOfShorts:
return new ListOfPrimitivesDeserializer<short>(this, ListKind.ListOfShorts);
case ObjectKind.ListOfStructs:
return new ListOfStructsDeserializer(this);
case ObjectKind.Nil:
return new EmptyListDeserializer();
default:
throw new DeserializationException("Cannot deserialize this object as list");
}
}
/// <summary>
@ -523,11 +542,14 @@ namespace Capnp
/// <exception cref="DeserializationException">state does not represent a list of pointers</exception>
public ListOfCapsDeserializer<T> RequireCapList<T>() where T: class
{
return Kind switch
switch (Kind)
{
ObjectKind.ListOfPointers => new ListOfCapsDeserializer<T>(ref this),
_ => throw new DeserializationException("Cannot deserialize this object as capability list"),
};
case ObjectKind.ListOfPointers:
return new ListOfCapsDeserializer<T>(this);
default:
throw new DeserializationException("Cannot deserialize this object as capability list");
}
}
/// <summary>

View File

@ -116,6 +116,8 @@ namespace Capnp
#endif
_writer.Write(bytes);
}
_writer.Flush();
}
}

View File

@ -37,7 +37,7 @@ namespace Capnp
/// </summary>
protected readonly DeserializerState State;
internal ListDeserializer(ref DeserializerState state)
internal ListDeserializer(in DeserializerState state)
{
State = state;
}

View File

@ -11,8 +11,8 @@ namespace Capnp
{
readonly bool _defaultValue;
internal ListOfBitsDeserializer(ref DeserializerState context, bool defaultValue) :
base(ref context)
internal ListOfBitsDeserializer(in DeserializerState context, bool defaultValue) :
base(context)
{
_defaultValue = defaultValue;
}

View File

@ -11,7 +11,7 @@ namespace Capnp
public class ListOfCapsDeserializer<T> : ListDeserializer, IReadOnlyList<T>
where T: class
{
internal ListOfCapsDeserializer(ref DeserializerState state) : base(ref state)
internal ListOfCapsDeserializer(in DeserializerState state) : base(state)
{
Rpc.CapabilityReflection.ValidateCapabilityInterface(typeof(T));
}

View File

@ -10,8 +10,8 @@ namespace Capnp
/// </summary>
public class ListOfEmptyDeserializer : ListDeserializer, IReadOnlyList<DeserializerState>
{
internal ListOfEmptyDeserializer(ref DeserializerState state) :
base(ref state)
internal ListOfEmptyDeserializer(in DeserializerState state) :
base(state)
{
}

View File

@ -9,8 +9,8 @@ namespace Capnp
/// </summary>
public class ListOfPointersDeserializer: ListDeserializer, IReadOnlyList<DeserializerState>
{
internal ListOfPointersDeserializer(ref DeserializerState state) :
base(ref state)
internal ListOfPointersDeserializer(in DeserializerState state) :
base(state)
{
}

View File

@ -7,11 +7,11 @@ using System.Text;
namespace Capnp
{
/// <summary>
/// ListDeserializer specialization for List(Int*), List(UInt*), List(Float*), and List(Enum).
/// ListDeserializer specialization for unmanaged primitive types (including enum).
/// </summary>
/// <typeparam name="T">List element type</typeparam>
public class ListOfPrimitivesDeserializer<T>: ListDeserializer, IReadOnlyList<T>
where T: struct
where T: unmanaged
{
class ListOfULongAsStructView<U> : IReadOnlyList<U>
{
@ -73,8 +73,8 @@ namespace Capnp
readonly ListKind _kind;
internal ListOfPrimitivesDeserializer(ref DeserializerState state, ListKind kind) :
base(ref state)
internal ListOfPrimitivesDeserializer(in DeserializerState state, ListKind kind) :
base(state)
{
_kind = kind;
}
@ -84,7 +84,10 @@ namespace Capnp
/// </summary>
public override ListKind Kind => _kind;
ReadOnlySpan<T> Data => MemoryMarshal.Cast<ulong, T>(State.CurrentSegment.Slice(State.Offset)).Slice(0, Count);
/// <summary>
/// Retrieves the underlying memory span of this object
/// </summary>
public ReadOnlySpan<T> Span => MemoryMarshal.Cast<ulong, T>(State.CurrentSegment.Slice(State.Offset)).Slice(0, Count);
/// <summary>
/// Returns the element at given index.
@ -92,15 +95,14 @@ namespace Capnp
/// <param name="index">Element index</param>
/// <returns>Element value</returns>
/// <exception cref="IndexOutOfRangeException"><paramref name="index"/> is out of range.</exception>
public T this[int index] => Data[index];
public T this[int index] => Span[index];
ListOfPrimitivesDeserializer<U> PrimitiveCast<U>() where U: struct
ListOfPrimitivesDeserializer<U> PrimitiveCast<U>() where U: unmanaged
{
if (Marshal.SizeOf<U>() != Marshal.SizeOf<T>())
throw new NotSupportedException("Source and target types have different sizes, cannot cast");
var stateCopy = State;
return new ListOfPrimitivesDeserializer<U>(ref stateCopy, Kind);
return new ListOfPrimitivesDeserializer<U>(State, Kind);
}
/// <summary>
@ -203,7 +205,7 @@ namespace Capnp
/// <exception cref="NotSupportedException">Element size is different from 1 byte.</exception>
public override string CastText()
{
var utf8Bytes = PrimitiveCast<byte>().Data;
var utf8Bytes = PrimitiveCast<byte>().Span;
if (utf8Bytes.Length == 0) return string.Empty;
var utf8GytesNoZterm = utf8Bytes.Slice(0, utf8Bytes.Length - 1);
return Encoding.UTF8.GetString(utf8GytesNoZterm.ToArray());

View File

@ -6,13 +6,13 @@ using System.Runtime.InteropServices;
namespace Capnp
{
/// <summary>
/// SerializerState specialization for List(Int*), List(UInt*), List(Float*), and List(Enum).
/// SerializerState specialization for unmanaged primitive types (including enum).
/// </summary>
/// <typeparam name="T">List element type, must be primitive. Static constructor will throw if the type does not work.</typeparam>
public class ListOfPrimitivesSerializer<T> :
SerializerState,
IReadOnlyList<T>
where T : struct
where T : unmanaged
{
static readonly int ElementSize;
@ -89,16 +89,31 @@ namespace Capnp
Init(items.Count);
for (int i = 0; i < items.Count; i++)
switch (items)
{
this[i] = items[i];
}
}
case T[] array:
array.CopyTo(Span);
break;
IEnumerable<T> Enumerate()
{
for (int i = 0; i < Data.Length; i++)
yield return Data[i];
case ArraySegment<T> segment:
segment.AsSpan().CopyTo(Span);
break;
case ListOfPrimitivesDeserializer<T> deser:
deser.Span.CopyTo(Span);
break;
case ListOfPrimitivesSerializer<T> ser:
ser.Span.CopyTo(Span);
break;
default:
for (int i = 0; i < items.Count; i++)
{
this[i] = items[i];
}
break;
}
}
/// <summary>
@ -107,6 +122,6 @@ namespace Capnp
/// <returns></returns>
public IEnumerator<T> GetEnumerator() => Enumerate().GetEnumerator();
IEnumerator IEnumerable.GetEnumerator() => Data.ToArray().GetEnumerator();
IEnumerator IEnumerable.GetEnumerator() => Span.ToArray().GetEnumerator();
}
}

View File

@ -9,8 +9,8 @@ namespace Capnp
/// </summary>
public class ListOfStructsDeserializer: ListDeserializer, IReadOnlyList<DeserializerState>
{
internal ListOfStructsDeserializer(ref DeserializerState context):
base(ref context)
internal ListOfStructsDeserializer(in DeserializerState context):
base(context)
{
}

View File

@ -4,10 +4,11 @@ using System.IO;
namespace Capnp.Rpc
{
/// <summary>
/// Models an RPC connection.
/// </summary>
public interface IConnection
public interface IConnection: ISupportsMidlayers
{
/// <summary>
/// Returns the state of this connection.
@ -52,15 +53,6 @@ namespace Capnp.Rpc
/// <exception cref="InvalidOperationException">Connection is not in state 'Initializing'</exception>
void AttachTracer(IFrameTracer tracer);
/// <summary>
/// Installs a midlayer. A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream.
/// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received
/// by the TCP connection, respectively. This mechanism may be used for integrating various (de-)compression algorithms.
/// </summary>
/// <param name="createFunc">Callback for wrapping the midlayer around its underlying stream</param>
/// <exception cref="ArgumentNullException"><paramref name="createFunc"/> is null</exception>
void InjectMidlayer(Func<Stream, Stream> createFunc);
/// <summary>
/// Prematurely closes this connection. Note that there is usually no need to close a connection manually. The typical use case
/// of this method is to refuse an incoming connection in the <code>TcpRpcServer.OnConnectionChanged</code> callback.

View File

@ -0,0 +1,21 @@
using System;
using System.IO;
namespace Capnp.Rpc
{
/// <summary>
/// Common interface for classes supporting the installation of midlayers.
/// A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream.
/// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received
/// by the TCP connection, respectively. This mechanism can be used for buffering, various (de-)compression algorithms, and more.
/// </summary>
public interface ISupportsMidlayers
{
/// <summary>
/// Installs a midlayer
/// </summary>
/// <param name="createFunc">Callback for wrapping the midlayer around its underlying stream</param>
/// <exception cref="ArgumentNullException"><paramref name="createFunc"/> is null</exception>
void InjectMidlayer(Func<Stream, Stream> createFunc);
}
}

View File

@ -0,0 +1,33 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace Capnp.Rpc
{
/// <summary>
/// Provides extension methods for installing midlayers to <see cref="TcpRpcServer"/> and <see cref="TcpRpcClient"/>./>.
/// </summary>
public static class MidlayerExtensions
{
/// <summary>
/// Enables stream buffering on the given object. Stream buffering reduces the number of I/O operations,
/// hence may cause a significant performance boost.
/// </summary>
/// <param name="obj"><see cref="TcpRpcServer"/> or <see cref="TcpRpcClient"/></param>
/// <param name="bufferSize">Buffer size (bytes). You should choose it according to the maximum expected raw capnp frame size</param>
public static void AddBuffering(this ISupportsMidlayers obj, int bufferSize)
{
obj.InjectMidlayer(s => new Util.DuplexBufferedStream(s, bufferSize));
}
/// <summary>
/// Enables stream buffering on the given object. Stream buffering reduces the number of I/O operations,
/// hence may cause a significant performance boost. Some default buffer size will be chosen.
/// </summary>
/// <param name="obj"><see cref="TcpRpcServer"/> or <see cref="TcpRpcClient"/></param>
public static void AddBuffering(this ISupportsMidlayers obj)
{
obj.InjectMidlayer(s => new Util.DuplexBufferedStream(s));
}
}
}

View File

@ -6,98 +6,41 @@ namespace Capnp.Rpc
{
class PendingAnswer: IDisposable
{
readonly object _reentrancyBlocker = new object();
readonly CancellationTokenSource? _cts;
readonly TaskCompletionSource<int> _whenCanceled;
Task<AnswerOrCounterquestion> _callTask;
Task? _initialTask;
Task? _chainedTask;
bool _disposed;
readonly TaskCompletionSource<AnswerOrCounterquestion> _cancelCompleter;
readonly Task<AnswerOrCounterquestion> _answerTask;
public PendingAnswer(Task<AnswerOrCounterquestion> callTask, CancellationTokenSource? cts)
{
async Task<AnswerOrCounterquestion> CancelableAwaitWhenReady()
{
return await await Task.WhenAny(callTask, _cancelCompleter.Task);
}
if (callTask == null)
throw new ArgumentNullException(nameof(callTask));
_cts = cts;
_callTask = callTask ?? throw new ArgumentNullException(nameof(callTask));
_whenCanceled = new TaskCompletionSource<int>();
_cancelCompleter = new TaskCompletionSource<AnswerOrCounterquestion>();
_answerTask = CancelableAwaitWhenReady();
}
public CancellationToken CancellationToken => _cts?.Token ?? CancellationToken.None;
public void Cancel()
{
_cts?.Cancel();
_whenCanceled.SetResult(0);
_cancelCompleter.SetCanceled();
}
async Task InitialAwaitWhenReady()
public void Chain(Action<Task<AnswerOrCounterquestion>> func)
{
var which = await Task.WhenAny(_callTask, _whenCanceled.Task);
if (which != _callTask)
{
throw new TaskCanceledException();
}
func(_answerTask);
}
async Task AwaitChainedTask(Task chainedTask, Func<Task<AnswerOrCounterquestion>, Task> func)
public void Chain(PromisedAnswer.READER rd, Action<Task<Proxy>> func)
{
try
{
await chainedTask;
}
catch (System.Exception exception)
{
await func(Task.FromException<AnswerOrCounterquestion>(exception));
throw;
}
await func(_callTask);
}
static async Task AwaitSeq(Task task1, Task task2)
{
await task1;
await task2;
}
public void Chain(bool strictSync, Func<Task<AnswerOrCounterquestion>, Task> func)
{
lock (_reentrancyBlocker)
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(PendingAnswer));
}
if (_initialTask == null)
{
_initialTask = InitialAwaitWhenReady();
}
Task followUpTask;
if (strictSync)
{
followUpTask = AwaitChainedTask(_chainedTask ?? _initialTask, func);
}
else
{
followUpTask = AwaitChainedTask(_initialTask, func);
}
if (_chainedTask != null)
{
_chainedTask = AwaitSeq(_chainedTask, followUpTask);
}
else
{
_chainedTask = followUpTask;
}
}
}
public void Chain(bool strictSync, PromisedAnswer.READER rd, Func<Task<Proxy>, Task> func)
{
Chain(strictSync, async t =>
Chain(t =>
{
async Task<Proxy> EvaluateProxy()
{
@ -158,43 +101,13 @@ namespace Capnp.Rpc
}
}
await func(EvaluateProxy());
func(EvaluateProxy());
});
}
public CancellationToken CancellationToken => _cts?.Token ?? CancellationToken.None;
public async void Dispose()
public void Dispose()
{
if (_cts != null)
{
Task? chainedTask;
lock (_reentrancyBlocker)
{
if (_disposed)
{
return;
}
chainedTask = _chainedTask;
_disposed = true;
}
if (chainedTask != null)
{
try
{
await chainedTask;
}
catch
{
}
finally
{
_cts.Dispose();
}
}
}
_cts?.Dispose();
}
}
}

View File

@ -411,7 +411,7 @@ namespace Capnp.Rpc
switch (req.SendResultsTo.which)
{
case Call.sendResultsTo.WHICH.Caller:
pendingAnswer.Chain(false, async t =>
pendingAnswer.Chain(async t =>
{
try
{
@ -479,7 +479,7 @@ namespace Capnp.Rpc
break;
case Call.sendResultsTo.WHICH.Yourself:
pendingAnswer.Chain(false, async t =>
pendingAnswer.Chain(async t =>
{
try
{
@ -587,7 +587,6 @@ namespace Capnp.Rpc
if (exists)
{
previousAnswer!.Chain(
false,
req.Target.PromisedAnswer,
async t =>
{
@ -690,7 +689,7 @@ namespace Capnp.Rpc
if (exists)
{
pendingAnswer!.Chain(false, async t =>
pendingAnswer!.Chain(async t =>
{
try
{
@ -820,7 +819,7 @@ namespace Capnp.Rpc
if (_answerTable.TryGetValue(promisedAnswer.QuestionId, out var previousAnswer))
{
previousAnswer.Chain(true,
previousAnswer.Chain(
disembargo.Target.PromisedAnswer,
async t =>
{
@ -929,9 +928,9 @@ namespace Capnp.Rpc
void ReleaseResultCaps(PendingAnswer answer)
{
try
answer.Chain(async t =>
{
answer.Chain(false, async t =>
try
{
var aorcq = await t;
var results = aorcq.Answer;
@ -943,11 +942,11 @@ namespace Capnp.Rpc
cap?.Release();
}
}
});
}
catch
{
}
}
catch
{
}
});
}
void ProcessFinish(Finish.READER finish)
@ -1258,7 +1257,7 @@ namespace Capnp.Rpc
{
var tcs = new TaskCompletionSource<Proxy>();
pendingAnswer.Chain(false,
pendingAnswer.Chain(
capDesc.ReceiverAnswer,
async t =>
{

View File

@ -32,7 +32,7 @@ namespace Capnp.Rpc
/// <summary>
/// Cap'n Proto RPC TCP server.
/// </summary>
public class TcpRpcServer: IDisposable
public class TcpRpcServer: ISupportsMidlayers, IDisposable
{
ILogger Logger { get; } = Logging.CreateLogger<TcpRpcServer>();
@ -161,17 +161,17 @@ namespace Capnp.Rpc
}
readonly RpcEngine _rpcEngine;
readonly TcpListener _listener;
readonly object _reentrancyBlocker = new object();
readonly Thread _acceptorThread;
readonly List<Connection> _connections = new List<Connection>();
Thread? _acceptorThread;
TcpListener? _listener;
/// <summary>
/// Gets the number of currently active inbound TCP connections.
/// </summary>
public int ConnectionCount { get; private set; }
void AcceptClients()
void AcceptClients(TcpListener listener)
{
try
{
@ -180,7 +180,7 @@ namespace Capnp.Rpc
while (true)
{
var client = _listener.AcceptTcpClient();
var client = listener.AcceptTcpClient();
var connection = new Connection(this, client);
lock (_reentrancyBlocker)
@ -244,7 +244,10 @@ namespace Capnp.Rpc
/// </summary>
public void Dispose()
{
StopListening();
if (_listener != null)
{
StopListening();
}
var connections = new List<Connection>();
@ -260,8 +263,6 @@ namespace Capnp.Rpc
SafeJoin(connection.PumpRunner);
}
SafeJoin(_acceptorThread);
GC.SuppressFinalize(this);
}
@ -270,6 +271,9 @@ namespace Capnp.Rpc
/// </summary>
public void StopListening()
{
if (_listener == null)
throw new InvalidOperationException("Listening was never started");
try
{
_listener.Stop();
@ -277,47 +281,109 @@ namespace Capnp.Rpc
catch (SocketException)
{
}
finally
{
_listener = null;
SafeJoin(_acceptorThread);
_acceptorThread = null;
}
}
/// <summary>
/// Installs a midlayer.
/// A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream.
/// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received
/// by the TCP connection, respectively. This mechanism can be used for buffering, various (de-)compression algorithms, and more.
/// </summary>
/// <param name="createFunc"></param>
public void InjectMidlayer(Func<Stream, Stream> createFunc)
{
OnConnectionChanged += (_, e) =>
{
if (e.Connection.State == ConnectionState.Initializing)
{
e.Connection.InjectMidlayer(createFunc);
}
};
}
/// <summary>
/// Constructs an instance.
/// </summary>
public TcpRpcServer()
{
_rpcEngine = new RpcEngine();
}
/// <summary>
/// Constructs an instance, starts listening to the specified TCP/IP endpoint and accepting clients.
/// If you intend configuring a midlayer or consuming the <see cref="OnConnectionChanged"/> event,
/// you should not use this constructor, since it may lead to an early-client race condition.
/// Instead, use the parameterless constructor, configure, then call <see cref="StartAccepting(IPAddress, int)"/>.
/// </summary>
/// <param name="localAddr">An System.Net.IPAddress that represents the local IP address.</param>
/// <param name="port">The port on which to listen for incoming connection attempts.</param>
/// <exception cref="ArgumentNullException"><paramref name="localAddr"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="port"/> is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort.</exception>
public TcpRpcServer(IPAddress localAddr, int port)
/// <exception cref="SocketException">The underlying <see cref="TcpListener"/> detected an error condition, such as the desired endpoint is already occupied.</exception>
public TcpRpcServer(IPAddress localAddr, int port): this()
{
_rpcEngine = new RpcEngine();
_listener = new TcpListener(localAddr, port);
_listener.ExclusiveAddressUse = false;
StartAccepting(localAddr, port);
}
for (int retry = 0; retry < 5; retry++)
/// <summary>
/// Starts listening to the specified TCP/IP endpoint and accepting clients.
/// </summary>
/// <param name="localAddr">An System.Net.IPAddress that represents the local IP address.</param>
/// <param name="port">The port on which to listen for incoming connection attempts.</param>
/// <exception cref="ArgumentNullException"><paramref name="localAddr"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="port"/> is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort.</exception>
/// <exception cref="InvalidOperationException">Listening activity was already started</exception>
/// <exception cref="SocketException">The underlying <see cref="TcpListener"/> detected an error condition, such as the desired endpoint is already occupied.</exception>
public void StartAccepting(IPAddress localAddr, int port)
{
if (_listener != null)
throw new InvalidOperationException("Listening activity was already started");
var listener = new TcpListener(localAddr, port)
{
ExclusiveAddressUse = false
};
int attempt = 0;
while (true)
{
try
{
_listener.Start();
listener.Start();
break;
}
catch (SocketException socketException)
{
Logger.LogWarning($"Failed to listen on port {port}, attempt {retry}: {socketException}");
Thread.Sleep(10);
if (attempt == 5)
throw;
Logger.LogWarning($"Failed to listen on port {port}, attempt {attempt}: {socketException}");
}
++attempt;
Thread.Sleep(10);
}
_acceptorThread = new Thread(AcceptClients);
_acceptorThread = new Thread(() => AcceptClients(listener));
_listener = listener;
_acceptorThread.Start();
}
/// <summary>
/// Whether the thread which is responsible for acception incoming attempts is still alive.
/// The thread will die upon disposal, but also in case of a socket error condition.
/// The thread will die after calling <see cref="StopListening"/>, upon disposal, but also in case of a socket error condition.
/// Errors which occur on a particular connection will just close that connection and won't interfere
/// with the acceptor thread.
/// </summary>
public bool IsAlive => _acceptorThread.IsAlive;
public bool IsAlive => _acceptorThread?.IsAlive ?? false;
/// <summary>
/// Sets the bootstrap capability. It must be an object which implements a valid capability interface

View File

@ -49,7 +49,7 @@ namespace Capnp
/// <param name="defaultSegmentSize">Default size (in words) of a newly allocated segment. If a single allocation requires
/// a bigger size, a bigger dedicated segment will be allocated. On the wire, segments will be truncated to their actual
/// occupancies.</param>
public SegmentAllocator(int defaultSegmentSize = 128)
public SegmentAllocator(int defaultSegmentSize = 64)
{
_defaultSegmentSize = defaultSegmentSize;
}

View File

@ -0,0 +1,96 @@
using System;
using System.IO;
namespace Capnp.Util
{
internal class DuplexBufferedStream : Stream
{
// A buffer size of 1024 bytes seems to be a good comprise, giving good performance
// in TCP/IP-over-localhost scenarios for small to medium (200kiB) frame sizes.
const int DefaultBufferSize = 1024;
readonly BufferedStream _readStream;
readonly BufferedStream _writeStream;
readonly int _bufferSize;
readonly object _reentrancyBlocker = new object();
public DuplexBufferedStream(Stream stream, int bufferSize)
{
_readStream = new BufferedStream(stream, bufferSize);
_writeStream = new BufferedStream(stream, bufferSize);
_bufferSize = bufferSize;
}
public DuplexBufferedStream(Stream stream): this(stream, DefaultBufferSize)
{
}
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => true;
public override long Length => 0;
public override long Position
{
get => 0;
set => throw new NotSupportedException();
}
public override void Flush()
{
_writeStream.Flush();
}
public override int Read(byte[] buffer, int offset, int count)
{
return _readStream.Read(buffer, offset, count);
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
if (buffer.Length > _bufferSize) // avoid moiré-like timing effects
_writeStream.Flush();
_writeStream.Write(buffer, offset, count);
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
lock (_reentrancyBlocker)
{
try
{
_readStream.Dispose();
}
catch
{
}
try
{
_writeStream.Dispose();
}
catch
{
}
}
}
base.Dispose(disposing);
}
}
}