introduced BufferedNetworkStreamAdapter - better performance?

This commit is contained in:
Christian Köllner 2020-04-21 21:02:12 +02:00
parent 78a62ddf98
commit db2d6bce2e
21 changed files with 226 additions and 182 deletions

View File

@ -7,7 +7,7 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.12.0" /> <PackageReference Include="BenchmarkDotNet" Version="0.12.0" />
<PackageReference Include="Capnp.Net.Runtime" Version="1.3.88-g218ad92525" /> <PackageReference Include="Capnp.Net.Runtime" Version="1.3.90-g65e87e5aa9" />
<PackageReference Include="CapnpC.CSharp.MsBuild.Generation" Version="1.2.138" /> <PackageReference Include="CapnpC.CSharp.MsBuild.Generation" Version="1.2.138" />
<PackageReference Include="Google.Protobuf" Version="3.11.3" /> <PackageReference Include="Google.Protobuf" Version="3.11.3" />
<PackageReference Include="Grpc.Net.Client" Version="2.27.0" /> <PackageReference Include="Grpc.Net.Client" Version="2.27.0" />

View File

@ -3,13 +3,11 @@ Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16 # Visual Studio Version 16
VisualStudioVersion = 16.0.29728.190 VisualStudioVersion = 16.0.29728.190
MinimumVisualStudioVersion = 10.0.40219.1 MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EchoServiceGrpc", "EchoServiceGrpc\EchoServiceGrpc.csproj", "{D59C7B71-3887-426B-A636-2DBDA0549817}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Benchmark", "Benchmark\Benchmark.csproj", "{7F7580CA-CCF0-4650-87BF-502D51A8F435}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Benchmark", "Benchmark\Benchmark.csproj", "{7F7580CA-CCF0-4650-87BF-502D51A8F435}"
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EchoServiceCapnp", "EchoServiceCapnp\EchoServiceCapnp.csproj", "{309A4A26-F29E-4F49-AB49-76BAE0FD7D62}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EchoServiceCapnp", "EchoServiceCapnp\EchoServiceCapnp.csproj", "{309A4A26-F29E-4F49-AB49-76BAE0FD7D62}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EchoServiceGrpc2", "EchoServiceGrpc2\EchoServiceGrpc2.csproj", "{C9CEE2AD-AC6F-4CBD-A83D-2784832C1E37}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EchoServiceGrpc", "EchoServiceGrpc\EchoServiceGrpc.csproj", "{C9CEE2AD-AC6F-4CBD-A83D-2784832C1E37}"
EndProject EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
@ -17,10 +15,6 @@ Global
Release|Any CPU = Release|Any CPU Release|Any CPU = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution GlobalSection(ProjectConfigurationPlatforms) = postSolution
{D59C7B71-3887-426B-A636-2DBDA0549817}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D59C7B71-3887-426B-A636-2DBDA0549817}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D59C7B71-3887-426B-A636-2DBDA0549817}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D59C7B71-3887-426B-A636-2DBDA0549817}.Release|Any CPU.Build.0 = Release|Any CPU
{7F7580CA-CCF0-4650-87BF-502D51A8F435}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {7F7580CA-CCF0-4650-87BF-502D51A8F435}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7F7580CA-CCF0-4650-87BF-502D51A8F435}.Debug|Any CPU.Build.0 = Debug|Any CPU {7F7580CA-CCF0-4650-87BF-502D51A8F435}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7F7580CA-CCF0-4650-87BF-502D51A8F435}.Release|Any CPU.ActiveCfg = Release|Any CPU {7F7580CA-CCF0-4650-87BF-502D51A8F435}.Release|Any CPU.ActiveCfg = Release|Any CPU

View File

@ -9,11 +9,11 @@
<DebugType>full</DebugType> <DebugType>full</DebugType>
<DebugSymbols>true</DebugSymbols> <DebugSymbols>true</DebugSymbols>
<PlatformTarget>x64</PlatformTarget> <PlatformTarget>x64</PlatformTarget>
<DefineConstants>SOTASK_PERF</DefineConstants> <DefineConstants></DefineConstants>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Capnp.Net.Runtime" Version="1.3.89-gdebe76be89" /> <PackageReference Include="Capnp.Net.Runtime" Version="1.3.90-g65e87e5aa9" />
<PackageReference Include="CapnpC.CSharp.MsBuild.Generation" Version="1.3.90-g65e87e5aa9" /> <PackageReference Include="CapnpC.CSharp.MsBuild.Generation" Version="1.3.90-g65e87e5aa9" />
</ItemGroup> </ItemGroup>

View File

@ -0,0 +1,63 @@
using Capnp;
using Capnp.Rpc;
using System.Collections.Generic;
namespace CapnpProfile
{
class EnginePair
{
class EngineChannel : IEndpoint
{
readonly Queue<WireFrame> _frameBuffer = new Queue<WireFrame>();
bool _dismissed;
public EngineChannel()
{
}
public RpcEngine.RpcEndpoint OtherEndpoint { get; set; }
public bool HasBufferedFrames => _frameBuffer.Count > 0;
public int FrameCounter { get; private set; }
public void Dismiss()
{
if (!_dismissed)
{
_dismissed = true;
OtherEndpoint.Dismiss();
}
}
public void Forward(WireFrame frame)
{
if (_dismissed)
return;
OtherEndpoint.Forward(frame);
}
}
readonly EngineChannel _channel1, _channel2;
public RpcEngine Engine1 { get; }
public RpcEngine Engine2 { get; }
public RpcEngine.RpcEndpoint Endpoint1 { get; }
public RpcEngine.RpcEndpoint Endpoint2 { get; }
public EnginePair()
{
Engine1 = new RpcEngine();
Engine2 = new RpcEngine();
_channel1 = new EngineChannel();
Endpoint1 = Engine1.AddEndpoint(_channel1);
_channel2 = new EngineChannel();
Endpoint2 = Engine2.AddEndpoint(_channel2);
_channel1.OtherEndpoint = Endpoint2;
_channel2.OtherEndpoint = Endpoint1;
}
public int Channel1SendCount => _channel1.FrameCounter;
public int Channel2SendCount => _channel2.FrameCounter;
}
}

View File

@ -7,17 +7,11 @@ using System.Threading.Tasks;
namespace CapnpProfile namespace CapnpProfile
{ {
class Program class Program
{ {
static async Task Main(string[] args) static async Task Run(IEchoer echoer)
{ {
using var server = new TcpRpcServer();
server.Main = new CapnpEchoService();
server.AddBuffering();
server.StartAccepting(IPAddress.Any, 5002);
using var client = new TcpRpcClient("localhost", 5002);
await client.WhenConnected;
using var echoer = client.GetMain<IEchoer>();
var payload = new byte[20]; var payload = new byte[20];
new Random().NextBytes(payload); new Random().NextBytes(payload);
@ -32,7 +26,7 @@ namespace CapnpProfile
throw new InvalidOperationException("Echo server malfunction"); throw new InvalidOperationException("Echo server malfunction");
#if SOTASK_PERF #if SOTASK_PERF
if (++counter == 1000) if (++counter == 10000)
{ {
counter = 0; counter = 0;
@ -44,5 +38,32 @@ namespace CapnpProfile
#endif #endif
} }
} }
static async Task Main(string[] args)
{
if (args.Length > 0)
{
var pair = new EnginePair();
pair.Engine1.Main = new CapnpEchoService();
var echoer = (CapabilityReflection.CreateProxy<IEchoer>(pair.Endpoint2.QueryMain()) as IEchoer);
await Run(echoer);
}
else
{
using var server = new TcpRpcServer();
server.Main = new CapnpEchoService();
server.AddBuffering();
server.StartAccepting(IPAddress.Any, 5002);
using var client = new TcpRpcClient();
client.AddBuffering();
client.Connect("localhost", 5002);
await client.WhenConnected;
using var echoer = client.GetMain<IEchoer>();
await Run(echoer);
}
}
} }
} }

View File

@ -6,7 +6,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Capnp.Net.Runtime" Version="1.3.88-g218ad92525" /> <PackageReference Include="Capnp.Net.Runtime" Version="1.3.90-g65e87e5aa9" />
<PackageReference Include="CapnpC.CSharp.MsBuild.Generation" Version="1.2.138" /> <PackageReference Include="CapnpC.CSharp.MsBuild.Generation" Version="1.2.138" />
</ItemGroup> </ItemGroup>

View File

@ -2,11 +2,12 @@
<PropertyGroup> <PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework> <TargetFramework>netcoreapp3.1</TargetFramework>
<StartupObject>EchoService.Program</StartupObject>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<Protobuf Include="Protos\Echo.proto" GrpcServices="Server" /> <Protobuf Include="..\Benchmark\Protos\Echo.proto" GrpcServices="Server">
<Link>Protos\Echo.proto</Link>
</Protobuf>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

View File

@ -6,7 +6,7 @@ using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
namespace EchoService namespace EchoServiceGrpc2
{ {
public class Program public class Program
{ {

View File

@ -1,13 +0,0 @@
syntax = "proto3";
service Echoer {
rpc Echo (EchoRequest) returns (EchoReply);
}
message EchoRequest {
bytes payload = 1;
}
message EchoReply {
bytes payload = 1;
}

View File

@ -2,13 +2,14 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using EchoService;
using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
namespace EchoService namespace EchoServiceGrpc2
{ {
public class Startup public class Startup
{ {

View File

@ -1,10 +1,10 @@
{ {
"Logging": { "Logging": {
"LogLevel": { "LogLevel": {
"Default": "Warning", "Default": "Debug",
"System": "Warning", "System": "Information",
"Grpc": "Warning", "Grpc": "Information",
"Microsoft": "Warning" "Microsoft": "Information"
} }
} }
} }

View File

@ -1,15 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<Protobuf Include="Protos\greet.proto" GrpcServices="Server" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Grpc.AspNetCore" Version="2.24.0" />
</ItemGroup>
</Project>

View File

@ -1,27 +0,0 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Hosting;
namespace EchoServiceGrpc2
{
public class Program
{
public static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}
// Additional configuration is required to successfully run gRPC on macOS.
// For instructions on how to configure Kestrel and gRPC clients on macOS, visit https://go.microsoft.com/fwlink/?linkid=2099682
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup<Startup>();
});
}
}

View File

@ -1,26 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core;
using Microsoft.Extensions.Logging;
namespace EchoService
{
public class GrpcEchoService : Echoer.EchoerBase
{
private readonly ILogger<GrpcEchoService> _logger;
public GrpcEchoService(ILogger<GrpcEchoService> logger)
{
_logger = logger;
}
public override Task<EchoReply> Echo(EchoRequest request, ServerCallContext context)
{
return Task.FromResult(new EchoReply
{
Payload = request.Payload
});
}
}
}

View File

@ -1,44 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using EchoService;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace EchoServiceGrpc2
{
public class Startup
{
// This method gets called by the runtime. Use this method to add services to the container.
// For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
public void ConfigureServices(IServiceCollection services)
{
services.AddGrpc();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapGrpcService<GrpcEchoService>();
endpoints.MapGet("/", async context =>
{
await context.Response.WriteAsync("Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");
});
});
}
}
}

View File

@ -1,10 +0,0 @@
{
"Logging": {
"LogLevel": {
"Default": "Debug",
"System": "Information",
"Grpc": "Information",
"Microsoft": "Information"
}
}
}

View File

@ -1,15 +0,0 @@
{
"Logging": {
"LogLevel": {
"Default": "Warning",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Warning"
}
},
"AllowedHosts": "*",
"Kestrel": {
"EndpointDefaults": {
"Protocols": "Http2"
}
}
}

View File

@ -19,7 +19,7 @@
</PropertyGroup> </PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(TargetFramework)|$(Platform)'=='Release|netcoreapp2.1|AnyCPU'"> <PropertyGroup Condition="'$(Configuration)|$(TargetFramework)|$(Platform)'=='Release|netcoreapp2.1|AnyCPU'">
<DefineConstants>TRACE;SOTASK_PERF</DefineConstants> <DefineConstants>TRACE</DefineConstants>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>

View File

@ -33,7 +33,7 @@
</PropertyGroup> </PropertyGroup>
<PropertyGroup> <PropertyGroup>
<DefineConstants>SOTASK_PERF</DefineConstants> <DefineConstants></DefineConstants>
</PropertyGroup> </PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(TargetFramework)|$(Platform)'=='Release|netstandard2.0|AnyCPU'"> <PropertyGroup Condition="'$(Configuration)|$(TargetFramework)|$(Platform)'=='Release|netstandard2.0|AnyCPU'">

View File

@ -17,7 +17,7 @@ namespace Capnp.Rpc
/// <param name="bufferSize">Buffer size (bytes). You should choose it according to the maximum expected raw capnp frame size</param> /// <param name="bufferSize">Buffer size (bytes). You should choose it according to the maximum expected raw capnp frame size</param>
public static void AddBuffering(this ISupportsMidlayers obj, int bufferSize) public static void AddBuffering(this ISupportsMidlayers obj, int bufferSize)
{ {
obj.InjectMidlayer(s => new Util.DuplexBufferedStream(s, bufferSize)); obj.InjectMidlayer(s => new Util.BufferedNetworkStreamAdapter(s, bufferSize));
} }
/// <summary> /// <summary>
@ -27,7 +27,7 @@ namespace Capnp.Rpc
/// <param name="obj"><see cref="TcpRpcServer"/> or <see cref="TcpRpcClient"/></param> /// <param name="obj"><see cref="TcpRpcServer"/> or <see cref="TcpRpcClient"/></param>
public static void AddBuffering(this ISupportsMidlayers obj) public static void AddBuffering(this ISupportsMidlayers obj)
{ {
obj.InjectMidlayer(s => new Util.DuplexBufferedStream(s)); obj.InjectMidlayer(s => new Util.BufferedNetworkStreamAdapter(s));
} }
} }
} }

View File

@ -0,0 +1,114 @@
using System;
using System.IO;
using System.Net.Sockets;
using System.Threading;
namespace Capnp.Util
{
internal class BufferedNetworkStreamAdapter : Stream
{
// A buffer size of 1024 bytes seems to be a good comprise, giving good performance
// in TCP/IP-over-localhost scenarios for small to medium (200kiB) frame sizes.
const int DefaultBufferSize = 1024;
readonly BufferedStream _readStream;
readonly NetworkStream _writeStream;
readonly object _reentrancyBlocker = new object();
Exception? _bufferedException;
public BufferedNetworkStreamAdapter(Stream stream, int bufferSize)
{
_readStream = new BufferedStream(stream, bufferSize);
_writeStream = stream as NetworkStream ?? throw new ArgumentException("stream argument must be a NetworkStream");
}
public BufferedNetworkStreamAdapter(Stream stream) : this(stream, DefaultBufferSize)
{
}
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => true;
public override long Length => 0;
public override long Position
{
get => 0;
set => throw new NotSupportedException();
}
public override void Flush()
{
_writeStream.Flush();
}
public override int Read(byte[] buffer, int offset, int count)
{
return _readStream.Read(buffer, offset, count);
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
void WriteCallback(IAsyncResult ar)
{
try
{
_writeStream.EndWrite(ar);
}
catch (Exception exception)
{
Volatile.Write(ref _bufferedException, exception);
}
}
public override void Write(byte[] buffer, int offset, int count)
{
var exception = Volatile.Read(ref _bufferedException);
if (exception != null)
{
Dispose();
throw exception;
}
_writeStream.BeginWrite(buffer, offset, count, WriteCallback, null);
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
lock (_reentrancyBlocker)
{
try
{
_readStream.Dispose();
}
catch
{
}
try
{
_writeStream.Dispose();
}
catch
{
}
}
}
base.Dispose(disposing);
}
}
}