fixes + coverage

This commit is contained in:
Christian Köllner 2020-04-04 22:57:07 +02:00
parent fef879de9f
commit ac50b7454a
5 changed files with 166 additions and 123 deletions

View File

@ -1,4 +1,5 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Capnp.Rpc;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Collections.Generic;
using System.Linq;
@ -67,5 +68,53 @@ namespace Capnp.Net.Runtime.Tests
Task.WhenAll(tasks).Wait();
}
class PromisedAnswerMock : IPromisedAnswer
{
readonly TaskCompletionSource<DeserializerState> _tcs = new TaskCompletionSource<DeserializerState>();
public Task<DeserializerState> WhenReturned => _tcs.Task;
public void Return()
{
_tcs.SetResult(default);
}
public void Cancel()
{
_tcs.SetCanceled();
}
public void Fault()
{
_tcs.SetException(new InvalidOperationException("test fault"));
}
public ConsumedCapability Access(MemberAccessPath access)
{
throw new NotImplementedException();
}
public ConsumedCapability Access(MemberAccessPath access, Task<IDisposable> proxyTask)
{
throw new NotImplementedException();
}
public void Dispose()
{
}
}
[TestMethod]
public void MakePipelineAwareOnFastPath()
{
var mock = new PromisedAnswerMock();
mock.Return();
for (int i = 0; i < 100; i++)
{
var t = Impatient.MakePipelineAware(mock, _ => (object)null);
Assert.IsTrue(t.IsCompleted);
};
}
}
}

View File

@ -96,6 +96,46 @@ namespace Capnp.Net.Runtime.Tests
}
}
[TestMethod]
public void InterceptServerSideRedirectCall()
{
var policy = new MyPolicy("a");
(var server, var client) = SetupClientServerPair();
using (server)
using (client)
{
client.WhenConnected.Wait();
var counters = new Counters();
server.Main = policy.Attach<ITestInterface>(new TestInterfaceImpl(counters));
var redirTarget = new TestInterfaceImpl2();
using (var main = client.GetMain<ITestInterface>())
using (redirTarget)
{
var request1 = main.Foo(123, true, default);
var fcc = policy.Calls.ReceiveAsync();
Assert.IsTrue(fcc.Wait(MediumNonDbgTimeout));
var cc = fcc.Result;
Assert.ThrowsException<ArgumentNullException>(() => cc.Bob = null);
cc.Bob = redirTarget;
cc.ForwardToBob();
Assert.IsTrue(policy.Returns.ReceiveAsync().Wait(MediumNonDbgTimeout));
cc.ReturnToAlice();
Assert.IsTrue(request1.Wait(MediumNonDbgTimeout));
Assert.AreEqual("bar", request1.Result);
}
Assert.IsTrue(redirTarget.IsDisposed);
Assert.AreEqual(0, counters.CallCount);
}
}
[TestMethod]
public void InterceptClientSideModifyCall()
{
@ -284,6 +324,41 @@ namespace Capnp.Net.Runtime.Tests
}
}
[TestMethod]
public void InterceptClientSideOverrideFaultedCall()
{
var policy = new MyPolicy("a");
(var server, var client) = SetupClientServerPair();
using (server)
using (client)
{
client.WhenConnected.Wait();
var counters = new Counters();
server.Main = new TestInterfaceImpl(counters);
using (var main = policy.Attach(client.GetMain<ITestInterface>()))
{
var request1 = main.Bar();
Assert.IsTrue(policy.Calls.TryReceive(out var cc));
Assert.IsFalse(request1.IsCompleted);
cc.ForwardToBob();
Assert.IsTrue(policy.Returns.ReceiveAsync().Wait(MediumNonDbgTimeout));
Assert.IsNotNull(cc.Exception);
cc.ReturnCanceled = false;
cc.Exception = null;
cc.ReturnToAlice();
Assert.ThrowsException<InvalidOperationException>(() => cc.ReturnToAlice());
Assert.IsTrue(request1.IsCompleted);
Assert.IsFalse(request1.IsFaulted);
}
}
}
[TestMethod]
public void InterceptClientSideRedirectCall()
{

View File

@ -31,40 +31,11 @@ namespace Capnp.Rpc
var rtask = AwaitAnswer();
try
{
// Really weird: We'd expect AwaitAnswer() to initialize a new Task instance upon each invocation.
// However, this does not seem to be always true (as indicated by CI test suite). An explanation might be
// that the underlying implementation recycles Task instances (um, really? doesn't make sense. But the
// observation doesn't make sense, either).
_taskTable.Add(rtask, promise);
}
catch (ArgumentException)
{
if (rtask.IsCompleted)
{
// Force .NET to create a new Task instance
if (rtask.IsCanceled)
{
rtask = Task.FromCanceled<T>(new CancellationToken(true));
}
else if (rtask.IsFaulted)
{
rtask = Task.FromException<T>(rtask.Exception!.InnerException!);
}
else
{
rtask = Task.FromResult<T>(rtask.Result);
}
_taskTable.Add(rtask, promise);
}
else
{
throw new InvalidOperationException("What the heck is wrong with Task?");
}
}
// Rare situation: .NET maintains a cache of some pre-computed tasks for standard results (such as (int)0, (object)null).
// AwaitAnswer() might indeed have chosen a fast-path optimization, such that rtask is a cached object instead of a new instance.
// Once this happens the second time, and we return the same rtask for a different promise. GetAnswer()/TryGetAnswer() may return the "wrong"
// promise! Fortunately, this does not really matter, since the "wrong" promise is guaranteed to return exactly the same answer. :-)
_taskTable.GetValue(rtask, _ => promise);
return rtask;
}

View File

@ -25,50 +25,15 @@ namespace Capnp.Rpc.Interception
public Task<DeserializerState> WhenReturned => _futureResult.Task;
public CancellationToken CancelFromAlice => _cancelFromAlice.Token;
async Task<ConsumedCapability?> AccessWhenReturned(MemberAccessPath access)
{
await WhenReturned;
return Access(access);
}
public ConsumedCapability? Access(MemberAccessPath access)
{
if (_futureResult.Task.IsCompleted)
{
try
{
return access.Eval(WhenReturned.Result);
}
catch (AggregateException exception)
{
throw exception.InnerException!;
}
}
else
{
return new LazyCapability(AccessWhenReturned(access));
}
return new LocalAnswerCapability(_futureResult.Task, access);
}
public ConsumedCapability? Access(MemberAccessPath _, Task<IDisposable?> task)
{
var proxyTask = task.AsProxyTask();
if (proxyTask.IsCompleted)
{
try
{
return proxyTask.Result?.ConsumedCap;
}
catch (AggregateException exception)
{
throw exception.InnerException!;
}
}
else
{
return new LazyCapability(proxyTask);
}
return new LocalAnswerCapability(proxyTask);
}
public void Dispose()
@ -122,10 +87,16 @@ namespace Capnp.Rpc.Interception
/// </summary>
public InterceptionState State { get; private set; }
SerializerState _inArgs;
/// <summary>
/// Input arguments
/// </summary>
public SerializerState? InArgs { get; set; }
public SerializerState InArgs
{
get => _inArgs;
set { _inArgs = value ?? throw new ArgumentNullException(nameof(value)); }
}
/// <summary>
/// Output arguments ("return value")
@ -171,45 +142,41 @@ namespace Capnp.Rpc.Interception
/// <item><description>A <see cref="Proxy"/>-derived object</description></item>
/// <item><description>A <see cref="Skeleton"/>-derived object</description></item>
/// <item><description>A <see cref="ConsumedCapability"/>-derived object (low level capability)</description></item>
/// <item><description>null</description></item>
/// </list>
/// </summary>
public object? Bob
/// <remarks>
/// Note that getting/setting this property does NOT transfer ownership.
/// </remarks>
public object Bob
{
get => _bob;
set
{
if (value != _bob)
{
BobProxy?.Dispose();
BobProxy = null;
_bob = value;
switch (value)
{
case null:
throw new ArgumentNullException(nameof(value));
case Proxy proxy:
BobProxy = proxy;
BobProxy = proxy.Cast<BareProxy>(false);
break;
case ConsumedCapability cap: using (var temp = CapabilityReflection.CreateProxy<object>(cap)) {
Bob = temp; }
break;
case Skeleton skeleton:
BobProxy = CapabilityReflection.CreateProxy<object>(
LocalCapability.Create(skeleton));
break;
case ConsumedCapability cap:
BobProxy = CapabilityReflection.CreateProxy<object>(cap);
break;
case null:
Bob = LocalCapability.Create(skeleton);
break;
default:
BobProxy = CapabilityReflection.CreateProxy<object>(
LocalCapability.Create(
Skeleton.GetOrCreateSkeleton(value, false)));
Bob = Skeleton.GetOrCreateSkeleton(value, false);
break;
}
_bob = value;
}
}
}
@ -218,7 +185,7 @@ namespace Capnp.Rpc.Interception
readonly CensorCapability _censorCapability;
PromisedAnswer _promisedAnswer;
object? _bob;
object _bob;
internal IPromisedAnswer Answer => _promisedAnswer;
@ -226,13 +193,14 @@ namespace Capnp.Rpc.Interception
{
_censorCapability = censorCapability;
_promisedAnswer = new PromisedAnswer(this);
_inArgs = inArgs;
_bob = null!; // Will beinitialized by setting Bob
CancelFromAlice = _promisedAnswer.CancelFromAlice;
CancelToBob = CancelFromAlice;
Bob = censorCapability.InterceptedCapability;
InterfaceId = interfaceId;
MethodId = methodId;
InArgs = inArgs;
State = InterceptionState.RequestedFromAlice;
}
@ -243,12 +211,9 @@ namespace Capnp.Rpc.Interception
for (int i = 0; i < state.Caps.Count; i++)
{
var cap = state.Caps[i];
if (cap != null)
{
cap = policy.Attach(cap);
state.Caps[i] = cap;
cap.AddRef();
}
cap = policy.Attach(cap);
state.Caps[i] = cap;
cap.AddRef();
}
}
}
@ -260,12 +225,9 @@ namespace Capnp.Rpc.Interception
for (int i = 0; i < state.Caps.Count; i++)
{
var cap = state.Caps[i];
if (cap != null)
{
cap = policy.Detach(cap);
state.Caps[i] = cap;
cap.AddRef();
}
cap = policy.Detach(cap);
state.Caps[i] = cap;
cap.AddRef();
}
}
}
@ -274,12 +236,8 @@ namespace Capnp.Rpc.Interception
/// Intercepts all capabilies inside the input arguments
/// </summary>
/// <param name="policyOverride">Policy to use, or null to further use present policy</param>
/// <exception cref="InvalidOperationException">InArgs not set</exception>
public void InterceptInCaps(IInterceptionPolicy? policyOverride = null)
{
if (InArgs == null)
throw new InvalidOperationException("InArgs not set");
InterceptCaps(InArgs, policyOverride ?? _censorCapability.Policy);
}
@ -296,12 +254,8 @@ namespace Capnp.Rpc.Interception
/// Unintercepts all capabilies inside the input arguments
/// </summary>
/// <param name="policyOverride">Policy to remove, or null to remove present policy</param>
/// <exception cref="InvalidOperationException">InArgs not set</exception>
public void UninterceptInCaps(IInterceptionPolicy? policyOverride = null)
{
if (InArgs == null)
throw new InvalidOperationException("InArgs not set");
UninterceptCaps(InArgs, policyOverride ?? _censorCapability.Policy);
}
@ -317,15 +271,8 @@ namespace Capnp.Rpc.Interception
/// <summary>
/// Forwards this intercepted call to the target capability ("Bob").
/// </summary>
/// <exception cref="InvalidOperationException">Bob/InArgs not set</exception>
public void ForwardToBob()
{
if (Bob == null)
throw new InvalidOperationException("Bob is null");
if (InArgs == null)
throw new InvalidOperationException("InArgs not set");
var answer = BobProxy!.Call(InterfaceId, MethodId, InArgs.Rewrap<DynamicSerializerState>(), default, CancelToBob);
State = InterceptionState.ForwardedToBob;

View File

@ -45,7 +45,8 @@ namespace Capnp.Rpc.Interception
switch (cap)
{
case Proxy proxy:
return (CapabilityReflection.CreateProxy<TCap>(Attach(policy, proxy.ConsumedCap!)) as TCap)!;
return (CapabilityReflection.CreateProxy<TCap>(
Attach(policy, proxy.ConsumedCap!)) as TCap)!;
case ConsumedCapability ccap:
return (new CensorCapability(ccap, policy) as TCap)!;