diff --git a/Benchmarking/Benchmark/Benchmark.csproj b/Benchmarking/Benchmark/Benchmark.csproj index e40a359..ec53112 100644 --- a/Benchmarking/Benchmark/Benchmark.csproj +++ b/Benchmarking/Benchmark/Benchmark.csproj @@ -7,7 +7,7 @@ - + diff --git a/Benchmarking/CapnpBench.sln b/Benchmarking/CapnpBench.sln index 5d814fc..700be3a 100644 --- a/Benchmarking/CapnpBench.sln +++ b/Benchmarking/CapnpBench.sln @@ -3,13 +3,11 @@ Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio Version 16 VisualStudioVersion = 16.0.29728.190 MinimumVisualStudioVersion = 10.0.40219.1 -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EchoServiceGrpc", "EchoServiceGrpc\EchoServiceGrpc.csproj", "{D59C7B71-3887-426B-A636-2DBDA0549817}" -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Benchmark", "Benchmark\Benchmark.csproj", "{7F7580CA-CCF0-4650-87BF-502D51A8F435}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EchoServiceCapnp", "EchoServiceCapnp\EchoServiceCapnp.csproj", "{309A4A26-F29E-4F49-AB49-76BAE0FD7D62}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EchoServiceGrpc2", "EchoServiceGrpc2\EchoServiceGrpc2.csproj", "{C9CEE2AD-AC6F-4CBD-A83D-2784832C1E37}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EchoServiceGrpc", "EchoServiceGrpc\EchoServiceGrpc.csproj", "{C9CEE2AD-AC6F-4CBD-A83D-2784832C1E37}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -17,10 +15,6 @@ Global Release|Any CPU = Release|Any CPU EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution - {D59C7B71-3887-426B-A636-2DBDA0549817}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {D59C7B71-3887-426B-A636-2DBDA0549817}.Debug|Any CPU.Build.0 = Debug|Any CPU - {D59C7B71-3887-426B-A636-2DBDA0549817}.Release|Any CPU.ActiveCfg = Release|Any CPU - {D59C7B71-3887-426B-A636-2DBDA0549817}.Release|Any CPU.Build.0 = Release|Any CPU {7F7580CA-CCF0-4650-87BF-502D51A8F435}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {7F7580CA-CCF0-4650-87BF-502D51A8F435}.Debug|Any CPU.Build.0 = Debug|Any CPU {7F7580CA-CCF0-4650-87BF-502D51A8F435}.Release|Any CPU.ActiveCfg = Release|Any CPU diff --git a/Benchmarking/CapnpProfile/CapnpProfile.csproj b/Benchmarking/CapnpProfile/CapnpProfile.csproj index 3f857cb..9ffe4d3 100644 --- a/Benchmarking/CapnpProfile/CapnpProfile.csproj +++ b/Benchmarking/CapnpProfile/CapnpProfile.csproj @@ -9,11 +9,11 @@ full true x64 - SOTASK_PERF + - + diff --git a/Benchmarking/CapnpProfile/EnginePair.cs b/Benchmarking/CapnpProfile/EnginePair.cs new file mode 100644 index 0000000..815e12a --- /dev/null +++ b/Benchmarking/CapnpProfile/EnginePair.cs @@ -0,0 +1,63 @@ +using Capnp; +using Capnp.Rpc; +using System.Collections.Generic; + +namespace CapnpProfile +{ + class EnginePair + { + class EngineChannel : IEndpoint + { + readonly Queue _frameBuffer = new Queue(); + bool _dismissed; + + public EngineChannel() + { + } + + public RpcEngine.RpcEndpoint OtherEndpoint { get; set; } + public bool HasBufferedFrames => _frameBuffer.Count > 0; + public int FrameCounter { get; private set; } + + + public void Dismiss() + { + if (!_dismissed) + { + _dismissed = true; + OtherEndpoint.Dismiss(); + } + } + + public void Forward(WireFrame frame) + { + if (_dismissed) + return; + + OtherEndpoint.Forward(frame); + } + } + + readonly EngineChannel _channel1, _channel2; + + public RpcEngine Engine1 { get; } + public RpcEngine Engine2 { get; } + public RpcEngine.RpcEndpoint Endpoint1 { get; } + public RpcEngine.RpcEndpoint Endpoint2 { get; } + + public EnginePair() + { + Engine1 = new RpcEngine(); + Engine2 = new RpcEngine(); + _channel1 = new EngineChannel(); + Endpoint1 = Engine1.AddEndpoint(_channel1); + _channel2 = new EngineChannel(); + Endpoint2 = Engine2.AddEndpoint(_channel2); + _channel1.OtherEndpoint = Endpoint2; + _channel2.OtherEndpoint = Endpoint1; + } + + public int Channel1SendCount => _channel1.FrameCounter; + public int Channel2SendCount => _channel2.FrameCounter; + } +} diff --git a/Benchmarking/CapnpProfile/Program.cs b/Benchmarking/CapnpProfile/Program.cs index f2ddd00..97a28ce 100644 --- a/Benchmarking/CapnpProfile/Program.cs +++ b/Benchmarking/CapnpProfile/Program.cs @@ -7,17 +7,11 @@ using System.Threading.Tasks; namespace CapnpProfile { + class Program { - static async Task Main(string[] args) + static async Task Run(IEchoer echoer) { - using var server = new TcpRpcServer(); - server.Main = new CapnpEchoService(); - server.AddBuffering(); - server.StartAccepting(IPAddress.Any, 5002); - using var client = new TcpRpcClient("localhost", 5002); - await client.WhenConnected; - using var echoer = client.GetMain(); var payload = new byte[20]; new Random().NextBytes(payload); @@ -32,7 +26,7 @@ namespace CapnpProfile throw new InvalidOperationException("Echo server malfunction"); #if SOTASK_PERF - if (++counter == 1000) + if (++counter == 10000) { counter = 0; @@ -44,5 +38,32 @@ namespace CapnpProfile #endif } } + + static async Task Main(string[] args) + { + + if (args.Length > 0) + { + var pair = new EnginePair(); + pair.Engine1.Main = new CapnpEchoService(); + var echoer = (CapabilityReflection.CreateProxy(pair.Endpoint2.QueryMain()) as IEchoer); + + await Run(echoer); + } + else + { + using var server = new TcpRpcServer(); + server.Main = new CapnpEchoService(); + server.AddBuffering(); + server.StartAccepting(IPAddress.Any, 5002); + using var client = new TcpRpcClient(); + client.AddBuffering(); + client.Connect("localhost", 5002); + await client.WhenConnected; + using var echoer = client.GetMain(); + + await Run(echoer); + } + } } } diff --git a/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj b/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj index 99eff10..ebbd9c6 100644 --- a/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj +++ b/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj @@ -6,7 +6,7 @@ - + diff --git a/Benchmarking/EchoServiceGrpc/EchoServiceGrpc.csproj b/Benchmarking/EchoServiceGrpc/EchoServiceGrpc.csproj index 28b512c..da2e193 100644 --- a/Benchmarking/EchoServiceGrpc/EchoServiceGrpc.csproj +++ b/Benchmarking/EchoServiceGrpc/EchoServiceGrpc.csproj @@ -2,11 +2,12 @@ netcoreapp3.1 - EchoService.Program - + + Protos\Echo.proto + diff --git a/Benchmarking/EchoServiceGrpc/Program.cs b/Benchmarking/EchoServiceGrpc/Program.cs index f24aee5..a3d05d6 100644 --- a/Benchmarking/EchoServiceGrpc/Program.cs +++ b/Benchmarking/EchoServiceGrpc/Program.cs @@ -6,7 +6,7 @@ using System.Threading.Tasks; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Hosting; -namespace EchoService +namespace EchoServiceGrpc2 { public class Program { diff --git a/Benchmarking/EchoServiceGrpc/Protos/Echo.proto b/Benchmarking/EchoServiceGrpc/Protos/Echo.proto deleted file mode 100644 index b1b0c0d..0000000 --- a/Benchmarking/EchoServiceGrpc/Protos/Echo.proto +++ /dev/null @@ -1,13 +0,0 @@ -syntax = "proto3"; - -service Echoer { - rpc Echo (EchoRequest) returns (EchoReply); -} - -message EchoRequest { - bytes payload = 1; -} - -message EchoReply { - bytes payload = 1; -} \ No newline at end of file diff --git a/Benchmarking/EchoServiceGrpc/Startup.cs b/Benchmarking/EchoServiceGrpc/Startup.cs index b93a112..3679177 100644 --- a/Benchmarking/EchoServiceGrpc/Startup.cs +++ b/Benchmarking/EchoServiceGrpc/Startup.cs @@ -2,13 +2,14 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using EchoService; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; -namespace EchoService +namespace EchoServiceGrpc2 { public class Startup { diff --git a/Benchmarking/EchoServiceGrpc/appsettings.Development.json b/Benchmarking/EchoServiceGrpc/appsettings.Development.json index 1ca5366..fe20c40 100644 --- a/Benchmarking/EchoServiceGrpc/appsettings.Development.json +++ b/Benchmarking/EchoServiceGrpc/appsettings.Development.json @@ -1,10 +1,10 @@ { "Logging": { "LogLevel": { - "Default": "Warning", - "System": "Warning", - "Grpc": "Warning", - "Microsoft": "Warning" + "Default": "Debug", + "System": "Information", + "Grpc": "Information", + "Microsoft": "Information" } } } diff --git a/Benchmarking/EchoServiceGrpc2/EchoServiceGrpc.csproj b/Benchmarking/EchoServiceGrpc2/EchoServiceGrpc.csproj deleted file mode 100644 index 3d293a8..0000000 --- a/Benchmarking/EchoServiceGrpc2/EchoServiceGrpc.csproj +++ /dev/null @@ -1,15 +0,0 @@ - - - - netcoreapp3.1 - - - - - - - - - - - diff --git a/Benchmarking/EchoServiceGrpc2/Program.cs b/Benchmarking/EchoServiceGrpc2/Program.cs deleted file mode 100644 index a3d05d6..0000000 --- a/Benchmarking/EchoServiceGrpc2/Program.cs +++ /dev/null @@ -1,27 +0,0 @@ -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Threading.Tasks; -using Microsoft.AspNetCore.Hosting; -using Microsoft.Extensions.Hosting; - -namespace EchoServiceGrpc2 -{ - public class Program - { - public static void Main(string[] args) - { - CreateHostBuilder(args).Build().Run(); - } - - // Additional configuration is required to successfully run gRPC on macOS. - // For instructions on how to configure Kestrel and gRPC clients on macOS, visit https://go.microsoft.com/fwlink/?linkid=2099682 - public static IHostBuilder CreateHostBuilder(string[] args) => - Host.CreateDefaultBuilder(args) - .ConfigureWebHostDefaults(webBuilder => - { - webBuilder.UseStartup(); - }); - } -} diff --git a/Benchmarking/EchoServiceGrpc2/Services/GrpcEchoService.cs b/Benchmarking/EchoServiceGrpc2/Services/GrpcEchoService.cs deleted file mode 100644 index 71a7ef9..0000000 --- a/Benchmarking/EchoServiceGrpc2/Services/GrpcEchoService.cs +++ /dev/null @@ -1,26 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using Grpc.Core; -using Microsoft.Extensions.Logging; - -namespace EchoService -{ - public class GrpcEchoService : Echoer.EchoerBase - { - private readonly ILogger _logger; - public GrpcEchoService(ILogger logger) - { - _logger = logger; - } - - public override Task Echo(EchoRequest request, ServerCallContext context) - { - return Task.FromResult(new EchoReply - { - Payload = request.Payload - }); - } - } -} diff --git a/Benchmarking/EchoServiceGrpc2/Startup.cs b/Benchmarking/EchoServiceGrpc2/Startup.cs deleted file mode 100644 index 3679177..0000000 --- a/Benchmarking/EchoServiceGrpc2/Startup.cs +++ /dev/null @@ -1,44 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using EchoService; -using Microsoft.AspNetCore.Builder; -using Microsoft.AspNetCore.Hosting; -using Microsoft.AspNetCore.Http; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; - -namespace EchoServiceGrpc2 -{ - public class Startup - { - // This method gets called by the runtime. Use this method to add services to the container. - // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940 - public void ConfigureServices(IServiceCollection services) - { - services.AddGrpc(); - } - - // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. - public void Configure(IApplicationBuilder app, IWebHostEnvironment env) - { - if (env.IsDevelopment()) - { - app.UseDeveloperExceptionPage(); - } - - app.UseRouting(); - - app.UseEndpoints(endpoints => - { - endpoints.MapGrpcService(); - - endpoints.MapGet("/", async context => - { - await context.Response.WriteAsync("Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909"); - }); - }); - } - } -} diff --git a/Benchmarking/EchoServiceGrpc2/appsettings.Development.json b/Benchmarking/EchoServiceGrpc2/appsettings.Development.json deleted file mode 100644 index fe20c40..0000000 --- a/Benchmarking/EchoServiceGrpc2/appsettings.Development.json +++ /dev/null @@ -1,10 +0,0 @@ -{ - "Logging": { - "LogLevel": { - "Default": "Debug", - "System": "Information", - "Grpc": "Information", - "Microsoft": "Information" - } - } -} diff --git a/Benchmarking/EchoServiceGrpc2/appsettings.json b/Benchmarking/EchoServiceGrpc2/appsettings.json deleted file mode 100644 index 3110458..0000000 --- a/Benchmarking/EchoServiceGrpc2/appsettings.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "Logging": { - "LogLevel": { - "Default": "Warning", - "Microsoft": "Warning", - "Microsoft.Hosting.Lifetime": "Warning" - } - }, - "AllowedHosts": "*", - "Kestrel": { - "EndpointDefaults": { - "Protocols": "Http2" - } - } -} diff --git a/Capnp.Net.Runtime.Tests/Capnp.Net.Runtime.Tests.csproj b/Capnp.Net.Runtime.Tests/Capnp.Net.Runtime.Tests.csproj index 0057a55..8cfe88d 100644 --- a/Capnp.Net.Runtime.Tests/Capnp.Net.Runtime.Tests.csproj +++ b/Capnp.Net.Runtime.Tests/Capnp.Net.Runtime.Tests.csproj @@ -19,7 +19,7 @@ - TRACE;SOTASK_PERF + TRACE diff --git a/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj b/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj index 0ec7854..f6bcfa4 100644 --- a/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj +++ b/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj @@ -33,7 +33,7 @@ - SOTASK_PERF + diff --git a/Capnp.Net.Runtime/Rpc/MidlayerExtensions.cs b/Capnp.Net.Runtime/Rpc/MidlayerExtensions.cs index eb6fbae..ca05312 100644 --- a/Capnp.Net.Runtime/Rpc/MidlayerExtensions.cs +++ b/Capnp.Net.Runtime/Rpc/MidlayerExtensions.cs @@ -17,7 +17,7 @@ namespace Capnp.Rpc /// Buffer size (bytes). You should choose it according to the maximum expected raw capnp frame size public static void AddBuffering(this ISupportsMidlayers obj, int bufferSize) { - obj.InjectMidlayer(s => new Util.DuplexBufferedStream(s, bufferSize)); + obj.InjectMidlayer(s => new Util.BufferedNetworkStreamAdapter(s, bufferSize)); } /// @@ -27,7 +27,7 @@ namespace Capnp.Rpc /// or public static void AddBuffering(this ISupportsMidlayers obj) { - obj.InjectMidlayer(s => new Util.DuplexBufferedStream(s)); + obj.InjectMidlayer(s => new Util.BufferedNetworkStreamAdapter(s)); } } } diff --git a/Capnp.Net.Runtime/Util/BufferedNetworkStreamAdapter.cs b/Capnp.Net.Runtime/Util/BufferedNetworkStreamAdapter.cs new file mode 100644 index 0000000..f52a4f6 --- /dev/null +++ b/Capnp.Net.Runtime/Util/BufferedNetworkStreamAdapter.cs @@ -0,0 +1,114 @@ +using System; +using System.IO; +using System.Net.Sockets; +using System.Threading; + +namespace Capnp.Util +{ + internal class BufferedNetworkStreamAdapter : Stream + { + // A buffer size of 1024 bytes seems to be a good comprise, giving good performance + // in TCP/IP-over-localhost scenarios for small to medium (200kiB) frame sizes. + const int DefaultBufferSize = 1024; + + readonly BufferedStream _readStream; + readonly NetworkStream _writeStream; + readonly object _reentrancyBlocker = new object(); + Exception? _bufferedException; + + public BufferedNetworkStreamAdapter(Stream stream, int bufferSize) + { + _readStream = new BufferedStream(stream, bufferSize); + _writeStream = stream as NetworkStream ?? throw new ArgumentException("stream argument must be a NetworkStream"); + } + + public BufferedNetworkStreamAdapter(Stream stream) : this(stream, DefaultBufferSize) + { + } + + public override bool CanRead => true; + + public override bool CanSeek => false; + + public override bool CanWrite => true; + + public override long Length => 0; + + public override long Position + { + get => 0; + set => throw new NotSupportedException(); + } + + public override void Flush() + { + _writeStream.Flush(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + return _readStream.Read(buffer, offset, count); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + void WriteCallback(IAsyncResult ar) + { + try + { + _writeStream.EndWrite(ar); + } + catch (Exception exception) + { + Volatile.Write(ref _bufferedException, exception); + } + } + + public override void Write(byte[] buffer, int offset, int count) + { + var exception = Volatile.Read(ref _bufferedException); + + if (exception != null) + { + Dispose(); + throw exception; + } + + _writeStream.BeginWrite(buffer, offset, count, WriteCallback, null); + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + lock (_reentrancyBlocker) + { + try + { + _readStream.Dispose(); + } + catch + { + } + try + { + _writeStream.Dispose(); + } + catch + { + } + } + } + + base.Dispose(disposing); + } + } +}