From 51ecd00e82a6a1df53b81f19f822e31089c4b485 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Wed, 6 Nov 2019 17:58:46 +0100 Subject: [PATCH] Improved cancellation support during interception --- Capnp.Net.Runtime.Tests/Interception.cs | 41 +++++- .../Rpc/Interception/CallContext.cs | 133 +++++++++++++++--- .../Rpc/Interception/IInterceptionPolicy.cs | 13 ++ .../Rpc/Interception/Interceptor.cs | 23 ++- Capnp.Net.Runtime/Rpc/PendingAnswer.cs | 75 ---------- 5 files changed, 191 insertions(+), 94 deletions(-) diff --git a/Capnp.Net.Runtime.Tests/Interception.cs b/Capnp.Net.Runtime.Tests/Interception.cs index cc95e9b..ea7c1f5 100644 --- a/Capnp.Net.Runtime.Tests/Interception.cs +++ b/Capnp.Net.Runtime.Tests/Interception.cs @@ -7,6 +7,7 @@ using System; using System.Collections.Generic; using System.Linq; using System.Text; +using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; @@ -217,7 +218,7 @@ namespace Capnp.Net.Runtime.Tests } [TestMethod] - public void InterceptClientSideCancelCall() + public void InterceptClientSideCancelReturn() { var policy = new MyPolicy("a"); @@ -235,8 +236,9 @@ namespace Capnp.Net.Runtime.Tests var request1 = main.Foo(321, false, default); Assert.IsTrue(policy.Calls.TryReceive(out var cc)); Assert.IsFalse(request1.IsCompleted); + Assert.IsFalse(cc.CancelFromAlice.IsCancellationRequested); - cc.IsCanceled = true; + cc.ReturnCanceled = true; cc.ReturnToAlice(); @@ -246,6 +248,41 @@ namespace Capnp.Net.Runtime.Tests } } + [TestMethod] + public void InterceptClientSideOverrideCanceledCall() + { + var policy = new MyPolicy("a"); + + (var server, var client) = SetupClientServerPair(); + + using (server) + using (client) + { + client.WhenConnected.Wait(); + + var counters = new Counters(); + server.Main = new TestInterfaceImpl(counters); + using (var main = policy.Attach(client.GetMain())) + { + var request1 = main.Foo(321, false, new CancellationToken(true)); + Assert.IsTrue(policy.Calls.TryReceive(out var cc)); + Assert.IsFalse(request1.IsCompleted); + Assert.IsTrue(cc.CancelFromAlice.IsCancellationRequested); + + cc.ForwardToBob(); + Assert.IsTrue(policy.Returns.ReceiveAsync().Wait(MediumNonDbgTimeout)); + Assert.IsTrue(cc.ReturnCanceled); + cc.ReturnCanceled = false; + cc.Exception = "Cancelled"; + + cc.ReturnToAlice(); + + Assert.IsTrue(request1.IsCompleted); + Assert.IsTrue(request1.IsFaulted); + } + } + } + [TestMethod] public void InterceptClientSideRedirectCall() { diff --git a/Capnp.Net.Runtime/Rpc/Interception/CallContext.cs b/Capnp.Net.Runtime/Rpc/Interception/CallContext.cs index 2e4b704..645fe5b 100644 --- a/Capnp.Net.Runtime/Rpc/Interception/CallContext.cs +++ b/Capnp.Net.Runtime/Rpc/Interception/CallContext.cs @@ -16,8 +16,9 @@ namespace Capnp.Rpc.Interception { class PromisedAnswer : IPromisedAnswer { - CallContext _callContext; - TaskCompletionSource _futureResult = new TaskCompletionSource(); + readonly CallContext _callContext; + readonly TaskCompletionSource _futureResult = new TaskCompletionSource(); + readonly CancellationTokenSource _cancelFromAlice = new CancellationTokenSource(); public PromisedAnswer(CallContext callContext) { @@ -25,6 +26,7 @@ namespace Capnp.Rpc.Interception } public Task WhenReturned => _futureResult.Task; + public CancellationToken CancelFromAlice => _cancelFromAlice.Token; async Task AccessWhenReturned(MemberAccessPath access) { @@ -53,33 +55,107 @@ namespace Capnp.Rpc.Interception public void Dispose() { + try + { + _cancelFromAlice.Cancel(); + } + catch (ObjectDisposedException) + { + // May happen when cancellation request from Alice arrives after return. + } } public void Return() { - if (_callContext.IsCanceled) + try { - _futureResult.SetCanceled(); + if (_callContext.ReturnCanceled) + { + _futureResult.SetCanceled(); + } + else if (_callContext.Exception != null) + { + _futureResult.SetException(new RpcException(_callContext.Exception)); + } + else + { + _futureResult.SetResult(_callContext.OutArgs); + } } - else if (_callContext.Exception != null) + finally { - _futureResult.SetException(new RpcException(_callContext.Exception)); - } - else - { - _futureResult.SetResult(_callContext.OutArgs); + _cancelFromAlice.Dispose(); } } } + /// + /// Target interface ID of this call + /// public ulong InterfaceId { get; } + + /// + /// Target method ID of this call + /// public ushort MethodId { get; } - public bool IsTailCall { get; } + + /// + /// Lifecycle state of this call + /// public InterceptionState State { get; private set; } + + /// + /// Input arguments + /// public SerializerState InArgs { get; set; } + + /// + /// Output arguments ("return value") + /// public DeserializerState OutArgs { get; set; } + + /// + /// Exception text, or null if there is no exception + /// public string Exception { get; set; } - public bool IsCanceled { get; set; } + + /// + /// Whether the call should return in canceled state to Alice (the original caller). + /// In case of forwarding () the property is automatically set according + /// to the cancellation state of Bob's answer. However, you may override it: + /// + /// Setting it from 'false' to 'true' means that we pretend Alice a canceled call. + /// If Alice never requested cancellation this will surprise her pretty much. + /// Setting it from 'true' to 'false' overrides an existing cancellation. Since + /// we did not receive any output arguments from Bob (due to the cancellation), you *must* provide + /// either or . + /// + /// + public bool ReturnCanceled { get; set; } + + /// + /// The cancellation token *from Alice* tells us when the original caller resigns from the call. + /// + public CancellationToken CancelFromAlice { get; private set; } + + /// + /// The cancellation token *to Bob* tells the target capability when we resign from the forwarded call. + /// It is initialized with . Override it to achieve different behaviors: + /// E.g. set it to CancellationToken.None for "hiding" any cancellation request from Alice. + /// Set it to new CancellationToken(true) to pretend Bob a cancellation request. + /// + public CancellationToken CancelToBob { get; set; } + + /// + /// Target capability. May be one of the following: + /// + /// Capability interface implementation + /// A -derived object + /// A -derived object + /// A -derived object (low level capability) + /// null + /// + /// public object Bob { get => _bob; @@ -132,7 +208,9 @@ namespace Capnp.Rpc.Interception { _censorCapability = censorCapability; _promisedAnswer = new PromisedAnswer(this); - + + CancelFromAlice = _promisedAnswer.CancelFromAlice; + CancelToBob = CancelFromAlice; Bob = censorCapability.InterceptedCapability; InterfaceId = interfaceId; MethodId = methodId; @@ -164,34 +242,54 @@ namespace Capnp.Rpc.Interception } } + /// + /// Intercepts all capabilies inside the input arguments + /// + /// Policy to use, or null to further use present policy public void InterceptInCaps(IInterceptionPolicy policyOverride = null) { InterceptCaps(InArgs, policyOverride ?? _censorCapability.Policy); } + /// + /// Intercepts all capabilies inside the output arguments + /// + /// Policy to use, or null to further use present policy public void InterceptOutCaps(IInterceptionPolicy policyOverride = null) { InterceptCaps(OutArgs, policyOverride ?? _censorCapability.Policy); } + /// + /// Unintercepts all capabilies inside the input arguments + /// + /// Policy to remove, or null to remove present policy public void UninterceptInCaps(IInterceptionPolicy policyOverride = null) { UninterceptCaps(InArgs, policyOverride ?? _censorCapability.Policy); } + /// + /// Unintercepts all capabilies inside the output arguments + /// + /// Policy to remove, or null to remove present policy public void UninterceptOutCaps(IInterceptionPolicy policyOverride = null) { UninterceptCaps(OutArgs, policyOverride ?? _censorCapability.Policy); } - public void ForwardToBob(CancellationToken cancellationToken = default) + /// + /// Forwards this intercepted call to the target capability ("Bob"). + /// + /// Optional cancellation token, requesting Bob to cancel the call + public void ForwardToBob() { if (Bob == null) { throw new InvalidOperationException("Bob is null"); } - var answer = BobProxy.Call(InterfaceId, MethodId, InArgs.Rewrap(), IsTailCall, cancellationToken); + var answer = BobProxy.Call(InterfaceId, MethodId, InArgs.Rewrap(), false, CancelToBob); State = InterceptionState.ForwardedToBob; @@ -205,7 +303,7 @@ namespace Capnp.Rpc.Interception } catch (TaskCanceledException) { - IsCanceled = true; + ReturnCanceled = true; } catch (System.Exception exception) { @@ -221,6 +319,9 @@ namespace Capnp.Rpc.Interception ChangeStateWhenReturned(); } + /// + /// Returns this intercepted call to the caller ("Alice"). + /// public void ReturnToAlice() { try diff --git a/Capnp.Net.Runtime/Rpc/Interception/IInterceptionPolicy.cs b/Capnp.Net.Runtime/Rpc/Interception/IInterceptionPolicy.cs index 27bc284..a4d5706 100644 --- a/Capnp.Net.Runtime/Rpc/Interception/IInterceptionPolicy.cs +++ b/Capnp.Net.Runtime/Rpc/Interception/IInterceptionPolicy.cs @@ -2,9 +2,22 @@ namespace Capnp.Rpc.Interception { + /// + /// An interception policy implements callbacks for outgoing calls and returning forwarded calls. + /// public interface IInterceptionPolicy: IEquatable { + /// + /// A caller ("Alice") initiated a new call, which is now intercepted. + /// + /// Context object void OnCallFromAlice(CallContext callContext); + + /// + /// Given that the intercepted call was forwarded, it returned now from the target ("Bob") + /// and may (or may not) be returned to the original caller ("Alice"). + /// + /// void OnReturnFromBob(CallContext callContext); } } diff --git a/Capnp.Net.Runtime/Rpc/Interception/Interceptor.cs b/Capnp.Net.Runtime/Rpc/Interception/Interceptor.cs index 7c01120..91c3ee3 100644 --- a/Capnp.Net.Runtime/Rpc/Interception/Interceptor.cs +++ b/Capnp.Net.Runtime/Rpc/Interception/Interceptor.cs @@ -4,11 +4,23 @@ using System.Runtime.CompilerServices; namespace Capnp.Rpc.Interception { + /// + /// This static class provides extension methods for intercepting and unintercepting capabilities. + /// public static class Interceptor { static readonly ConditionalWeakTable _interceptMap = new ConditionalWeakTable(); - + + /// + /// Attach this policy to given capability. + /// + /// Capability interface type + /// Policy to attach + /// Capability to censor + /// Censored capability instance + /// is null or + /// is null public static TCap Attach(this IInterceptionPolicy policy, TCap cap) where TCap: class { @@ -46,6 +58,15 @@ namespace Capnp.Rpc.Interception } } + /// + /// Detach this policy from given (censored) capability. + /// + /// Capability interface type + /// Policy to detach + /// Capability to clean + /// Clean capability instance (at least, without this interception policy) + /// is null or + /// is null public static TCap Detach(this IInterceptionPolicy policy, TCap cap) where TCap: class { diff --git a/Capnp.Net.Runtime/Rpc/PendingAnswer.cs b/Capnp.Net.Runtime/Rpc/PendingAnswer.cs index 83bdcd6..4031501 100644 --- a/Capnp.Net.Runtime/Rpc/PendingAnswer.cs +++ b/Capnp.Net.Runtime/Rpc/PendingAnswer.cs @@ -163,83 +163,8 @@ namespace Capnp.Rpc }); } - //Task ChainedAwaitWhenReady() - //{ - // async Task AwaitChainedTask(Task chainedTask) - // { - // await chainedTask; - // return _callTask.Result; - // } - - // Task resultTask; - - // lock (_reentrancyBlocker) - // { - // if (_chainedTask == null) - // { - // _chainedTask = InitialAwaitWhenReady(); - // } - - // resultTask = AwaitChainedTask(_chainedTask); - // _chainedTask = resultTask; - // } - - // return resultTask; - //} - public CancellationToken CancellationToken => _cts?.Token ?? CancellationToken.None; - //void EvaluateProxyAndCallContinuation(PromisedAnswer.READER rd, Action action) - //{ - // var result = _callTask.Result; - - // DeserializerState cur = result; - - // foreach (var op in rd.Transform) - // { - // switch (op.which) - // { - // case PromisedAnswer.Op.WHICH.GetPointerField: - // try - // { - // cur = cur.StructReadPointer(op.GetPointerField); - // } - // catch (System.Exception) - // { - // throw new ArgumentOutOfRangeException("Illegal pointer field in transformation operation"); - // } - // break; - - // case PromisedAnswer.Op.WHICH.Noop: - // break; - - // default: - // throw new ArgumentOutOfRangeException("Unknown transformation operation"); - // } - // } - - // Proxy proxy; - - // switch (cur.Kind) - // { - // case ObjectKind.Capability: - // try - // { - // var cap = result.MsgBuilder.Caps[(int)cur.CapabilityIndex]; - // proxy = new Proxy(cap ?? LazyCapability.Null); - // } - // catch (ArgumentOutOfRangeException) - // { - // throw new ArgumentOutOfRangeException("Bad capability table in internal answer - internal error?"); - // } - // action(proxy); - // break; - - // default: - // throw new ArgumentOutOfRangeException("Transformation did not result in a capability"); - // } - //} - public async void Dispose() { if (_cts != null)