Improved cancellation support during interception

This commit is contained in:
Christian Köllner 2019-11-06 17:58:46 +01:00
parent fd55167d39
commit 51ecd00e82
5 changed files with 191 additions and 94 deletions

View File

@ -7,6 +7,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
@ -217,7 +218,7 @@ namespace Capnp.Net.Runtime.Tests
}
[TestMethod]
public void InterceptClientSideCancelCall()
public void InterceptClientSideCancelReturn()
{
var policy = new MyPolicy("a");
@ -235,8 +236,9 @@ namespace Capnp.Net.Runtime.Tests
var request1 = main.Foo(321, false, default);
Assert.IsTrue(policy.Calls.TryReceive(out var cc));
Assert.IsFalse(request1.IsCompleted);
Assert.IsFalse(cc.CancelFromAlice.IsCancellationRequested);
cc.IsCanceled = true;
cc.ReturnCanceled = true;
cc.ReturnToAlice();
@ -246,6 +248,41 @@ namespace Capnp.Net.Runtime.Tests
}
}
[TestMethod]
public void InterceptClientSideOverrideCanceledCall()
{
var policy = new MyPolicy("a");
(var server, var client) = SetupClientServerPair();
using (server)
using (client)
{
client.WhenConnected.Wait();
var counters = new Counters();
server.Main = new TestInterfaceImpl(counters);
using (var main = policy.Attach(client.GetMain<ITestInterface>()))
{
var request1 = main.Foo(321, false, new CancellationToken(true));
Assert.IsTrue(policy.Calls.TryReceive(out var cc));
Assert.IsFalse(request1.IsCompleted);
Assert.IsTrue(cc.CancelFromAlice.IsCancellationRequested);
cc.ForwardToBob();
Assert.IsTrue(policy.Returns.ReceiveAsync().Wait(MediumNonDbgTimeout));
Assert.IsTrue(cc.ReturnCanceled);
cc.ReturnCanceled = false;
cc.Exception = "Cancelled";
cc.ReturnToAlice();
Assert.IsTrue(request1.IsCompleted);
Assert.IsTrue(request1.IsFaulted);
}
}
}
[TestMethod]
public void InterceptClientSideRedirectCall()
{

View File

@ -16,8 +16,9 @@ namespace Capnp.Rpc.Interception
{
class PromisedAnswer : IPromisedAnswer
{
CallContext _callContext;
TaskCompletionSource<DeserializerState> _futureResult = new TaskCompletionSource<DeserializerState>();
readonly CallContext _callContext;
readonly TaskCompletionSource<DeserializerState> _futureResult = new TaskCompletionSource<DeserializerState>();
readonly CancellationTokenSource _cancelFromAlice = new CancellationTokenSource();
public PromisedAnswer(CallContext callContext)
{
@ -25,6 +26,7 @@ namespace Capnp.Rpc.Interception
}
public Task<DeserializerState> WhenReturned => _futureResult.Task;
public CancellationToken CancelFromAlice => _cancelFromAlice.Token;
async Task<Proxy> AccessWhenReturned(MemberAccessPath access)
{
@ -53,33 +55,107 @@ namespace Capnp.Rpc.Interception
public void Dispose()
{
try
{
_cancelFromAlice.Cancel();
}
catch (ObjectDisposedException)
{
// May happen when cancellation request from Alice arrives after return.
}
}
public void Return()
{
if (_callContext.IsCanceled)
try
{
_futureResult.SetCanceled();
if (_callContext.ReturnCanceled)
{
_futureResult.SetCanceled();
}
else if (_callContext.Exception != null)
{
_futureResult.SetException(new RpcException(_callContext.Exception));
}
else
{
_futureResult.SetResult(_callContext.OutArgs);
}
}
else if (_callContext.Exception != null)
finally
{
_futureResult.SetException(new RpcException(_callContext.Exception));
}
else
{
_futureResult.SetResult(_callContext.OutArgs);
_cancelFromAlice.Dispose();
}
}
}
/// <summary>
/// Target interface ID of this call
/// </summary>
public ulong InterfaceId { get; }
/// <summary>
/// Target method ID of this call
/// </summary>
public ushort MethodId { get; }
public bool IsTailCall { get; }
/// <summary>
/// Lifecycle state of this call
/// </summary>
public InterceptionState State { get; private set; }
/// <summary>
/// Input arguments
/// </summary>
public SerializerState InArgs { get; set; }
/// <summary>
/// Output arguments ("return value")
/// </summary>
public DeserializerState OutArgs { get; set; }
/// <summary>
/// Exception text, or null if there is no exception
/// </summary>
public string Exception { get; set; }
public bool IsCanceled { get; set; }
/// <summary>
/// Whether the call should return in canceled state to Alice (the original caller).
/// In case of forwarding (<see cref="ForwardToBob()"/>) the property is automatically set according
/// to the cancellation state of Bob's answer. However, you may override it:
/// <list type="bullet">
/// <item><description>Setting it from 'false' to 'true' means that we pretend Alice a canceled call.
/// If Alice never requested cancellation this will surprise her pretty much.</description></item>
/// <item><description>Setting it from 'true' to 'false' overrides an existing cancellation. Since
/// we did not receive any output arguments from Bob (due to the cancellation), you *must* provide
/// either <see cref="OutArgs"/> or <see cref="Exception"/>.</description></item>
/// </list>
/// </summary>
public bool ReturnCanceled { get; set; }
/// <summary>
/// The cancellation token *from Alice* tells us when the original caller resigns from the call.
/// </summary>
public CancellationToken CancelFromAlice { get; private set; }
/// <summary>
/// The cancellation token *to Bob* tells the target capability when we resign from the forwarded call.
/// It is initialized with <seealso cref="CancelFromAlice"/>. Override it to achieve different behaviors:
/// E.g. set it to <code>CancellationToken.None</code> for "hiding" any cancellation request from Alice.
/// Set it to <code>new CancellationToken(true)</code> to pretend Bob a cancellation request.
/// </summary>
public CancellationToken CancelToBob { get; set; }
/// <summary>
/// Target capability. May be one of the following:
/// <list type="bullet">
/// <item><description>Capability interface implementation</description></item>
/// <item><description>A <see cref="Proxy"/>-derived object</description></item>
/// <item><description>A <see cref="Skeleton"/>-derived object</description></item>
/// <item><description>A <see cref="ConsumedCapability"/>-derived object (low level capability)</description></item>
/// <item><description>null</description></item>
/// </list>
/// </summary>
public object Bob
{
get => _bob;
@ -132,7 +208,9 @@ namespace Capnp.Rpc.Interception
{
_censorCapability = censorCapability;
_promisedAnswer = new PromisedAnswer(this);
CancelFromAlice = _promisedAnswer.CancelFromAlice;
CancelToBob = CancelFromAlice;
Bob = censorCapability.InterceptedCapability;
InterfaceId = interfaceId;
MethodId = methodId;
@ -164,34 +242,54 @@ namespace Capnp.Rpc.Interception
}
}
/// <summary>
/// Intercepts all capabilies inside the input arguments
/// </summary>
/// <param name="policyOverride">Policy to use, or null to further use present policy</param>
public void InterceptInCaps(IInterceptionPolicy policyOverride = null)
{
InterceptCaps(InArgs, policyOverride ?? _censorCapability.Policy);
}
/// <summary>
/// Intercepts all capabilies inside the output arguments
/// </summary>
/// <param name="policyOverride">Policy to use, or null to further use present policy</param>
public void InterceptOutCaps(IInterceptionPolicy policyOverride = null)
{
InterceptCaps(OutArgs, policyOverride ?? _censorCapability.Policy);
}
/// <summary>
/// Unintercepts all capabilies inside the input arguments
/// </summary>
/// <param name="policyOverride">Policy to remove, or null to remove present policy</param>
public void UninterceptInCaps(IInterceptionPolicy policyOverride = null)
{
UninterceptCaps(InArgs, policyOverride ?? _censorCapability.Policy);
}
/// <summary>
/// Unintercepts all capabilies inside the output arguments
/// </summary>
/// <param name="policyOverride">Policy to remove, or null to remove present policy</param>
public void UninterceptOutCaps(IInterceptionPolicy policyOverride = null)
{
UninterceptCaps(OutArgs, policyOverride ?? _censorCapability.Policy);
}
public void ForwardToBob(CancellationToken cancellationToken = default)
/// <summary>
/// Forwards this intercepted call to the target capability ("Bob").
/// </summary>
/// <param name="cancellationToken">Optional cancellation token, requesting Bob to cancel the call</param>
public void ForwardToBob()
{
if (Bob == null)
{
throw new InvalidOperationException("Bob is null");
}
var answer = BobProxy.Call(InterfaceId, MethodId, InArgs.Rewrap<DynamicSerializerState>(), IsTailCall, cancellationToken);
var answer = BobProxy.Call(InterfaceId, MethodId, InArgs.Rewrap<DynamicSerializerState>(), false, CancelToBob);
State = InterceptionState.ForwardedToBob;
@ -205,7 +303,7 @@ namespace Capnp.Rpc.Interception
}
catch (TaskCanceledException)
{
IsCanceled = true;
ReturnCanceled = true;
}
catch (System.Exception exception)
{
@ -221,6 +319,9 @@ namespace Capnp.Rpc.Interception
ChangeStateWhenReturned();
}
/// <summary>
/// Returns this intercepted call to the caller ("Alice").
/// </summary>
public void ReturnToAlice()
{
try

View File

@ -2,9 +2,22 @@
namespace Capnp.Rpc.Interception
{
/// <summary>
/// An interception policy implements callbacks for outgoing calls and returning forwarded calls.
/// </summary>
public interface IInterceptionPolicy: IEquatable<IInterceptionPolicy>
{
/// <summary>
/// A caller ("Alice") initiated a new call, which is now intercepted.
/// </summary>
/// <param name="callContext">Context object</param>
void OnCallFromAlice(CallContext callContext);
/// <summary>
/// Given that the intercepted call was forwarded, it returned now from the target ("Bob")
/// and may (or may not) be returned to the original caller ("Alice").
/// </summary>
/// <param name="callContext"></param>
void OnReturnFromBob(CallContext callContext);
}
}

View File

@ -4,11 +4,23 @@ using System.Runtime.CompilerServices;
namespace Capnp.Rpc.Interception
{
/// <summary>
/// This static class provides extension methods for intercepting and unintercepting capabilities.
/// </summary>
public static class Interceptor
{
static readonly ConditionalWeakTable<ConsumedCapability, CensorCapability> _interceptMap =
new ConditionalWeakTable<ConsumedCapability, CensorCapability>();
/// <summary>
/// Attach this policy to given capability.
/// </summary>
/// <typeparam name="TCap">Capability interface type</typeparam>
/// <param name="policy">Policy to attach</param>
/// <param name="cap">Capability to censor</param>
/// <returns>Censored capability instance</returns>
/// <exception cref="ArgumentNullException"><paramref name="policy"/> is null or
/// <paramref name="cap"/> is null</exception>
public static TCap Attach<TCap>(this IInterceptionPolicy policy, TCap cap)
where TCap: class
{
@ -46,6 +58,15 @@ namespace Capnp.Rpc.Interception
}
}
/// <summary>
/// Detach this policy from given (censored) capability.
/// </summary>
/// <typeparam name="TCap">Capability interface type</typeparam>
/// <param name="policy">Policy to detach</param>
/// <param name="cap">Capability to clean</param>
/// <returns>Clean capability instance (at least, without this interception policy)</returns>
/// <exception cref="ArgumentNullException"><paramref name="policy"/> is null or
/// <paramref name="cap"/> is null</exception>
public static TCap Detach<TCap>(this IInterceptionPolicy policy, TCap cap)
where TCap: class
{

View File

@ -163,83 +163,8 @@ namespace Capnp.Rpc
});
}
//Task<SerializerState> ChainedAwaitWhenReady()
//{
// async Task<SerializerState> AwaitChainedTask(Task chainedTask)
// {
// await chainedTask;
// return _callTask.Result;
// }
// Task<SerializerState> resultTask;
// lock (_reentrancyBlocker)
// {
// if (_chainedTask == null)
// {
// _chainedTask = InitialAwaitWhenReady();
// }
// resultTask = AwaitChainedTask(_chainedTask);
// _chainedTask = resultTask;
// }
// return resultTask;
//}
public CancellationToken CancellationToken => _cts?.Token ?? CancellationToken.None;
//void EvaluateProxyAndCallContinuation(PromisedAnswer.READER rd, Action<Proxy> action)
//{
// var result = _callTask.Result;
// DeserializerState cur = result;
// foreach (var op in rd.Transform)
// {
// switch (op.which)
// {
// case PromisedAnswer.Op.WHICH.GetPointerField:
// try
// {
// cur = cur.StructReadPointer(op.GetPointerField);
// }
// catch (System.Exception)
// {
// throw new ArgumentOutOfRangeException("Illegal pointer field in transformation operation");
// }
// break;
// case PromisedAnswer.Op.WHICH.Noop:
// break;
// default:
// throw new ArgumentOutOfRangeException("Unknown transformation operation");
// }
// }
// Proxy proxy;
// switch (cur.Kind)
// {
// case ObjectKind.Capability:
// try
// {
// var cap = result.MsgBuilder.Caps[(int)cur.CapabilityIndex];
// proxy = new Proxy(cap ?? LazyCapability.Null);
// }
// catch (ArgumentOutOfRangeException)
// {
// throw new ArgumentOutOfRangeException("Bad capability table in internal answer - internal error?");
// }
// action(proxy);
// break;
// default:
// throw new ArgumentOutOfRangeException("Transformation did not result in a capability");
// }
//}
public async void Dispose()
{
if (_cts != null)