mirror of
https://github.com/FabInfra/capnproto-dotnetcore_Runtime.git
synced 2025-03-12 14:51:41 +01:00
commit
21b5ee075d
@ -7,7 +7,7 @@
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="BenchmarkDotNet" Version="0.12.0" />
|
||||
<PackageReference Include="Capnp.Net.Runtime" Version="1.2.189" />
|
||||
<PackageReference Include="Capnp.Net.Runtime" Version="1.3.34-g409e517587" />
|
||||
<PackageReference Include="CapnpC.CSharp.MsBuild.Generation" Version="1.2.138" />
|
||||
<PackageReference Include="Google.Protobuf" Version="3.11.3" />
|
||||
<PackageReference Include="Grpc.Net.Client" Version="2.27.0" />
|
||||
|
@ -13,6 +13,9 @@ namespace Benchmark
|
||||
[Params(20, 200, 2000, 20000, 200000, 2000000)]
|
||||
public int PayloadBytes;
|
||||
|
||||
[Params(0, 256, 1024, 4096)]
|
||||
public int BufferSize;
|
||||
|
||||
TcpRpcClient _client;
|
||||
IEchoer _echoer;
|
||||
byte[] _payload;
|
||||
@ -21,6 +24,8 @@ namespace Benchmark
|
||||
public void Setup()
|
||||
{
|
||||
_client = new TcpRpcClient("localhost", 5002);
|
||||
if (BufferSize > 0)
|
||||
_client.AddBuffering(BufferSize);
|
||||
_client.WhenConnected.Wait();
|
||||
_echoer = _client.GetMain<IEchoer>();
|
||||
_payload = new byte[PayloadBytes];
|
||||
|
25
Benchmarking/CapnpProfile.sln
Normal file
25
Benchmarking/CapnpProfile.sln
Normal file
@ -0,0 +1,25 @@
|
||||
|
||||
Microsoft Visual Studio Solution File, Format Version 12.00
|
||||
# Visual Studio Version 16
|
||||
VisualStudioVersion = 16.0.29728.190
|
||||
MinimumVisualStudioVersion = 10.0.40219.1
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CapnpProfile", "CapnpProfile\CapnpProfile.csproj", "{D2D3AE23-C19E-47C7-B758-E2259DC01B5A}"
|
||||
EndProject
|
||||
Global
|
||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||
Debug|Any CPU = Debug|Any CPU
|
||||
Release|Any CPU = Release|Any CPU
|
||||
EndGlobalSection
|
||||
GlobalSection(ProjectConfigurationPlatforms) = postSolution
|
||||
{D2D3AE23-C19E-47C7-B758-E2259DC01B5A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{D2D3AE23-C19E-47C7-B758-E2259DC01B5A}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{D2D3AE23-C19E-47C7-B758-E2259DC01B5A}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{D2D3AE23-C19E-47C7-B758-E2259DC01B5A}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
EndGlobalSection
|
||||
GlobalSection(SolutionProperties) = preSolution
|
||||
HideSolutionNode = FALSE
|
||||
EndGlobalSection
|
||||
GlobalSection(ExtensibilityGlobals) = postSolution
|
||||
SolutionGuid = {F5DDDA09-394B-4A71-B6BE-C7103BCEF3A2}
|
||||
EndGlobalSection
|
||||
EndGlobal
|
19
Benchmarking/CapnpProfile/CapnpProfile.csproj
Normal file
19
Benchmarking/CapnpProfile/CapnpProfile.csproj
Normal file
@ -0,0 +1,19 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>netcoreapp3.1</TargetFramework>
|
||||
</PropertyGroup>
|
||||
|
||||
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
|
||||
<DebugType>full</DebugType>
|
||||
<DebugSymbols>true</DebugSymbols>
|
||||
<PlatformTarget>x64</PlatformTarget>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Capnp.Net.Runtime" Version="1.3.34-g409e517587" />
|
||||
<PackageReference Include="CapnpC.CSharp.MsBuild.Generation" Version="1.3.29-g6d711b8579" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
32
Benchmarking/CapnpProfile/Program.cs
Normal file
32
Benchmarking/CapnpProfile/Program.cs
Normal file
@ -0,0 +1,32 @@
|
||||
using Capnp.Rpc;
|
||||
using CapnpGen;
|
||||
using CapnpProfile.Services;
|
||||
using System;
|
||||
using System.Net;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace CapnpProfile
|
||||
{
|
||||
class Program
|
||||
{
|
||||
static async Task Main(string[] args)
|
||||
{
|
||||
using var server = new TcpRpcServer();
|
||||
server.Main = new CapnpEchoService();
|
||||
server.AddBuffering();
|
||||
server.StartAccepting(IPAddress.Any, 5002);
|
||||
using var client = new TcpRpcClient("localhost", 5002);
|
||||
await client.WhenConnected;
|
||||
using var echoer = client.GetMain<IEchoer>();
|
||||
var payload = new byte[20];
|
||||
new Random().NextBytes(payload);
|
||||
|
||||
while (true)
|
||||
{
|
||||
var result = await echoer.Echo(payload);
|
||||
if (result.Count != payload.Length)
|
||||
throw new InvalidOperationException("Echo server malfunction");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
5
Benchmarking/CapnpProfile/Protos/Echo.capnp
Normal file
5
Benchmarking/CapnpProfile/Protos/Echo.capnp
Normal file
@ -0,0 +1,5 @@
|
||||
@0x8c309c720de8cf7c;
|
||||
|
||||
interface Echoer {
|
||||
echo @0 (input : Data) -> (output : Data);
|
||||
}
|
20
Benchmarking/CapnpProfile/Services/CapnpEchoService.cs
Normal file
20
Benchmarking/CapnpProfile/Services/CapnpEchoService.cs
Normal file
@ -0,0 +1,20 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace CapnpProfile.Services
|
||||
{
|
||||
public class CapnpEchoService : CapnpGen.IEchoer
|
||||
{
|
||||
public void Dispose()
|
||||
{
|
||||
}
|
||||
|
||||
public Task<IReadOnlyList<byte>> Echo(IReadOnlyList<byte> input, CancellationToken cancellationToken_ = default)
|
||||
{
|
||||
return Task.FromResult(input);
|
||||
}
|
||||
}
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
@ -6,7 +6,7 @@
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Capnp.Net.Runtime" Version="1.2.189" />
|
||||
<PackageReference Include="Capnp.Net.Runtime" Version="1.3.34-g409e517587" />
|
||||
<PackageReference Include="CapnpC.CSharp.MsBuild.Generation" Version="1.2.138" />
|
||||
</ItemGroup>
|
||||
|
||||
|
@ -9,9 +9,11 @@ namespace EchoServiceCapnp
|
||||
{
|
||||
static void Main(string[] args)
|
||||
{
|
||||
using (var server = new TcpRpcServer(IPAddress.Any, 5002))
|
||||
using (var server = new TcpRpcServer())
|
||||
{
|
||||
server.AddBuffering();
|
||||
server.Main = new CapnpEchoService();
|
||||
server.StartAccepting(IPAddress.Any, 5002);
|
||||
Console.WriteLine("Press RETURN to stop listening");
|
||||
Console.ReadLine();
|
||||
}
|
||||
|
10
Benchmarking/nuget.config
Normal file
10
Benchmarking/nuget.config
Normal file
@ -0,0 +1,10 @@
|
||||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<configuration>
|
||||
<config>
|
||||
<add key="repositoryPath" value="..\packages" />
|
||||
<add key="globalPackagesFolder" value="..\globalPackages" />
|
||||
</config>
|
||||
<packageSources>
|
||||
<add key="CapnpRuntimeSource" value="..\GeneratedNuGetPackages\Release" />
|
||||
</packageSources>
|
||||
</configuration>
|
@ -36,7 +36,7 @@ namespace Capnp.Net.Runtime.Tests
|
||||
|
||||
Process _currentProcess;
|
||||
|
||||
void LaunchCompatTestProcess(string whichTest, Action<StreamReader> test)
|
||||
bool TryLaunchCompatTestProcess(string whichTest, Action<StreamReader> test)
|
||||
{
|
||||
string myPath = Path.GetDirectoryName(typeof(TcpRpcInterop).Assembly.Location);
|
||||
string config;
|
||||
@ -71,6 +71,12 @@ namespace Capnp.Net.Runtime.Tests
|
||||
"Problem after launching test process");
|
||||
|
||||
test(_currentProcess.StandardOutput);
|
||||
|
||||
return true;
|
||||
}
|
||||
catch (AssertFailedException)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
finally
|
||||
{
|
||||
@ -85,6 +91,20 @@ namespace Capnp.Net.Runtime.Tests
|
||||
}
|
||||
}
|
||||
|
||||
void LaunchCompatTestProcess(string whichTest, Action<StreamReader> test)
|
||||
{
|
||||
for (int retry = 0; retry < 5; retry++)
|
||||
{
|
||||
if (TryLaunchCompatTestProcess(whichTest, test))
|
||||
return;
|
||||
|
||||
if (whichTest.StartsWith("server:"))
|
||||
PrepareNextTest();
|
||||
}
|
||||
|
||||
Assert.Fail("Problem after launching test process");
|
||||
}
|
||||
|
||||
void SendInput(string line)
|
||||
{
|
||||
_currentProcess.StandardInput.WriteLine(line);
|
||||
|
@ -98,14 +98,7 @@ namespace Capnp.Net.Runtime.Tests
|
||||
using (var server = new TcpRpcServer(IPAddress.Any, TcpPort))
|
||||
using (var client = new TcpRpcClient())
|
||||
{
|
||||
server.OnConnectionChanged += (_, e) =>
|
||||
{
|
||||
if (e.Connection.State == ConnectionState.Initializing)
|
||||
{
|
||||
e.Connection.InjectMidlayer(s => new ScatteringStream(s, 7));
|
||||
}
|
||||
};
|
||||
|
||||
server.InjectMidlayer(s => new ScatteringStream(s, 7));
|
||||
client.InjectMidlayer(s => new ScatteringStream(s, 10));
|
||||
client.Connect("localhost", TcpPort);
|
||||
client.WhenConnected.Wait();
|
||||
|
@ -5,6 +5,7 @@ using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
@ -20,8 +21,39 @@ namespace Capnp.Net.Runtime.Tests
|
||||
|
||||
protected ILogger Logger { get; set; }
|
||||
|
||||
protected TcpRpcClient SetupClient() => new TcpRpcClient("localhost", TcpPort);
|
||||
protected TcpRpcServer SetupServer() => new TcpRpcServer(IPAddress.Any, TcpPort);
|
||||
protected TcpRpcClient SetupClient()
|
||||
{
|
||||
var client = new TcpRpcClient();
|
||||
client.AddBuffering();
|
||||
client.Connect("localhost", TcpPort);
|
||||
return client;
|
||||
}
|
||||
|
||||
protected TcpRpcServer SetupServer()
|
||||
{
|
||||
int attempt = 0;
|
||||
|
||||
while (true)
|
||||
{
|
||||
try
|
||||
{
|
||||
var server = new TcpRpcServer(IPAddress.Any, TcpPort);
|
||||
server.AddBuffering();
|
||||
return server;
|
||||
}
|
||||
catch (SocketException)
|
||||
{
|
||||
// If the TCP listening port is occupied by some other process,
|
||||
// retry with a different one
|
||||
|
||||
if (attempt == 5)
|
||||
throw;
|
||||
}
|
||||
|
||||
IncrementTcpPort();
|
||||
++attempt;
|
||||
}
|
||||
}
|
||||
|
||||
protected (TcpRpcServer, TcpRpcClient) SetupClientServerPair()
|
||||
{
|
||||
|
@ -32,6 +32,11 @@
|
||||
<DefineConstants>DebugCapabilityLifecycle</DefineConstants>
|
||||
</PropertyGroup>
|
||||
|
||||
<PropertyGroup Condition="'$(Configuration)|$(TargetFramework)|$(Platform)'=='Release|netstandard2.0|AnyCPU'">
|
||||
<DebugType>portable</DebugType>
|
||||
<DebugSymbols>true</DebugSymbols>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.2.0" />
|
||||
</ItemGroup>
|
||||
|
@ -516,28 +516,28 @@ namespace Capnp
|
||||
switch (Kind)
|
||||
{
|
||||
case ObjectKind.ListOfBits:
|
||||
return new ListOfBitsDeserializer(ref this, false);
|
||||
return new ListOfBitsDeserializer(this, false);
|
||||
|
||||
case ObjectKind.ListOfBytes:
|
||||
return new ListOfPrimitivesDeserializer<byte>(ref this, ListKind.ListOfBytes);
|
||||
return new ListOfPrimitivesDeserializer<byte>(this, ListKind.ListOfBytes);
|
||||
|
||||
case ObjectKind.ListOfEmpty:
|
||||
return new ListOfEmptyDeserializer(ref this);
|
||||
return new ListOfEmptyDeserializer(this);
|
||||
|
||||
case ObjectKind.ListOfInts:
|
||||
return new ListOfPrimitivesDeserializer<int>(ref this, ListKind.ListOfInts);
|
||||
return new ListOfPrimitivesDeserializer<int>(this, ListKind.ListOfInts);
|
||||
|
||||
case ObjectKind.ListOfLongs:
|
||||
return new ListOfPrimitivesDeserializer<long>(ref this, ListKind.ListOfLongs);
|
||||
return new ListOfPrimitivesDeserializer<long>(this, ListKind.ListOfLongs);
|
||||
|
||||
case ObjectKind.ListOfPointers:
|
||||
return new ListOfPointersDeserializer(ref this);
|
||||
return new ListOfPointersDeserializer(this);
|
||||
|
||||
case ObjectKind.ListOfShorts:
|
||||
return new ListOfPrimitivesDeserializer<short>(ref this, ListKind.ListOfShorts);
|
||||
return new ListOfPrimitivesDeserializer<short>(this, ListKind.ListOfShorts);
|
||||
|
||||
case ObjectKind.ListOfStructs:
|
||||
return new ListOfStructsDeserializer(ref this);
|
||||
return new ListOfStructsDeserializer(this);
|
||||
|
||||
case ObjectKind.Nil:
|
||||
return new EmptyListDeserializer();
|
||||
@ -557,7 +557,7 @@ namespace Capnp
|
||||
switch (Kind)
|
||||
{
|
||||
case ObjectKind.ListOfPointers:
|
||||
return new ListOfCapsDeserializer<T>(ref this);
|
||||
return new ListOfCapsDeserializer<T>(this);
|
||||
|
||||
default:
|
||||
throw new DeserializationException("Cannot deserialize this object as capability list");
|
||||
|
@ -116,6 +116,8 @@ namespace Capnp
|
||||
#endif
|
||||
_writer.Write(bytes);
|
||||
}
|
||||
|
||||
_writer.Flush();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -34,7 +34,7 @@ namespace Capnp
|
||||
/// </summary>
|
||||
protected readonly DeserializerState State;
|
||||
|
||||
internal ListDeserializer(ref DeserializerState state)
|
||||
internal ListDeserializer(in DeserializerState state)
|
||||
{
|
||||
State = state;
|
||||
}
|
||||
|
@ -11,8 +11,8 @@ namespace Capnp
|
||||
{
|
||||
readonly bool _defaultValue;
|
||||
|
||||
internal ListOfBitsDeserializer(ref DeserializerState context, bool defaultValue) :
|
||||
base(ref context)
|
||||
internal ListOfBitsDeserializer(in DeserializerState context, bool defaultValue) :
|
||||
base(context)
|
||||
{
|
||||
_defaultValue = defaultValue;
|
||||
}
|
||||
|
@ -11,7 +11,7 @@ namespace Capnp
|
||||
public class ListOfCapsDeserializer<T> : ListDeserializer, IReadOnlyList<T>
|
||||
where T: class
|
||||
{
|
||||
internal ListOfCapsDeserializer(ref DeserializerState state) : base(ref state)
|
||||
internal ListOfCapsDeserializer(in DeserializerState state) : base(state)
|
||||
{
|
||||
Rpc.CapabilityReflection.ValidateCapabilityInterface(typeof(T));
|
||||
}
|
||||
|
@ -10,8 +10,8 @@ namespace Capnp
|
||||
/// </summary>
|
||||
public class ListOfEmptyDeserializer : ListDeserializer, IReadOnlyList<DeserializerState>
|
||||
{
|
||||
internal ListOfEmptyDeserializer(ref DeserializerState state) :
|
||||
base(ref state)
|
||||
internal ListOfEmptyDeserializer(in DeserializerState state) :
|
||||
base(state)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -9,8 +9,8 @@ namespace Capnp
|
||||
/// </summary>
|
||||
public class ListOfPointersDeserializer: ListDeserializer, IReadOnlyList<DeserializerState>
|
||||
{
|
||||
internal ListOfPointersDeserializer(ref DeserializerState state) :
|
||||
base(ref state)
|
||||
internal ListOfPointersDeserializer(in DeserializerState state) :
|
||||
base(state)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -7,11 +7,11 @@ using System.Text;
|
||||
namespace Capnp
|
||||
{
|
||||
/// <summary>
|
||||
/// ListDeserializer specialization for List(Int*), List(UInt*), List(Float*), and List(Enum).
|
||||
/// ListDeserializer specialization for unmanaged primitive types (including enum).
|
||||
/// </summary>
|
||||
/// <typeparam name="T">List element type</typeparam>
|
||||
public class ListOfPrimitivesDeserializer<T>: ListDeserializer, IReadOnlyList<T>
|
||||
where T: struct
|
||||
where T: unmanaged
|
||||
{
|
||||
class ListOfULongAsStructView<U> : IReadOnlyList<U>
|
||||
{
|
||||
@ -73,12 +73,10 @@ namespace Capnp
|
||||
|
||||
readonly ListKind _kind;
|
||||
|
||||
internal ListOfPrimitivesDeserializer(ref DeserializerState state, ListKind kind) :
|
||||
base(ref state)
|
||||
internal ListOfPrimitivesDeserializer(in DeserializerState state, ListKind kind) :
|
||||
base(state)
|
||||
{
|
||||
_kind = kind;
|
||||
|
||||
var binCoder = PrimitiveCoder.Get<T>();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -86,7 +84,10 @@ namespace Capnp
|
||||
/// </summary>
|
||||
public override ListKind Kind => _kind;
|
||||
|
||||
ReadOnlySpan<T> Data => MemoryMarshal.Cast<ulong, T>(State.CurrentSegment.Slice(State.Offset)).Slice(0, Count);
|
||||
/// <summary>
|
||||
/// Retrieves the underlying memory span of this object
|
||||
/// </summary>
|
||||
public ReadOnlySpan<T> Span => MemoryMarshal.Cast<ulong, T>(State.CurrentSegment.Slice(State.Offset)).Slice(0, Count);
|
||||
|
||||
/// <summary>
|
||||
/// Returns the element at given index.
|
||||
@ -94,15 +95,14 @@ namespace Capnp
|
||||
/// <param name="index">Element index</param>
|
||||
/// <returns>Element value</returns>
|
||||
/// <exception cref="IndexOutOfRangeException"><paramref name="index"/> is out of range.</exception>
|
||||
public T this[int index] => Data[index];
|
||||
public T this[int index] => Span[index];
|
||||
|
||||
ListOfPrimitivesDeserializer<U> PrimitiveCast<U>() where U: struct
|
||||
ListOfPrimitivesDeserializer<U> PrimitiveCast<U>() where U: unmanaged
|
||||
{
|
||||
if (Marshal.SizeOf<U>() != Marshal.SizeOf<T>())
|
||||
throw new NotSupportedException("Source and target types have different sizes, cannot cast");
|
||||
|
||||
var stateCopy = State;
|
||||
return new ListOfPrimitivesDeserializer<U>(ref stateCopy, Kind);
|
||||
return new ListOfPrimitivesDeserializer<U>(State, Kind);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -205,7 +205,7 @@ namespace Capnp
|
||||
/// <exception cref="NotSupportedException">Element size is different from 1 byte.</exception>
|
||||
public override string CastText()
|
||||
{
|
||||
var utf8Bytes = PrimitiveCast<byte>().Data;
|
||||
var utf8Bytes = PrimitiveCast<byte>().Span;
|
||||
if (utf8Bytes.Length == 0) return string.Empty;
|
||||
var utf8GytesNoZterm = utf8Bytes.Slice(0, utf8Bytes.Length - 1);
|
||||
return Encoding.UTF8.GetString(utf8GytesNoZterm.ToArray());
|
||||
|
@ -6,13 +6,13 @@ using System.Runtime.InteropServices;
|
||||
namespace Capnp
|
||||
{
|
||||
/// <summary>
|
||||
/// SerializerState specialization for List(Int*), List(UInt*), List(Float*), and List(Enum).
|
||||
/// SerializerState specialization for unmanaged primitive types (including enum).
|
||||
/// </summary>
|
||||
/// <typeparam name="T">List element type, must be primitive. Static constructor will throw if the type does not work.</typeparam>
|
||||
public class ListOfPrimitivesSerializer<T> :
|
||||
SerializerState,
|
||||
IReadOnlyList<T>
|
||||
where T : struct
|
||||
where T : unmanaged
|
||||
{
|
||||
static readonly int ElementSize;
|
||||
|
||||
@ -28,7 +28,10 @@ namespace Capnp
|
||||
}
|
||||
}
|
||||
|
||||
Span<T> Data => MemoryMarshal.Cast<ulong, T>(RawData);
|
||||
/// <summary>
|
||||
/// Retrieves the underlying memory span of the represented items.
|
||||
/// </summary>
|
||||
public Span<T> Span => MemoryMarshal.Cast<ulong, T>(RawData);
|
||||
|
||||
/// <summary>
|
||||
/// Gets or sets the value at given index.
|
||||
@ -37,8 +40,8 @@ namespace Capnp
|
||||
/// <returns>Element value</returns>
|
||||
public T this[int index]
|
||||
{
|
||||
get => Data[index];
|
||||
set => Data[index] = value;
|
||||
get => Span[index];
|
||||
set => Span[index] = value;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -77,19 +80,39 @@ namespace Capnp
|
||||
}
|
||||
|
||||
Init(items.Count);
|
||||
|
||||
for (int i = 0; i < items.Count; i++)
|
||||
|
||||
switch (items)
|
||||
{
|
||||
this[i] = items[i];
|
||||
case T[] array:
|
||||
array.CopyTo(Span);
|
||||
break;
|
||||
|
||||
case ArraySegment<T> segment:
|
||||
segment.AsSpan().CopyTo(Span);
|
||||
break;
|
||||
|
||||
case ListOfPrimitivesDeserializer<T> deser:
|
||||
deser.Span.CopyTo(Span);
|
||||
break;
|
||||
|
||||
case ListOfPrimitivesSerializer<T> ser:
|
||||
ser.Span.CopyTo(Span);
|
||||
break;
|
||||
|
||||
default:
|
||||
for (int i = 0; i < items.Count; i++)
|
||||
{
|
||||
this[i] = items[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Implements <see cref="IEnumerable{T}"/>.
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public IEnumerator<T> GetEnumerator() => (IEnumerator<T>)Data.ToArray().GetEnumerator();
|
||||
public IEnumerator<T> GetEnumerator() => (IEnumerator<T>)Span.ToArray().GetEnumerator();
|
||||
|
||||
IEnumerator IEnumerable.GetEnumerator() => Data.ToArray().GetEnumerator();
|
||||
IEnumerator IEnumerable.GetEnumerator() => Span.ToArray().GetEnumerator();
|
||||
}
|
||||
}
|
@ -9,8 +9,8 @@ namespace Capnp
|
||||
/// </summary>
|
||||
public class ListOfStructsDeserializer: ListDeserializer, IReadOnlyList<DeserializerState>
|
||||
{
|
||||
internal ListOfStructsDeserializer(ref DeserializerState context):
|
||||
base(ref context)
|
||||
internal ListOfStructsDeserializer(in DeserializerState context):
|
||||
base(context)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -4,10 +4,11 @@ using System.IO;
|
||||
|
||||
namespace Capnp.Rpc
|
||||
{
|
||||
|
||||
/// <summary>
|
||||
/// Models an RPC connection.
|
||||
/// </summary>
|
||||
public interface IConnection
|
||||
public interface IConnection: ISupportsMidlayers
|
||||
{
|
||||
/// <summary>
|
||||
/// Returns the state of this connection.
|
||||
@ -52,15 +53,6 @@ namespace Capnp.Rpc
|
||||
/// <exception cref="InvalidOperationException">Connection is not in state 'Initializing'</exception>
|
||||
void AttachTracer(IFrameTracer tracer);
|
||||
|
||||
/// <summary>
|
||||
/// Installs a midlayer. A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream.
|
||||
/// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received
|
||||
/// by the TCP connection, respectively. This mechanism may be used for integrating various (de-)compression algorithms.
|
||||
/// </summary>
|
||||
/// <param name="createFunc">Callback for wrapping the midlayer around its underlying stream</param>
|
||||
/// <exception cref="ArgumentNullException"><paramref name="createFunc"/> is null</exception>
|
||||
void InjectMidlayer(Func<Stream, Stream> createFunc);
|
||||
|
||||
/// <summary>
|
||||
/// Prematurely closes this connection. Note that there is usually no need to close a connection manually. The typical use case
|
||||
/// of this method is to refuse an incoming connection in the <code>TcpRpcServer.OnConnectionChanged</code> callback.
|
||||
|
21
Capnp.Net.Runtime/Rpc/ISupportsMidlayers.cs
Normal file
21
Capnp.Net.Runtime/Rpc/ISupportsMidlayers.cs
Normal file
@ -0,0 +1,21 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
|
||||
namespace Capnp.Rpc
|
||||
{
|
||||
/// <summary>
|
||||
/// Common interface for classes supporting the installation of midlayers.
|
||||
/// A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream.
|
||||
/// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received
|
||||
/// by the TCP connection, respectively. This mechanism can be used for buffering, various (de-)compression algorithms, and more.
|
||||
/// </summary>
|
||||
public interface ISupportsMidlayers
|
||||
{
|
||||
/// <summary>
|
||||
/// Installs a midlayer
|
||||
/// </summary>
|
||||
/// <param name="createFunc">Callback for wrapping the midlayer around its underlying stream</param>
|
||||
/// <exception cref="ArgumentNullException"><paramref name="createFunc"/> is null</exception>
|
||||
void InjectMidlayer(Func<Stream, Stream> createFunc);
|
||||
}
|
||||
}
|
33
Capnp.Net.Runtime/Rpc/MidlayerExtensions.cs
Normal file
33
Capnp.Net.Runtime/Rpc/MidlayerExtensions.cs
Normal file
@ -0,0 +1,33 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
|
||||
namespace Capnp.Rpc
|
||||
{
|
||||
/// <summary>
|
||||
/// Provides extension methods for installing midlayers to <see cref="TcpRpcServer"/> and <see cref="TcpRpcClient"/>./>.
|
||||
/// </summary>
|
||||
public static class MidlayerExtensions
|
||||
{
|
||||
/// <summary>
|
||||
/// Enables stream buffering on the given object. Stream buffering reduces the number of I/O operations,
|
||||
/// hence may cause a significant performance boost.
|
||||
/// </summary>
|
||||
/// <param name="obj"><see cref="TcpRpcServer"/> or <see cref="TcpRpcClient"/></param>
|
||||
/// <param name="bufferSize">Buffer size (bytes). You should choose it according to the maximum expected raw capnp frame size</param>
|
||||
public static void AddBuffering(this ISupportsMidlayers obj, int bufferSize)
|
||||
{
|
||||
obj.InjectMidlayer(s => new Util.DuplexBufferedStream(s, bufferSize));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Enables stream buffering on the given object. Stream buffering reduces the number of I/O operations,
|
||||
/// hence may cause a significant performance boost. Some default buffer size will be chosen.
|
||||
/// </summary>
|
||||
/// <param name="obj"><see cref="TcpRpcServer"/> or <see cref="TcpRpcClient"/></param>
|
||||
public static void AddBuffering(this ISupportsMidlayers obj)
|
||||
{
|
||||
obj.InjectMidlayer(s => new Util.DuplexBufferedStream(s));
|
||||
}
|
||||
}
|
||||
}
|
@ -6,98 +6,41 @@ namespace Capnp.Rpc
|
||||
{
|
||||
class PendingAnswer: IDisposable
|
||||
{
|
||||
readonly object _reentrancyBlocker = new object();
|
||||
readonly CancellationTokenSource? _cts;
|
||||
readonly TaskCompletionSource<int> _whenCanceled;
|
||||
Task<AnswerOrCounterquestion> _callTask;
|
||||
Task? _initialTask;
|
||||
Task? _chainedTask;
|
||||
bool _disposed;
|
||||
readonly TaskCompletionSource<AnswerOrCounterquestion> _cancelCompleter;
|
||||
readonly Task<AnswerOrCounterquestion> _answerTask;
|
||||
|
||||
public PendingAnswer(Task<AnswerOrCounterquestion> callTask, CancellationTokenSource? cts)
|
||||
{
|
||||
async Task<AnswerOrCounterquestion> CancelableAwaitWhenReady()
|
||||
{
|
||||
return await await Task.WhenAny(callTask, _cancelCompleter.Task);
|
||||
}
|
||||
|
||||
if (callTask == null)
|
||||
throw new ArgumentNullException(nameof(callTask));
|
||||
|
||||
_cts = cts;
|
||||
_callTask = callTask ?? throw new ArgumentNullException(nameof(callTask));
|
||||
_whenCanceled = new TaskCompletionSource<int>();
|
||||
_cancelCompleter = new TaskCompletionSource<AnswerOrCounterquestion>();
|
||||
_answerTask = CancelableAwaitWhenReady();
|
||||
}
|
||||
|
||||
public CancellationToken CancellationToken => _cts?.Token ?? CancellationToken.None;
|
||||
|
||||
public void Cancel()
|
||||
{
|
||||
_cts?.Cancel();
|
||||
_whenCanceled.SetResult(0);
|
||||
_cancelCompleter.SetCanceled();
|
||||
}
|
||||
|
||||
async Task InitialAwaitWhenReady()
|
||||
public void Chain(Action<Task<AnswerOrCounterquestion>> func)
|
||||
{
|
||||
var which = await Task.WhenAny(_callTask, _whenCanceled.Task);
|
||||
|
||||
if (which != _callTask)
|
||||
{
|
||||
throw new TaskCanceledException();
|
||||
}
|
||||
func(_answerTask);
|
||||
}
|
||||
|
||||
async Task AwaitChainedTask(Task chainedTask, Func<Task<AnswerOrCounterquestion>, Task> func)
|
||||
public void Chain(PromisedAnswer.READER rd, Action<Task<Proxy>> func)
|
||||
{
|
||||
try
|
||||
{
|
||||
await chainedTask;
|
||||
}
|
||||
catch (System.Exception exception)
|
||||
{
|
||||
await func(Task.FromException<AnswerOrCounterquestion>(exception));
|
||||
throw;
|
||||
}
|
||||
|
||||
await func(_callTask);
|
||||
}
|
||||
|
||||
static async Task AwaitSeq(Task task1, Task task2)
|
||||
{
|
||||
await task1;
|
||||
await task2;
|
||||
}
|
||||
|
||||
public void Chain(bool strictSync, Func<Task<AnswerOrCounterquestion>, Task> func)
|
||||
{
|
||||
|
||||
lock (_reentrancyBlocker)
|
||||
{
|
||||
if (_disposed)
|
||||
{
|
||||
throw new ObjectDisposedException(nameof(PendingAnswer));
|
||||
}
|
||||
|
||||
if (_initialTask == null)
|
||||
{
|
||||
_initialTask = InitialAwaitWhenReady();
|
||||
}
|
||||
|
||||
Task followUpTask;
|
||||
|
||||
if (strictSync)
|
||||
{
|
||||
followUpTask = AwaitChainedTask(_chainedTask ?? _initialTask, func);
|
||||
}
|
||||
else
|
||||
{
|
||||
followUpTask = AwaitChainedTask(_initialTask, func);
|
||||
}
|
||||
|
||||
if (_chainedTask != null)
|
||||
{
|
||||
_chainedTask = AwaitSeq(_chainedTask, followUpTask);
|
||||
}
|
||||
else
|
||||
{
|
||||
_chainedTask = followUpTask;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void Chain(bool strictSync, PromisedAnswer.READER rd, Func<Task<Proxy>, Task> func)
|
||||
{
|
||||
Chain(strictSync, async t =>
|
||||
Chain(t =>
|
||||
{
|
||||
async Task<Proxy> EvaluateProxy()
|
||||
{
|
||||
@ -158,43 +101,13 @@ namespace Capnp.Rpc
|
||||
}
|
||||
}
|
||||
|
||||
await func(EvaluateProxy());
|
||||
func(EvaluateProxy());
|
||||
});
|
||||
}
|
||||
|
||||
public CancellationToken CancellationToken => _cts?.Token ?? CancellationToken.None;
|
||||
|
||||
public async void Dispose()
|
||||
public void Dispose()
|
||||
{
|
||||
if (_cts != null)
|
||||
{
|
||||
Task? chainedTask;
|
||||
|
||||
lock (_reentrancyBlocker)
|
||||
{
|
||||
if (_disposed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
chainedTask = _chainedTask;
|
||||
_disposed = true;
|
||||
}
|
||||
|
||||
if (chainedTask != null)
|
||||
{
|
||||
try
|
||||
{
|
||||
await chainedTask;
|
||||
}
|
||||
catch
|
||||
{
|
||||
}
|
||||
finally
|
||||
{
|
||||
_cts.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
_cts?.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
@ -398,7 +398,7 @@ namespace Capnp.Rpc
|
||||
switch (req.SendResultsTo.which)
|
||||
{
|
||||
case Call.sendResultsTo.WHICH.Caller:
|
||||
pendingAnswer.Chain(false, async t =>
|
||||
pendingAnswer.Chain(async t =>
|
||||
{
|
||||
try
|
||||
{
|
||||
@ -466,7 +466,7 @@ namespace Capnp.Rpc
|
||||
break;
|
||||
|
||||
case Call.sendResultsTo.WHICH.Yourself:
|
||||
pendingAnswer.Chain(false, async t =>
|
||||
pendingAnswer.Chain(async t =>
|
||||
{
|
||||
try
|
||||
{
|
||||
@ -575,7 +575,6 @@ namespace Capnp.Rpc
|
||||
if (exists)
|
||||
{
|
||||
previousAnswer!.Chain(
|
||||
false,
|
||||
req.Target.PromisedAnswer,
|
||||
async t =>
|
||||
{
|
||||
@ -679,7 +678,7 @@ namespace Capnp.Rpc
|
||||
|
||||
if (exists)
|
||||
{
|
||||
pendingAnswer!.Chain(false, async t =>
|
||||
pendingAnswer!.Chain(async t =>
|
||||
{
|
||||
try
|
||||
{
|
||||
@ -807,7 +806,7 @@ namespace Capnp.Rpc
|
||||
|
||||
if (_answerTable.TryGetValue(promisedAnswer.QuestionId, out var previousAnswer))
|
||||
{
|
||||
previousAnswer.Chain(true,
|
||||
previousAnswer.Chain(
|
||||
disembargo.Target.PromisedAnswer,
|
||||
async t =>
|
||||
{
|
||||
@ -922,9 +921,9 @@ namespace Capnp.Rpc
|
||||
|
||||
void ReleaseResultCaps(PendingAnswer answer)
|
||||
{
|
||||
try
|
||||
answer.Chain(async t =>
|
||||
{
|
||||
answer.Chain(false, async t =>
|
||||
try
|
||||
{
|
||||
var aorcq = await t;
|
||||
var results = aorcq.Answer;
|
||||
@ -936,11 +935,11 @@ namespace Capnp.Rpc
|
||||
cap?.Release();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
catch
|
||||
{
|
||||
}
|
||||
}
|
||||
catch
|
||||
{
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void ProcessFinish(Finish.READER finish)
|
||||
@ -1246,7 +1245,7 @@ namespace Capnp.Rpc
|
||||
{
|
||||
var tcs = new TaskCompletionSource<Proxy>();
|
||||
|
||||
pendingAnswer.Chain(false,
|
||||
pendingAnswer.Chain(
|
||||
capDesc.ReceiverAnswer,
|
||||
async t =>
|
||||
{
|
||||
|
@ -32,7 +32,7 @@ namespace Capnp.Rpc
|
||||
/// <summary>
|
||||
/// Cap'n Proto RPC TCP server.
|
||||
/// </summary>
|
||||
public class TcpRpcServer: IDisposable
|
||||
public class TcpRpcServer: ISupportsMidlayers, IDisposable
|
||||
{
|
||||
ILogger Logger { get; } = Logging.CreateLogger<TcpRpcServer>();
|
||||
|
||||
@ -161,17 +161,17 @@ namespace Capnp.Rpc
|
||||
}
|
||||
|
||||
readonly RpcEngine _rpcEngine;
|
||||
readonly TcpListener _listener;
|
||||
readonly object _reentrancyBlocker = new object();
|
||||
readonly Thread _acceptorThread;
|
||||
readonly List<Connection> _connections = new List<Connection>();
|
||||
Thread? _acceptorThread;
|
||||
TcpListener? _listener;
|
||||
|
||||
/// <summary>
|
||||
/// Gets the number of currently active inbound TCP connections.
|
||||
/// </summary>
|
||||
public int ConnectionCount { get; private set; }
|
||||
|
||||
void AcceptClients()
|
||||
void AcceptClients(TcpListener listener)
|
||||
{
|
||||
try
|
||||
{
|
||||
@ -180,7 +180,7 @@ namespace Capnp.Rpc
|
||||
|
||||
while (true)
|
||||
{
|
||||
var client = _listener.AcceptTcpClient();
|
||||
var client = listener.AcceptTcpClient();
|
||||
var connection = new Connection(this, client);
|
||||
|
||||
lock (_reentrancyBlocker)
|
||||
@ -244,7 +244,10 @@ namespace Capnp.Rpc
|
||||
/// </summary>
|
||||
public void Dispose()
|
||||
{
|
||||
StopListening();
|
||||
if (_listener != null)
|
||||
{
|
||||
StopListening();
|
||||
}
|
||||
|
||||
var connections = new List<Connection>();
|
||||
|
||||
@ -260,8 +263,6 @@ namespace Capnp.Rpc
|
||||
SafeJoin(connection.PumpRunner);
|
||||
}
|
||||
|
||||
SafeJoin(_acceptorThread);
|
||||
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
|
||||
@ -270,6 +271,9 @@ namespace Capnp.Rpc
|
||||
/// </summary>
|
||||
public void StopListening()
|
||||
{
|
||||
if (_listener == null)
|
||||
throw new InvalidOperationException("Listening was never started");
|
||||
|
||||
try
|
||||
{
|
||||
_listener.Stop();
|
||||
@ -277,47 +281,109 @@ namespace Capnp.Rpc
|
||||
catch (SocketException)
|
||||
{
|
||||
}
|
||||
finally
|
||||
{
|
||||
_listener = null;
|
||||
SafeJoin(_acceptorThread);
|
||||
_acceptorThread = null;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Installs a midlayer.
|
||||
/// A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream.
|
||||
/// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received
|
||||
/// by the TCP connection, respectively. This mechanism can be used for buffering, various (de-)compression algorithms, and more.
|
||||
/// </summary>
|
||||
/// <param name="createFunc"></param>
|
||||
public void InjectMidlayer(Func<Stream, Stream> createFunc)
|
||||
{
|
||||
OnConnectionChanged += (_, e) =>
|
||||
{
|
||||
if (e.Connection.State == ConnectionState.Initializing)
|
||||
{
|
||||
e.Connection.InjectMidlayer(createFunc);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Constructs an instance.
|
||||
/// </summary>
|
||||
public TcpRpcServer()
|
||||
{
|
||||
_rpcEngine = new RpcEngine();
|
||||
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Constructs an instance, starts listening to the specified TCP/IP endpoint and accepting clients.
|
||||
/// If you intend configuring a midlayer or consuming the <see cref="OnConnectionChanged"/> event,
|
||||
/// you should not use this constructor, since it may lead to an early-client race condition.
|
||||
/// Instead, use the parameterless constructor, configure, then call <see cref="StartAccepting(IPAddress, int)"/>.
|
||||
/// </summary>
|
||||
/// <param name="localAddr">An System.Net.IPAddress that represents the local IP address.</param>
|
||||
/// <param name="port">The port on which to listen for incoming connection attempts.</param>
|
||||
/// <exception cref="ArgumentNullException"><paramref name="localAddr"/> is null.</exception>
|
||||
/// <exception cref="ArgumentOutOfRangeException"><paramref name="port"/> is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort.</exception>
|
||||
public TcpRpcServer(IPAddress localAddr, int port)
|
||||
/// <exception cref="SocketException">The underlying <see cref="TcpListener"/> detected an error condition, such as the desired endpoint is already occupied.</exception>
|
||||
public TcpRpcServer(IPAddress localAddr, int port): this()
|
||||
{
|
||||
_rpcEngine = new RpcEngine();
|
||||
_listener = new TcpListener(localAddr, port);
|
||||
_listener.ExclusiveAddressUse = false;
|
||||
StartAccepting(localAddr, port);
|
||||
}
|
||||
|
||||
for (int retry = 0; retry < 5; retry++)
|
||||
/// <summary>
|
||||
/// Starts listening to the specified TCP/IP endpoint and accepting clients.
|
||||
/// </summary>
|
||||
/// <param name="localAddr">An System.Net.IPAddress that represents the local IP address.</param>
|
||||
/// <param name="port">The port on which to listen for incoming connection attempts.</param>
|
||||
/// <exception cref="ArgumentNullException"><paramref name="localAddr"/> is null.</exception>
|
||||
/// <exception cref="ArgumentOutOfRangeException"><paramref name="port"/> is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort.</exception>
|
||||
/// <exception cref="InvalidOperationException">Listening activity was already started</exception>
|
||||
/// <exception cref="SocketException">The underlying <see cref="TcpListener"/> detected an error condition, such as the desired endpoint is already occupied.</exception>
|
||||
public void StartAccepting(IPAddress localAddr, int port)
|
||||
{
|
||||
if (_listener != null)
|
||||
throw new InvalidOperationException("Listening activity was already started");
|
||||
|
||||
var listener = new TcpListener(localAddr, port)
|
||||
{
|
||||
ExclusiveAddressUse = false
|
||||
};
|
||||
|
||||
int attempt = 0;
|
||||
|
||||
while (true)
|
||||
{
|
||||
try
|
||||
{
|
||||
_listener.Start();
|
||||
listener.Start();
|
||||
break;
|
||||
}
|
||||
catch (SocketException socketException)
|
||||
{
|
||||
Logger.LogWarning($"Failed to listen on port {port}, attempt {retry}: {socketException}");
|
||||
Thread.Sleep(10);
|
||||
if (attempt == 5)
|
||||
throw;
|
||||
|
||||
Logger.LogWarning($"Failed to listen on port {port}, attempt {attempt}: {socketException}");
|
||||
}
|
||||
|
||||
++attempt;
|
||||
Thread.Sleep(10);
|
||||
}
|
||||
|
||||
_acceptorThread = new Thread(AcceptClients);
|
||||
|
||||
_acceptorThread = new Thread(() => AcceptClients(listener));
|
||||
_listener = listener;
|
||||
_acceptorThread.Start();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Whether the thread which is responsible for acception incoming attempts is still alive.
|
||||
/// The thread will die upon disposal, but also in case of a socket error condition.
|
||||
/// The thread will die after calling <see cref="StopListening"/>, upon disposal, but also in case of a socket error condition.
|
||||
/// Errors which occur on a particular connection will just close that connection and won't interfere
|
||||
/// with the acceptor thread.
|
||||
/// </summary>
|
||||
public bool IsAlive => _acceptorThread.IsAlive;
|
||||
public bool IsAlive => _acceptorThread?.IsAlive ?? false;
|
||||
|
||||
/// <summary>
|
||||
/// Sets the bootstrap capability. It must be an object which implements a valid capability interface
|
||||
|
@ -49,7 +49,7 @@ namespace Capnp
|
||||
/// <param name="defaultSegmentSize">Default size (in words) of a newly allocated segment. If a single allocation requires
|
||||
/// a bigger size, a bigger dedicated segment will be allocated. On the wire, segments will be truncated to their actual
|
||||
/// occupancies.</param>
|
||||
public SegmentAllocator(int defaultSegmentSize = 128)
|
||||
public SegmentAllocator(int defaultSegmentSize = 64)
|
||||
{
|
||||
_defaultSegmentSize = defaultSegmentSize;
|
||||
}
|
||||
|
96
Capnp.Net.Runtime/Util/DuplexBufferedStream.cs
Normal file
96
Capnp.Net.Runtime/Util/DuplexBufferedStream.cs
Normal file
@ -0,0 +1,96 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
|
||||
namespace Capnp.Util
|
||||
{
|
||||
internal class DuplexBufferedStream : Stream
|
||||
{
|
||||
// A buffer size of 1024 bytes seems to be a good comprise, giving good performance
|
||||
// in TCP/IP-over-localhost scenarios for small to medium (200kiB) frame sizes.
|
||||
const int DefaultBufferSize = 1024;
|
||||
|
||||
readonly BufferedStream _readStream;
|
||||
readonly BufferedStream _writeStream;
|
||||
readonly int _bufferSize;
|
||||
readonly object _reentrancyBlocker = new object();
|
||||
|
||||
public DuplexBufferedStream(Stream stream, int bufferSize)
|
||||
{
|
||||
_readStream = new BufferedStream(stream, bufferSize);
|
||||
_writeStream = new BufferedStream(stream, bufferSize);
|
||||
_bufferSize = bufferSize;
|
||||
}
|
||||
|
||||
public DuplexBufferedStream(Stream stream): this(stream, DefaultBufferSize)
|
||||
{
|
||||
}
|
||||
|
||||
public override bool CanRead => true;
|
||||
|
||||
public override bool CanSeek => false;
|
||||
|
||||
public override bool CanWrite => true;
|
||||
|
||||
public override long Length => 0;
|
||||
|
||||
public override long Position
|
||||
{
|
||||
get => 0;
|
||||
set => throw new NotSupportedException();
|
||||
}
|
||||
|
||||
public override void Flush()
|
||||
{
|
||||
_writeStream.Flush();
|
||||
}
|
||||
|
||||
public override int Read(byte[] buffer, int offset, int count)
|
||||
{
|
||||
return _readStream.Read(buffer, offset, count);
|
||||
}
|
||||
|
||||
public override long Seek(long offset, SeekOrigin origin)
|
||||
{
|
||||
throw new NotSupportedException();
|
||||
}
|
||||
|
||||
public override void SetLength(long value)
|
||||
{
|
||||
throw new NotSupportedException();
|
||||
}
|
||||
|
||||
public override void Write(byte[] buffer, int offset, int count)
|
||||
{
|
||||
if (buffer.Length > _bufferSize) // avoid moiré-like timing effects
|
||||
_writeStream.Flush();
|
||||
|
||||
_writeStream.Write(buffer, offset, count);
|
||||
}
|
||||
|
||||
protected override void Dispose(bool disposing)
|
||||
{
|
||||
if (disposing)
|
||||
{
|
||||
lock (_reentrancyBlocker)
|
||||
{
|
||||
try
|
||||
{
|
||||
_readStream.Dispose();
|
||||
}
|
||||
catch
|
||||
{
|
||||
}
|
||||
try
|
||||
{
|
||||
_writeStream.Dispose();
|
||||
}
|
||||
catch
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
base.Dispose(disposing);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user