flush optimization

This commit is contained in:
Christian Köllner 2020-04-22 22:19:16 +02:00
parent 3ef9a60a7f
commit 4f0abaac73
7 changed files with 130 additions and 1 deletions

View File

@ -45,6 +45,11 @@ namespace Capnp.Net.Runtime.Tests
_fromEnginePump.Send(frame);
}
public void Flush()
{
_fromEnginePump.Flush();
}
public WireFrame ReadNextFrame()
{
var frame = _reader.ReadWireFrame();

View File

@ -104,6 +104,10 @@ namespace Capnp.Net.Runtime.Tests
return false;
}
}
void IEndpoint.Flush()
{
}
}
readonly DecisionTree _decisionTree;

View File

@ -2,6 +2,7 @@
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net.Sockets;
using System.Runtime.InteropServices;
@ -117,8 +118,21 @@ namespace Capnp
#endif
_writer.Write(bytes);
}
}
}
_writer.Flush();
public void Flush()
{
if (Monitor.TryEnter(_writeLock))
{
try
{
_writer?.Flush();
}
finally
{
Monitor.Exit(_writeLock);
}
}
}

View File

@ -10,6 +10,8 @@
/// </summary>
void Forward(WireFrame frame);
void Flush();
/// <summary>
/// Close this endpoint.
/// </summary>

View File

@ -66,8 +66,59 @@ namespace Capnp.Rpc
Dismissed
}
class FlushContext
{
readonly FlushContext? _prev;
readonly RpcEndpoint _ep;
bool _requested;
public FlushContext(FlushContext? prev, RpcEndpoint ep)
{
_prev = prev;
_ep = ep;
_requested = false;
}
public RpcEndpoint Ep => _ep;
public void Request()
{
_requested = true;
}
public void Remove()
{
_flushRequests.Value = _prev;
if (_requested)
_ep._tx.Flush();
}
}
readonly struct FlushContextKeeper: IDisposable
{
readonly FlushContext _context;
readonly bool _owner;
public FlushContextKeeper(FlushContext context, bool owner)
{
_context = context;
_owner = owner;
}
public void Dispose()
{
if (_owner)
{
_context.Remove();
}
}
}
static readonly ThreadLocal<PendingQuestion?> _deferredCall = new ThreadLocal<PendingQuestion?>();
static readonly ThreadLocal<bool> _canDeferCalls = new ThreadLocal<bool>();
static readonly ThreadLocal<FlushContext?> _flushRequests = new ThreadLocal<FlushContext?>();
ILogger Logger { get; } = Logging.CreateLogger<RpcEndpoint>();
@ -138,6 +189,10 @@ namespace Capnp.Rpc
ProcessFrame(frame);
}
void IEndpoint.Flush()
{
}
/// <summary>
/// Number of frames sent so far
/// </summary>
@ -204,11 +259,37 @@ namespace Capnp.Rpc
}
}
FlushContextKeeper SetupFlushContext()
{
if (_flushRequests.Value?.Ep == this)
{
return new FlushContextKeeper(_flushRequests.Value, false);
}
else
{
_flushRequests.Value = new FlushContext(_flushRequests.Value, this);
return new FlushContextKeeper(_flushRequests.Value, true);
}
}
void RequestFlush()
{
if (_flushRequests.Value?.Ep == this)
{
_flushRequests.Value.Request();
}
else
{
_tx.Flush();
}
}
void Tx(WireFrame frame)
{
try
{
_tx.Forward(frame);
RequestFlush();
Interlocked.Increment(ref _sendCount);
}
catch (System.Exception exception)
@ -235,6 +316,8 @@ namespace Capnp.Rpc
void IRpcEndpoint.Resolve(uint preliminaryId, Skeleton preliminaryCap, Func<ConsumedCapability> resolvedCapGetter)
{
using var fc = SetupFlushContext();
lock (_reentrancyBlocker)
{
if (!_exportTable.TryGetValue(preliminaryId, out var existing) ||
@ -1168,6 +1251,7 @@ namespace Capnp.Rpc
req.Bootstrap!.QuestionId = pendingBootstrap.QuestionId;
Tx(mb.Frame);
_tx.Flush();
var main = new RemoteAnswerCapability(
pendingBootstrap,
@ -1178,6 +1262,7 @@ namespace Capnp.Rpc
void ProcessFrame(WireFrame frame)
{
using var fc = SetupFlushContext();
var dec = DeserializerState.CreateRoot(frame);
var msg = Message.READER.create(dec);
@ -1415,6 +1500,7 @@ namespace Capnp.Rpc
PendingQuestion IRpcEndpoint.BeginQuestion(ConsumedCapability target, SerializerState inParams)
{
using var fc = SetupFlushContext();
var question = AllocateQuestion(target, inParams);
if (_canDeferCalls.Value)
@ -1458,11 +1544,14 @@ namespace Capnp.Rpc
void IRpcEndpoint.Finish(uint questionId)
{
using var fc = SetupFlushContext();
Finish(questionId);
}
void IRpcEndpoint.ReleaseImport(uint importId)
{
using var fc = SetupFlushContext();
bool exists;
int count = 0;
@ -1492,6 +1581,7 @@ namespace Capnp.Rpc
try
{
Tx(mb.Frame);
RequestFlush();
}
catch (RpcException exception)
{
@ -1502,6 +1592,8 @@ namespace Capnp.Rpc
Task IRpcEndpoint.RequestSenderLoopback(Action<MessageTarget.WRITER> describe)
{
using var fc = SetupFlushContext();
(var tcs, uint id) = AllocateDisembargo();
var mb = MessageBuilder.Create();
@ -1520,6 +1612,8 @@ namespace Capnp.Rpc
void IRpcEndpoint.DeleteQuestion(PendingQuestion question)
{
using var fc = SetupFlushContext();
lock (_reentrancyBlocker)
{
if (!_questionTable.Remove(question.QuestionId))

View File

@ -36,6 +36,11 @@ namespace Capnp.Rpc
{
_pump.Send(frame);
}
public void Flush()
{
_pump.Flush();
}
}
readonly RpcEngine _rpcEngine;

View File

@ -57,6 +57,11 @@ namespace Capnp.Rpc
{
_pump.Send(frame);
}
public void Flush()
{
_pump.Flush();
}
}
class Connection: IConnection