From ac50b7454ad330a62b6b7c8b901a63fd8bbdb550 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Sat, 4 Apr 2020 22:57:07 +0200 Subject: [PATCH] fixes + coverage --- Capnp.Net.Runtime.Tests/General.cs | 51 +++++++- Capnp.Net.Runtime.Tests/Interception.cs | 75 +++++++++++ Capnp.Net.Runtime/Rpc/Impatient.cs | 39 +----- .../Rpc/Interception/CallContext.cs | 121 +++++------------- .../Rpc/Interception/Interceptor.cs | 3 +- 5 files changed, 166 insertions(+), 123 deletions(-) diff --git a/Capnp.Net.Runtime.Tests/General.cs b/Capnp.Net.Runtime.Tests/General.cs index 662b039..c8067ef 100644 --- a/Capnp.Net.Runtime.Tests/General.cs +++ b/Capnp.Net.Runtime.Tests/General.cs @@ -1,4 +1,5 @@ -using Microsoft.VisualStudio.TestTools.UnitTesting; +using Capnp.Rpc; +using Microsoft.VisualStudio.TestTools.UnitTesting; using System; using System.Collections.Generic; using System.Linq; @@ -67,5 +68,53 @@ namespace Capnp.Net.Runtime.Tests Task.WhenAll(tasks).Wait(); } + + class PromisedAnswerMock : IPromisedAnswer + { + readonly TaskCompletionSource _tcs = new TaskCompletionSource(); + + public Task WhenReturned => _tcs.Task; + + public void Return() + { + _tcs.SetResult(default); + } + + public void Cancel() + { + _tcs.SetCanceled(); + } + + public void Fault() + { + _tcs.SetException(new InvalidOperationException("test fault")); + } + + public ConsumedCapability Access(MemberAccessPath access) + { + throw new NotImplementedException(); + } + + public ConsumedCapability Access(MemberAccessPath access, Task proxyTask) + { + throw new NotImplementedException(); + } + + public void Dispose() + { + } + } + + [TestMethod] + public void MakePipelineAwareOnFastPath() + { + var mock = new PromisedAnswerMock(); + mock.Return(); + for (int i = 0; i < 100; i++) + { + var t = Impatient.MakePipelineAware(mock, _ => (object)null); + Assert.IsTrue(t.IsCompleted); + }; + } } } diff --git a/Capnp.Net.Runtime.Tests/Interception.cs b/Capnp.Net.Runtime.Tests/Interception.cs index 3008723..f2f7b71 100644 --- a/Capnp.Net.Runtime.Tests/Interception.cs +++ b/Capnp.Net.Runtime.Tests/Interception.cs @@ -96,6 +96,46 @@ namespace Capnp.Net.Runtime.Tests } } + [TestMethod] + public void InterceptServerSideRedirectCall() + { + var policy = new MyPolicy("a"); + + (var server, var client) = SetupClientServerPair(); + + using (server) + using (client) + { + client.WhenConnected.Wait(); + + var counters = new Counters(); + server.Main = policy.Attach(new TestInterfaceImpl(counters)); + var redirTarget = new TestInterfaceImpl2(); + using (var main = client.GetMain()) + using (redirTarget) + { + var request1 = main.Foo(123, true, default); + var fcc = policy.Calls.ReceiveAsync(); + Assert.IsTrue(fcc.Wait(MediumNonDbgTimeout)); + var cc = fcc.Result; + + Assert.ThrowsException(() => cc.Bob = null); + cc.Bob = redirTarget; + cc.ForwardToBob(); + + Assert.IsTrue(policy.Returns.ReceiveAsync().Wait(MediumNonDbgTimeout)); + + cc.ReturnToAlice(); + + Assert.IsTrue(request1.Wait(MediumNonDbgTimeout)); + + Assert.AreEqual("bar", request1.Result); + } + Assert.IsTrue(redirTarget.IsDisposed); + Assert.AreEqual(0, counters.CallCount); + } + } + [TestMethod] public void InterceptClientSideModifyCall() { @@ -284,6 +324,41 @@ namespace Capnp.Net.Runtime.Tests } } + [TestMethod] + public void InterceptClientSideOverrideFaultedCall() + { + var policy = new MyPolicy("a"); + + (var server, var client) = SetupClientServerPair(); + + using (server) + using (client) + { + client.WhenConnected.Wait(); + + var counters = new Counters(); + server.Main = new TestInterfaceImpl(counters); + using (var main = policy.Attach(client.GetMain())) + { + var request1 = main.Bar(); + Assert.IsTrue(policy.Calls.TryReceive(out var cc)); + Assert.IsFalse(request1.IsCompleted); + + cc.ForwardToBob(); + Assert.IsTrue(policy.Returns.ReceiveAsync().Wait(MediumNonDbgTimeout)); + Assert.IsNotNull(cc.Exception); + cc.ReturnCanceled = false; + cc.Exception = null; + + cc.ReturnToAlice(); + Assert.ThrowsException(() => cc.ReturnToAlice()); + + Assert.IsTrue(request1.IsCompleted); + Assert.IsFalse(request1.IsFaulted); + } + } + } + [TestMethod] public void InterceptClientSideRedirectCall() { diff --git a/Capnp.Net.Runtime/Rpc/Impatient.cs b/Capnp.Net.Runtime/Rpc/Impatient.cs index bddc2be..4bdbc8a 100644 --- a/Capnp.Net.Runtime/Rpc/Impatient.cs +++ b/Capnp.Net.Runtime/Rpc/Impatient.cs @@ -31,40 +31,11 @@ namespace Capnp.Rpc var rtask = AwaitAnswer(); - try - { - // Really weird: We'd expect AwaitAnswer() to initialize a new Task instance upon each invocation. - // However, this does not seem to be always true (as indicated by CI test suite). An explanation might be - // that the underlying implementation recycles Task instances (um, really? doesn't make sense. But the - // observation doesn't make sense, either). - - _taskTable.Add(rtask, promise); - } - catch (ArgumentException) - { - if (rtask.IsCompleted) - { - // Force .NET to create a new Task instance - if (rtask.IsCanceled) - { - rtask = Task.FromCanceled(new CancellationToken(true)); - } - else if (rtask.IsFaulted) - { - rtask = Task.FromException(rtask.Exception!.InnerException!); - } - else - { - rtask = Task.FromResult(rtask.Result); - } - - _taskTable.Add(rtask, promise); - } - else - { - throw new InvalidOperationException("What the heck is wrong with Task?"); - } - } + // Rare situation: .NET maintains a cache of some pre-computed tasks for standard results (such as (int)0, (object)null). + // AwaitAnswer() might indeed have chosen a fast-path optimization, such that rtask is a cached object instead of a new instance. + // Once this happens the second time, and we return the same rtask for a different promise. GetAnswer()/TryGetAnswer() may return the "wrong" + // promise! Fortunately, this does not really matter, since the "wrong" promise is guaranteed to return exactly the same answer. :-) + _taskTable.GetValue(rtask, _ => promise); return rtask; } diff --git a/Capnp.Net.Runtime/Rpc/Interception/CallContext.cs b/Capnp.Net.Runtime/Rpc/Interception/CallContext.cs index 018d865..86c4db9 100644 --- a/Capnp.Net.Runtime/Rpc/Interception/CallContext.cs +++ b/Capnp.Net.Runtime/Rpc/Interception/CallContext.cs @@ -25,50 +25,15 @@ namespace Capnp.Rpc.Interception public Task WhenReturned => _futureResult.Task; public CancellationToken CancelFromAlice => _cancelFromAlice.Token; - async Task AccessWhenReturned(MemberAccessPath access) - { - await WhenReturned; - return Access(access); - } - public ConsumedCapability? Access(MemberAccessPath access) { - if (_futureResult.Task.IsCompleted) - { - try - { - return access.Eval(WhenReturned.Result); - } - catch (AggregateException exception) - { - throw exception.InnerException!; - } - } - else - { - return new LazyCapability(AccessWhenReturned(access)); - } + return new LocalAnswerCapability(_futureResult.Task, access); } public ConsumedCapability? Access(MemberAccessPath _, Task task) { var proxyTask = task.AsProxyTask(); - - if (proxyTask.IsCompleted) - { - try - { - return proxyTask.Result?.ConsumedCap; - } - catch (AggregateException exception) - { - throw exception.InnerException!; - } - } - else - { - return new LazyCapability(proxyTask); - } + return new LocalAnswerCapability(proxyTask); } public void Dispose() @@ -122,10 +87,16 @@ namespace Capnp.Rpc.Interception /// public InterceptionState State { get; private set; } + SerializerState _inArgs; + /// /// Input arguments /// - public SerializerState? InArgs { get; set; } + public SerializerState InArgs + { + get => _inArgs; + set { _inArgs = value ?? throw new ArgumentNullException(nameof(value)); } + } /// /// Output arguments ("return value") @@ -171,45 +142,41 @@ namespace Capnp.Rpc.Interception /// A -derived object /// A -derived object /// A -derived object (low level capability) - /// null /// /// - public object? Bob + /// + /// Note that getting/setting this property does NOT transfer ownership. + /// + public object Bob { get => _bob; set { if (value != _bob) { - BobProxy?.Dispose(); - BobProxy = null; - - _bob = value; - switch (value) { + case null: + throw new ArgumentNullException(nameof(value)); + case Proxy proxy: - BobProxy = proxy; + BobProxy = proxy.Cast(false); + break; + + case ConsumedCapability cap: using (var temp = CapabilityReflection.CreateProxy(cap)) { + Bob = temp; } break; case Skeleton skeleton: - BobProxy = CapabilityReflection.CreateProxy( - LocalCapability.Create(skeleton)); - break; - - case ConsumedCapability cap: - BobProxy = CapabilityReflection.CreateProxy(cap); - break; - - case null: + Bob = LocalCapability.Create(skeleton); break; default: - BobProxy = CapabilityReflection.CreateProxy( - LocalCapability.Create( - Skeleton.GetOrCreateSkeleton(value, false))); + Bob = Skeleton.GetOrCreateSkeleton(value, false); break; } + + _bob = value; } } } @@ -218,7 +185,7 @@ namespace Capnp.Rpc.Interception readonly CensorCapability _censorCapability; PromisedAnswer _promisedAnswer; - object? _bob; + object _bob; internal IPromisedAnswer Answer => _promisedAnswer; @@ -226,13 +193,14 @@ namespace Capnp.Rpc.Interception { _censorCapability = censorCapability; _promisedAnswer = new PromisedAnswer(this); + _inArgs = inArgs; + _bob = null!; // Will beinitialized by setting Bob CancelFromAlice = _promisedAnswer.CancelFromAlice; CancelToBob = CancelFromAlice; Bob = censorCapability.InterceptedCapability; InterfaceId = interfaceId; MethodId = methodId; - InArgs = inArgs; State = InterceptionState.RequestedFromAlice; } @@ -243,12 +211,9 @@ namespace Capnp.Rpc.Interception for (int i = 0; i < state.Caps.Count; i++) { var cap = state.Caps[i]; - if (cap != null) - { - cap = policy.Attach(cap); - state.Caps[i] = cap; - cap.AddRef(); - } + cap = policy.Attach(cap); + state.Caps[i] = cap; + cap.AddRef(); } } } @@ -260,12 +225,9 @@ namespace Capnp.Rpc.Interception for (int i = 0; i < state.Caps.Count; i++) { var cap = state.Caps[i]; - if (cap != null) - { - cap = policy.Detach(cap); - state.Caps[i] = cap; - cap.AddRef(); - } + cap = policy.Detach(cap); + state.Caps[i] = cap; + cap.AddRef(); } } } @@ -274,12 +236,8 @@ namespace Capnp.Rpc.Interception /// Intercepts all capabilies inside the input arguments /// /// Policy to use, or null to further use present policy - /// InArgs not set public void InterceptInCaps(IInterceptionPolicy? policyOverride = null) { - if (InArgs == null) - throw new InvalidOperationException("InArgs not set"); - InterceptCaps(InArgs, policyOverride ?? _censorCapability.Policy); } @@ -296,12 +254,8 @@ namespace Capnp.Rpc.Interception /// Unintercepts all capabilies inside the input arguments /// /// Policy to remove, or null to remove present policy - /// InArgs not set public void UninterceptInCaps(IInterceptionPolicy? policyOverride = null) { - if (InArgs == null) - throw new InvalidOperationException("InArgs not set"); - UninterceptCaps(InArgs, policyOverride ?? _censorCapability.Policy); } @@ -317,15 +271,8 @@ namespace Capnp.Rpc.Interception /// /// Forwards this intercepted call to the target capability ("Bob"). /// - /// Bob/InArgs not set public void ForwardToBob() { - if (Bob == null) - throw new InvalidOperationException("Bob is null"); - - if (InArgs == null) - throw new InvalidOperationException("InArgs not set"); - var answer = BobProxy!.Call(InterfaceId, MethodId, InArgs.Rewrap(), default, CancelToBob); State = InterceptionState.ForwardedToBob; diff --git a/Capnp.Net.Runtime/Rpc/Interception/Interceptor.cs b/Capnp.Net.Runtime/Rpc/Interception/Interceptor.cs index 1413f6c..aeee3f5 100644 --- a/Capnp.Net.Runtime/Rpc/Interception/Interceptor.cs +++ b/Capnp.Net.Runtime/Rpc/Interception/Interceptor.cs @@ -45,7 +45,8 @@ namespace Capnp.Rpc.Interception switch (cap) { case Proxy proxy: - return (CapabilityReflection.CreateProxy(Attach(policy, proxy.ConsumedCap!)) as TCap)!; + return (CapabilityReflection.CreateProxy( + Attach(policy, proxy.ConsumedCap!)) as TCap)!; case ConsumedCapability ccap: return (new CensorCapability(ccap, policy) as TCap)!;