diff --git a/Capnp.Net.Runtime.Tests/EdgeCaseHandling.cs b/Capnp.Net.Runtime.Tests/EdgeCaseHandling.cs
index 1cace55..a48e1a4 100644
--- a/Capnp.Net.Runtime.Tests/EdgeCaseHandling.cs
+++ b/Capnp.Net.Runtime.Tests/EdgeCaseHandling.cs
@@ -45,6 +45,11 @@ namespace Capnp.Net.Runtime.Tests
_fromEnginePump.Send(frame);
}
+ public void Flush()
+ {
+ _fromEnginePump.Flush();
+ }
+
public WireFrame ReadNextFrame()
{
var frame = _reader.ReadWireFrame();
diff --git a/Capnp.Net.Runtime.Tests/Util/TestBase.cs b/Capnp.Net.Runtime.Tests/Util/TestBase.cs
index bad16c0..2899428 100644
--- a/Capnp.Net.Runtime.Tests/Util/TestBase.cs
+++ b/Capnp.Net.Runtime.Tests/Util/TestBase.cs
@@ -104,6 +104,10 @@ namespace Capnp.Net.Runtime.Tests
return false;
}
}
+
+ void IEndpoint.Flush()
+ {
+ }
}
readonly DecisionTree _decisionTree;
diff --git a/Capnp.Net.Runtime/FramePump.cs b/Capnp.Net.Runtime/FramePump.cs
index dad87c3..71c611b 100644
--- a/Capnp.Net.Runtime/FramePump.cs
+++ b/Capnp.Net.Runtime/FramePump.cs
@@ -2,6 +2,7 @@
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
+using System.Diagnostics;
using System.IO;
using System.Net.Sockets;
using System.Runtime.InteropServices;
@@ -117,8 +118,21 @@ namespace Capnp
#endif
_writer.Write(bytes);
}
+ }
+ }
- _writer.Flush();
+ public void Flush()
+ {
+ if (Monitor.TryEnter(_writeLock))
+ {
+ try
+ {
+ _writer?.Flush();
+ }
+ finally
+ {
+ Monitor.Exit(_writeLock);
+ }
}
}
diff --git a/Capnp.Net.Runtime/Rpc/IEndpoint.cs b/Capnp.Net.Runtime/Rpc/IEndpoint.cs
index 267cafa..02aa5d6 100644
--- a/Capnp.Net.Runtime/Rpc/IEndpoint.cs
+++ b/Capnp.Net.Runtime/Rpc/IEndpoint.cs
@@ -10,6 +10,8 @@
///
void Forward(WireFrame frame);
+ void Flush();
+
///
/// Close this endpoint.
///
diff --git a/Capnp.Net.Runtime/Rpc/RpcEngine.cs b/Capnp.Net.Runtime/Rpc/RpcEngine.cs
index 60cc0ef..787b711 100644
--- a/Capnp.Net.Runtime/Rpc/RpcEngine.cs
+++ b/Capnp.Net.Runtime/Rpc/RpcEngine.cs
@@ -66,8 +66,59 @@ namespace Capnp.Rpc
Dismissed
}
+ class FlushContext
+ {
+ readonly FlushContext? _prev;
+ readonly RpcEndpoint _ep;
+ bool _requested;
+
+ public FlushContext(FlushContext? prev, RpcEndpoint ep)
+ {
+ _prev = prev;
+ _ep = ep;
+ _requested = false;
+ }
+
+ public RpcEndpoint Ep => _ep;
+
+ public void Request()
+ {
+ _requested = true;
+ }
+
+ public void Remove()
+ {
+ _flushRequests.Value = _prev;
+
+ if (_requested)
+ _ep._tx.Flush();
+ }
+ }
+
+ readonly struct FlushContextKeeper: IDisposable
+ {
+ readonly FlushContext _context;
+ readonly bool _owner;
+
+ public FlushContextKeeper(FlushContext context, bool owner)
+ {
+ _context = context;
+ _owner = owner;
+ }
+
+ public void Dispose()
+ {
+ if (_owner)
+ {
+ _context.Remove();
+ }
+ }
+
+ }
+
static readonly ThreadLocal _deferredCall = new ThreadLocal();
static readonly ThreadLocal _canDeferCalls = new ThreadLocal();
+ static readonly ThreadLocal _flushRequests = new ThreadLocal();
ILogger Logger { get; } = Logging.CreateLogger();
@@ -138,6 +189,10 @@ namespace Capnp.Rpc
ProcessFrame(frame);
}
+ void IEndpoint.Flush()
+ {
+ }
+
///
/// Number of frames sent so far
///
@@ -204,11 +259,37 @@ namespace Capnp.Rpc
}
}
+ FlushContextKeeper SetupFlushContext()
+ {
+ if (_flushRequests.Value?.Ep == this)
+ {
+ return new FlushContextKeeper(_flushRequests.Value, false);
+ }
+ else
+ {
+ _flushRequests.Value = new FlushContext(_flushRequests.Value, this);
+ return new FlushContextKeeper(_flushRequests.Value, true);
+ }
+ }
+
+ void RequestFlush()
+ {
+ if (_flushRequests.Value?.Ep == this)
+ {
+ _flushRequests.Value.Request();
+ }
+ else
+ {
+ _tx.Flush();
+ }
+ }
+
void Tx(WireFrame frame)
{
try
{
_tx.Forward(frame);
+ RequestFlush();
Interlocked.Increment(ref _sendCount);
}
catch (System.Exception exception)
@@ -235,6 +316,8 @@ namespace Capnp.Rpc
void IRpcEndpoint.Resolve(uint preliminaryId, Skeleton preliminaryCap, Func resolvedCapGetter)
{
+ using var fc = SetupFlushContext();
+
lock (_reentrancyBlocker)
{
if (!_exportTable.TryGetValue(preliminaryId, out var existing) ||
@@ -1168,6 +1251,7 @@ namespace Capnp.Rpc
req.Bootstrap!.QuestionId = pendingBootstrap.QuestionId;
Tx(mb.Frame);
+ _tx.Flush();
var main = new RemoteAnswerCapability(
pendingBootstrap,
@@ -1178,6 +1262,7 @@ namespace Capnp.Rpc
void ProcessFrame(WireFrame frame)
{
+ using var fc = SetupFlushContext();
var dec = DeserializerState.CreateRoot(frame);
var msg = Message.READER.create(dec);
@@ -1415,6 +1500,7 @@ namespace Capnp.Rpc
PendingQuestion IRpcEndpoint.BeginQuestion(ConsumedCapability target, SerializerState inParams)
{
+ using var fc = SetupFlushContext();
var question = AllocateQuestion(target, inParams);
if (_canDeferCalls.Value)
@@ -1458,11 +1544,14 @@ namespace Capnp.Rpc
void IRpcEndpoint.Finish(uint questionId)
{
+ using var fc = SetupFlushContext();
Finish(questionId);
}
void IRpcEndpoint.ReleaseImport(uint importId)
{
+ using var fc = SetupFlushContext();
+
bool exists;
int count = 0;
@@ -1492,6 +1581,7 @@ namespace Capnp.Rpc
try
{
Tx(mb.Frame);
+ RequestFlush();
}
catch (RpcException exception)
{
@@ -1502,6 +1592,8 @@ namespace Capnp.Rpc
Task IRpcEndpoint.RequestSenderLoopback(Action describe)
{
+ using var fc = SetupFlushContext();
+
(var tcs, uint id) = AllocateDisembargo();
var mb = MessageBuilder.Create();
@@ -1520,6 +1612,8 @@ namespace Capnp.Rpc
void IRpcEndpoint.DeleteQuestion(PendingQuestion question)
{
+ using var fc = SetupFlushContext();
+
lock (_reentrancyBlocker)
{
if (!_questionTable.Remove(question.QuestionId))
diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs b/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs
index f52d702..216d280 100644
--- a/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs
+++ b/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs
@@ -36,6 +36,11 @@ namespace Capnp.Rpc
{
_pump.Send(frame);
}
+
+ public void Flush()
+ {
+ _pump.Flush();
+ }
}
readonly RpcEngine _rpcEngine;
diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs
index 150ddd5..758ae3d 100644
--- a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs
+++ b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs
@@ -57,6 +57,11 @@ namespace Capnp.Rpc
{
_pump.Send(frame);
}
+
+ public void Flush()
+ {
+ _pump.Flush();
+ }
}
class Connection: IConnection