From 4f0abaac735efa8f52ac7ef9b3251846d962862c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Wed, 22 Apr 2020 22:19:16 +0200 Subject: [PATCH] flush optimization --- Capnp.Net.Runtime.Tests/EdgeCaseHandling.cs | 5 ++ Capnp.Net.Runtime.Tests/Util/TestBase.cs | 4 + Capnp.Net.Runtime/FramePump.cs | 16 +++- Capnp.Net.Runtime/Rpc/IEndpoint.cs | 2 + Capnp.Net.Runtime/Rpc/RpcEngine.cs | 94 +++++++++++++++++++++ Capnp.Net.Runtime/Rpc/TcpRpcClient.cs | 5 ++ Capnp.Net.Runtime/Rpc/TcpRpcServer.cs | 5 ++ 7 files changed, 130 insertions(+), 1 deletion(-) diff --git a/Capnp.Net.Runtime.Tests/EdgeCaseHandling.cs b/Capnp.Net.Runtime.Tests/EdgeCaseHandling.cs index 1cace55..a48e1a4 100644 --- a/Capnp.Net.Runtime.Tests/EdgeCaseHandling.cs +++ b/Capnp.Net.Runtime.Tests/EdgeCaseHandling.cs @@ -45,6 +45,11 @@ namespace Capnp.Net.Runtime.Tests _fromEnginePump.Send(frame); } + public void Flush() + { + _fromEnginePump.Flush(); + } + public WireFrame ReadNextFrame() { var frame = _reader.ReadWireFrame(); diff --git a/Capnp.Net.Runtime.Tests/Util/TestBase.cs b/Capnp.Net.Runtime.Tests/Util/TestBase.cs index bad16c0..2899428 100644 --- a/Capnp.Net.Runtime.Tests/Util/TestBase.cs +++ b/Capnp.Net.Runtime.Tests/Util/TestBase.cs @@ -104,6 +104,10 @@ namespace Capnp.Net.Runtime.Tests return false; } } + + void IEndpoint.Flush() + { + } } readonly DecisionTree _decisionTree; diff --git a/Capnp.Net.Runtime/FramePump.cs b/Capnp.Net.Runtime/FramePump.cs index dad87c3..71c611b 100644 --- a/Capnp.Net.Runtime/FramePump.cs +++ b/Capnp.Net.Runtime/FramePump.cs @@ -2,6 +2,7 @@ using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; +using System.Diagnostics; using System.IO; using System.Net.Sockets; using System.Runtime.InteropServices; @@ -117,8 +118,21 @@ namespace Capnp #endif _writer.Write(bytes); } + } + } - _writer.Flush(); + public void Flush() + { + if (Monitor.TryEnter(_writeLock)) + { + try + { + _writer?.Flush(); + } + finally + { + Monitor.Exit(_writeLock); + } } } diff --git a/Capnp.Net.Runtime/Rpc/IEndpoint.cs b/Capnp.Net.Runtime/Rpc/IEndpoint.cs index 267cafa..02aa5d6 100644 --- a/Capnp.Net.Runtime/Rpc/IEndpoint.cs +++ b/Capnp.Net.Runtime/Rpc/IEndpoint.cs @@ -10,6 +10,8 @@ /// void Forward(WireFrame frame); + void Flush(); + /// /// Close this endpoint. /// diff --git a/Capnp.Net.Runtime/Rpc/RpcEngine.cs b/Capnp.Net.Runtime/Rpc/RpcEngine.cs index 60cc0ef..787b711 100644 --- a/Capnp.Net.Runtime/Rpc/RpcEngine.cs +++ b/Capnp.Net.Runtime/Rpc/RpcEngine.cs @@ -66,8 +66,59 @@ namespace Capnp.Rpc Dismissed } + class FlushContext + { + readonly FlushContext? _prev; + readonly RpcEndpoint _ep; + bool _requested; + + public FlushContext(FlushContext? prev, RpcEndpoint ep) + { + _prev = prev; + _ep = ep; + _requested = false; + } + + public RpcEndpoint Ep => _ep; + + public void Request() + { + _requested = true; + } + + public void Remove() + { + _flushRequests.Value = _prev; + + if (_requested) + _ep._tx.Flush(); + } + } + + readonly struct FlushContextKeeper: IDisposable + { + readonly FlushContext _context; + readonly bool _owner; + + public FlushContextKeeper(FlushContext context, bool owner) + { + _context = context; + _owner = owner; + } + + public void Dispose() + { + if (_owner) + { + _context.Remove(); + } + } + + } + static readonly ThreadLocal _deferredCall = new ThreadLocal(); static readonly ThreadLocal _canDeferCalls = new ThreadLocal(); + static readonly ThreadLocal _flushRequests = new ThreadLocal(); ILogger Logger { get; } = Logging.CreateLogger(); @@ -138,6 +189,10 @@ namespace Capnp.Rpc ProcessFrame(frame); } + void IEndpoint.Flush() + { + } + /// /// Number of frames sent so far /// @@ -204,11 +259,37 @@ namespace Capnp.Rpc } } + FlushContextKeeper SetupFlushContext() + { + if (_flushRequests.Value?.Ep == this) + { + return new FlushContextKeeper(_flushRequests.Value, false); + } + else + { + _flushRequests.Value = new FlushContext(_flushRequests.Value, this); + return new FlushContextKeeper(_flushRequests.Value, true); + } + } + + void RequestFlush() + { + if (_flushRequests.Value?.Ep == this) + { + _flushRequests.Value.Request(); + } + else + { + _tx.Flush(); + } + } + void Tx(WireFrame frame) { try { _tx.Forward(frame); + RequestFlush(); Interlocked.Increment(ref _sendCount); } catch (System.Exception exception) @@ -235,6 +316,8 @@ namespace Capnp.Rpc void IRpcEndpoint.Resolve(uint preliminaryId, Skeleton preliminaryCap, Func resolvedCapGetter) { + using var fc = SetupFlushContext(); + lock (_reentrancyBlocker) { if (!_exportTable.TryGetValue(preliminaryId, out var existing) || @@ -1168,6 +1251,7 @@ namespace Capnp.Rpc req.Bootstrap!.QuestionId = pendingBootstrap.QuestionId; Tx(mb.Frame); + _tx.Flush(); var main = new RemoteAnswerCapability( pendingBootstrap, @@ -1178,6 +1262,7 @@ namespace Capnp.Rpc void ProcessFrame(WireFrame frame) { + using var fc = SetupFlushContext(); var dec = DeserializerState.CreateRoot(frame); var msg = Message.READER.create(dec); @@ -1415,6 +1500,7 @@ namespace Capnp.Rpc PendingQuestion IRpcEndpoint.BeginQuestion(ConsumedCapability target, SerializerState inParams) { + using var fc = SetupFlushContext(); var question = AllocateQuestion(target, inParams); if (_canDeferCalls.Value) @@ -1458,11 +1544,14 @@ namespace Capnp.Rpc void IRpcEndpoint.Finish(uint questionId) { + using var fc = SetupFlushContext(); Finish(questionId); } void IRpcEndpoint.ReleaseImport(uint importId) { + using var fc = SetupFlushContext(); + bool exists; int count = 0; @@ -1492,6 +1581,7 @@ namespace Capnp.Rpc try { Tx(mb.Frame); + RequestFlush(); } catch (RpcException exception) { @@ -1502,6 +1592,8 @@ namespace Capnp.Rpc Task IRpcEndpoint.RequestSenderLoopback(Action describe) { + using var fc = SetupFlushContext(); + (var tcs, uint id) = AllocateDisembargo(); var mb = MessageBuilder.Create(); @@ -1520,6 +1612,8 @@ namespace Capnp.Rpc void IRpcEndpoint.DeleteQuestion(PendingQuestion question) { + using var fc = SetupFlushContext(); + lock (_reentrancyBlocker) { if (!_questionTable.Remove(question.QuestionId)) diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs b/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs index f52d702..216d280 100644 --- a/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs +++ b/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs @@ -36,6 +36,11 @@ namespace Capnp.Rpc { _pump.Send(frame); } + + public void Flush() + { + _pump.Flush(); + } } readonly RpcEngine _rpcEngine; diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs index 150ddd5..758ae3d 100644 --- a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs +++ b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs @@ -57,6 +57,11 @@ namespace Capnp.Rpc { _pump.Send(frame); } + + public void Flush() + { + _pump.Flush(); + } } class Connection: IConnection