114 lines
4.6 KiB
C#
Raw Normal View History

using Capnp.Util;
using Microsoft.Extensions.Logging;
2019-06-12 21:56:55 +02:00
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Capnp.Rpc
{
abstract class RemoteResolvingCapability : RemoteCapability, IResolvingCapability
{
// Set DebugEmbargos to true to get logging output for calls. RPC calls are expected to
// be on the critical path, hence very relevant for performance. We just can't afford
// additional stuff on this path. Even if the logger filters the outputs away, there is
// overhead for creating the Logger object, calling the Logger methods and deciding to
// filter the output. This justifies the precompiler switch.
#if DebugEmbargos
ILogger Logger { get; } = Logging.CreateLogger<RemoteResolvingCapability>();
#endif
public abstract StrictlyOrderedAwaitTask WhenResolved { get; }
public abstract T? GetResolvedCapability<T>() where T : class;
2019-06-12 21:56:55 +02:00
protected RemoteResolvingCapability(IRpcEndpoint ep) : base(ep)
{
}
protected int _pendingCallsOnPromise;
StrictlyOrderedAwaitTask? _disembargo;
2019-06-12 21:56:55 +02:00
2020-03-10 21:55:34 +01:00
protected abstract ConsumedCapability? ResolvedCap { get; }
2019-06-12 21:56:55 +02:00
protected abstract void GetMessageTarget(MessageTarget.WRITER wr);
protected IPromisedAnswer CallOnResolution(ulong interfaceId, ushort methodId, DynamicSerializerState args)
2019-06-12 21:56:55 +02:00
{
var resolvedCap = ResolvedCap!;
2020-01-11 17:21:31 +01:00
2019-06-12 21:56:55 +02:00
try
{
if (resolvedCap is NullCapability ||
// Must not request disembargo on null cap
resolvedCap is RemoteCapability ||
2020-04-06 21:12:08 +02:00
//# Note that in the case where Carol actually lives in Vat B (i.e., the same vat that the promise
//# already pointed at), no embargo is needed, because the pipelined calls are delivered over the
//# same path as the later direct calls.
2019-06-12 21:56:55 +02:00
2020-04-06 21:12:08 +02:00
(_disembargo == null && _pendingCallsOnPromise == 0) ||
// No embargo is needed since all outstanding replies have returned
2019-06-12 21:56:55 +02:00
2020-04-06 21:12:08 +02:00
_disembargo?.IsCompleted == true
// Disembargo has returned
)
{
#if DebugEmbargos
Logger.LogDebug("Direct call");
#endif
using var proxy = new Proxy(resolvedCap);
2020-04-06 21:12:08 +02:00
return proxy.Call(interfaceId, methodId, args, default);
}
else
{
if (_disembargo == null)
2019-06-12 21:56:55 +02:00
{
#if DebugEmbargos
2020-04-06 21:12:08 +02:00
Logger.LogDebug("Requesting disembargo");
2019-06-12 21:56:55 +02:00
#endif
_disembargo = _ep.RequestSenderLoopback(GetMessageTarget).EnforceAwaitOrder();
2019-06-12 21:56:55 +02:00
}
else
{
#if DebugEmbargos
2020-04-06 21:12:08 +02:00
Logger.LogDebug("Waiting for requested disembargo");
2019-06-12 21:56:55 +02:00
#endif
2020-04-06 21:12:08 +02:00
}
2019-06-12 21:56:55 +02:00
2020-04-06 21:12:08 +02:00
var cancellationTokenSource = new CancellationTokenSource();
2019-06-12 21:56:55 +02:00
async Task<DeserializerState> AwaitAnswer()
2020-04-06 21:12:08 +02:00
{
await _disembargo!;
2020-04-06 21:12:08 +02:00
// Two reasons for ignoring exceptions on the previous task (i.e. not _.Wait()ing):
// 1. A faulting predecessor, especially due to cancellation, must not have any impact on this one.
// 2. A faulting disembargo request would imply that the other side cannot send pending requests anyway.
if (cancellationTokenSource.Token.IsCancellationRequested)
2019-06-12 21:56:55 +02:00
{
2020-04-06 21:12:08 +02:00
args.Dispose();
cancellationTokenSource.Token.ThrowIfCancellationRequested();
}
2019-06-12 21:56:55 +02:00
using var proxy = new Proxy(resolvedCap);
var promisedAnswer = proxy.Call(interfaceId, methodId, args, default);
2019-06-12 21:56:55 +02:00
2020-04-06 21:12:08 +02:00
using (cancellationTokenSource.Token.Register(promisedAnswer.Dispose))
2019-06-12 21:56:55 +02:00
{
2020-04-06 21:12:08 +02:00
return await promisedAnswer.WhenReturned;
2019-06-12 21:56:55 +02:00
}
}
2020-04-06 21:12:08 +02:00
return new LocalAnswer(cancellationTokenSource, AwaitAnswer());
2019-06-12 21:56:55 +02:00
}
}
catch (System.Exception exception)
{
// Wrap exception into local answer, since otherwise we'd get an AggregateException (which we don't want).
return new LocalAnswer(
new CancellationTokenSource(),
Task.FromException<DeserializerState>(exception));
}
}
}
2020-01-11 17:56:12 +01:00
}