diff --git a/Capnp.Net.Runtime.Tests/ProvidedCapabilityMock.cs b/Capnp.Net.Runtime.Tests/ProvidedCapabilityMock.cs index 2e987f6..7cd187e 100644 --- a/Capnp.Net.Runtime.Tests/ProvidedCapabilityMock.cs +++ b/Capnp.Net.Runtime.Tests/ProvidedCapabilityMock.cs @@ -4,7 +4,7 @@ using Capnp.Rpc; namespace Capnp.Net.Runtime.Tests { - class ProvidedCapabilityMock : Skeleton + class ProvidedCapabilityMock : RefCountingSkeleton { readonly TaskCompletionSource<(ulong, ushort, DeserializerState, CancellationToken)> _call = new TaskCompletionSource<(ulong, ushort, DeserializerState, CancellationToken)>(); diff --git a/Capnp.Net.Runtime.Tests/ProvidedCapabilityMultiCallMock.cs b/Capnp.Net.Runtime.Tests/ProvidedCapabilityMultiCallMock.cs index 58bb57a..771c786 100644 --- a/Capnp.Net.Runtime.Tests/ProvidedCapabilityMultiCallMock.cs +++ b/Capnp.Net.Runtime.Tests/ProvidedCapabilityMultiCallMock.cs @@ -6,7 +6,7 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; namespace Capnp.Net.Runtime.Tests { - class ProvidedCapabilityMultiCallMock : Skeleton + class ProvidedCapabilityMultiCallMock : RefCountingSkeleton { readonly BufferBlock _ccs = new BufferBlock(); diff --git a/Capnp.Net.Runtime.Tests/TcpRpc.cs b/Capnp.Net.Runtime.Tests/TcpRpc.cs index 7836f96..009c94d 100644 --- a/Capnp.Net.Runtime.Tests/TcpRpc.cs +++ b/Capnp.Net.Runtime.Tests/TcpRpc.cs @@ -389,7 +389,7 @@ namespace Capnp.Net.Runtime.Tests var result = DynamicSerializerState.CreateForRpc(); result.SetStruct(1, 2); result.WriteData(0, 654321); - uint id = result.ProvideCapability(mock2); + uint id = result.ProvideCapability(mock2).Value; result.LinkToCapability(1, id); mock.Return.SetResult(result); @@ -453,7 +453,7 @@ namespace Capnp.Net.Runtime.Tests var result = DynamicSerializerState.CreateForRpc(); result.SetStruct(1, 2); result.WriteData(0, 654321); - uint id = result.ProvideCapability(mock2); + uint id = result.ProvideCapability(mock2).Value; result.LinkToCapability(1, id); mock.Return.SetResult(result); @@ -543,7 +543,7 @@ namespace Capnp.Net.Runtime.Tests var result = DynamicSerializerState.CreateForRpc(); result.SetStruct(1, 2); result.WriteData(0, 654321); - uint id = result.ProvideCapability(mock2); + uint id = result.ProvideCapability(mock2).Value; result.LinkToCapability(1, id); mock.Return.SetResult(result); @@ -707,7 +707,7 @@ namespace Capnp.Net.Runtime.Tests var result = DynamicSerializerState.CreateForRpc(); result.SetStruct(1, 2); result.WriteData(0, 654321); - uint id = result.ProvideCapability(mock2); + uint id = result.ProvideCapability(mock2).Value; result.LinkToCapability(1, id); mock.Return.SetResult(result); diff --git a/Capnp.Net.Runtime.Tests/Testsuite.cs b/Capnp.Net.Runtime.Tests/Testsuite.cs index c64c0be..a888d65 100644 --- a/Capnp.Net.Runtime.Tests/Testsuite.cs +++ b/Capnp.Net.Runtime.Tests/Testsuite.cs @@ -677,7 +677,7 @@ namespace Capnp.Net.Runtime.Tests } } - class ThrowingSkeleton : Skeleton + class ThrowingSkeleton : RefCountingSkeleton { public bool WasCalled { get; private set; } diff --git a/Capnp.Net.Runtime/DeserializerState.cs b/Capnp.Net.Runtime/DeserializerState.cs index 5773466..c5e7b71 100644 --- a/Capnp.Net.Runtime/DeserializerState.cs +++ b/Capnp.Net.Runtime/DeserializerState.cs @@ -386,7 +386,7 @@ namespace Capnp /// offset negative or out of range /// capability table not set /// not a capability pointer or invalid capability index - internal Rpc.ConsumedCapability? DecodeCapPointer(int offset) + internal Rpc.ConsumedCapability DecodeCapPointer(int offset) { if (offset < 0) { @@ -404,7 +404,7 @@ namespace Capnp { // Despite this behavior is not officially specified, // the official C++ implementation seems to send null pointers for null caps. - return null; + return Rpc.NullCapability.Instance; } if (pointer.Kind != PointerKind.Other) @@ -496,13 +496,13 @@ namespace Capnp return state; } - internal Rpc.ConsumedCapability? StructReadRawCap(int index) + internal Rpc.ConsumedCapability StructReadRawCap(int index) { if (Kind != ObjectKind.Struct && Kind != ObjectKind.Nil) throw new InvalidOperationException("Allowed on structs only"); if (index >= StructPtrCount) - return null; + return Rpc.NullCapability.Instance; return DecodeCapPointer(index + StructDataCount); } diff --git a/Capnp.Net.Runtime/Rpc/BareProxy.cs b/Capnp.Net.Runtime/Rpc/BareProxy.cs index 5f9dca5..ff68ab0 100644 --- a/Capnp.Net.Runtime/Rpc/BareProxy.cs +++ b/Capnp.Net.Runtime/Rpc/BareProxy.cs @@ -19,7 +19,7 @@ /// Problem with building the Skeleton type, or problem with loading some dependent class. public static BareProxy FromImpl(object impl) { - return new BareProxy(LocalCapability.Create(CapabilityReflection.CreateSkeleton(impl))); + return new BareProxy(CapabilityReflection.CreateSkeleton(impl).AsCapability()); } /// @@ -33,7 +33,7 @@ /// Constructs an instance and binds it to the given low-level capability. /// /// low-level capability - public BareProxy(ConsumedCapability? cap): base(cap) + public BareProxy(ConsumedCapability cap): base(cap) { } diff --git a/Capnp.Net.Runtime/Rpc/CapabilityReflection.cs b/Capnp.Net.Runtime/Rpc/CapabilityReflection.cs index ce0205a..97b8b63 100644 --- a/Capnp.Net.Runtime/Rpc/CapabilityReflection.cs +++ b/Capnp.Net.Runtime/Rpc/CapabilityReflection.cs @@ -259,7 +259,7 @@ namespace Capnp.Rpc /// Problem with instatiating the Proxy (constructor threw exception). /// Caller does not have permission to invoke the Proxy constructor. /// Problem with building the Proxy type, or problem with loading some dependent class. - public static Proxy CreateProxy(ConsumedCapability? cap) + public static Proxy CreateProxy(ConsumedCapability cap) { var factory = GetProxyFactory(typeof(TInterface)); var proxy = factory.NewProxy(); diff --git a/Capnp.Net.Runtime/Rpc/ConsumedCapability.cs b/Capnp.Net.Runtime/Rpc/ConsumedCapability.cs index 594f375..ea8c5b3 100644 --- a/Capnp.Net.Runtime/Rpc/ConsumedCapability.cs +++ b/Capnp.Net.Runtime/Rpc/ConsumedCapability.cs @@ -18,6 +18,7 @@ namespace Capnp.Rpc internal abstract Action? Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer); internal abstract void AddRef(); internal abstract void Release(); + internal abstract Skeleton AsSkeleton(); #if DebugFinalizers internal Proxy? OwningProxy { get; set; } diff --git a/Capnp.Net.Runtime/Rpc/IPromisedAnswer.cs b/Capnp.Net.Runtime/Rpc/IPromisedAnswer.cs index a641a6b..6c0de2f 100644 --- a/Capnp.Net.Runtime/Rpc/IPromisedAnswer.cs +++ b/Capnp.Net.Runtime/Rpc/IPromisedAnswer.cs @@ -22,7 +22,7 @@ namespace Capnp.Rpc /// /// Path to the desired capability inside the result struct. /// Pipelined low-level capability - ConsumedCapability? Access(MemberAccessPath access); + ConsumedCapability Access(MemberAccessPath access); /// /// @@ -30,7 +30,7 @@ namespace Capnp.Rpc /// Creates a low-level capability for promise pipelining. /// Task returning the proxy whose ownership will be taken over /// - ConsumedCapability? Access(MemberAccessPath access, Task proxyTask); + ConsumedCapability Access(MemberAccessPath access, Task proxyTask); /// /// Whether the question was asked as tail call diff --git a/Capnp.Net.Runtime/Rpc/Impatient.cs b/Capnp.Net.Runtime/Rpc/Impatient.cs index 0f4685c..a7be980 100644 --- a/Capnp.Net.Runtime/Rpc/Impatient.cs +++ b/Capnp.Net.Runtime/Rpc/Impatient.cs @@ -76,7 +76,7 @@ namespace Capnp.Rpc /// path to the desired capability /// task returning a proxy to the desired capability /// Pipelined low-level capability - public static ConsumedCapability? Access(Task task, MemberAccessPath access, Task proxyTask) + public static ConsumedCapability Access(Task task, MemberAccessPath access, Task proxyTask) { var answer = TryGetAnswer(task); if (answer != null) return answer.Access(access, proxyTask); @@ -164,7 +164,7 @@ namespace Capnp.Rpc return cap; var unwrapped = await proxy.ConsumedCap.Unwrap(); - if (unwrapped == null) + if (unwrapped == null || unwrapped == NullCapability.Instance) return null; return ((CapabilityReflection.CreateProxy(unwrapped)) as TInterface)!; diff --git a/Capnp.Net.Runtime/Rpc/ImportedCapability.cs b/Capnp.Net.Runtime/Rpc/ImportedCapability.cs index 0cb2be4..1635e8b 100644 --- a/Capnp.Net.Runtime/Rpc/ImportedCapability.cs +++ b/Capnp.Net.Runtime/Rpc/ImportedCapability.cs @@ -38,7 +38,7 @@ namespace Capnp.Rpc else { capDesc.which = CapDescriptor.WHICH.SenderHosted; - capDesc.SenderHosted = endpoint.AllocateExport(Vine.Create(this), out var _); + capDesc.SenderHosted = endpoint.AllocateExport(AsSkeleton(), out var _); } return null; } diff --git a/Capnp.Net.Runtime/Rpc/Interception/CallContext.cs b/Capnp.Net.Runtime/Rpc/Interception/CallContext.cs index 0843119..9169022 100644 --- a/Capnp.Net.Runtime/Rpc/Interception/CallContext.cs +++ b/Capnp.Net.Runtime/Rpc/Interception/CallContext.cs @@ -25,12 +25,12 @@ namespace Capnp.Rpc.Interception public Task WhenReturned => _futureResult.Task; public CancellationToken CancelFromAlice => _cancelFromAlice.Token; - public ConsumedCapability? Access(MemberAccessPath access) + public ConsumedCapability Access(MemberAccessPath access) { return new LocalAnswerCapability(_futureResult.Task, access); } - public ConsumedCapability? Access(MemberAccessPath _, Task task) + public ConsumedCapability Access(MemberAccessPath _, Task task) { var proxyTask = task.AsProxyTask(); return new LocalAnswerCapability(proxyTask); @@ -165,12 +165,18 @@ namespace Capnp.Rpc.Interception BobProxy = proxy.Cast(false); break; - case ConsumedCapability cap: using (var temp = CapabilityReflection.CreateProxy(cap)) { - Bob = temp; } + case ConsumedCapability cap: + using (var temp = CapabilityReflection.CreateProxy(cap)) + { + Bob = temp; + } break; case Skeleton skeleton: - Bob = LocalCapability.Create(skeleton); + using (var nullProxy = new Proxy()) + { + Bob = (object?)skeleton.AsCapability() ?? nullProxy; + } break; default: @@ -213,9 +219,12 @@ namespace Capnp.Rpc.Interception for (int i = 0; i < state.Caps.Count; i++) { var cap = state.Caps[i]; - cap = policy.Attach(cap); - state.Caps[i] = cap; - cap.AddRef(); + if (cap != null) + { + cap = policy.Attach(cap); + state.Caps[i] = cap; + cap.AddRef(); + } } } } @@ -227,9 +236,12 @@ namespace Capnp.Rpc.Interception for (int i = 0; i < state.Caps.Count; i++) { var cap = state.Caps[i]; - cap = policy.Detach(cap); - state.Caps[i] = cap; - cap.AddRef(); + if (cap != null) + { + cap = policy.Detach(cap); + state.Caps[i] = cap; + cap.AddRef(); + } } } } diff --git a/Capnp.Net.Runtime/Rpc/Interception/CensorCapability.cs b/Capnp.Net.Runtime/Rpc/Interception/CensorCapability.cs index 54f4754..cef2dc2 100644 --- a/Capnp.Net.Runtime/Rpc/Interception/CensorCapability.cs +++ b/Capnp.Net.Runtime/Rpc/Interception/CensorCapability.cs @@ -9,12 +9,10 @@ namespace Capnp.Rpc.Interception InterceptedCapability = interceptedCapability; interceptedCapability.AddRef(); Policy = policy; - MyVine = Vine.Create(this); } public ConsumedCapability InterceptedCapability { get; } public IInterceptionPolicy Policy { get; } - internal Skeleton MyVine { get; } protected override void ReleaseRemotely() { @@ -31,7 +29,7 @@ namespace Capnp.Rpc.Interception internal override Action? Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer) { writer.which = CapDescriptor.WHICH.SenderHosted; - writer.SenderHosted = endpoint.AllocateExport(MyVine, out bool _); + writer.SenderHosted = endpoint.AllocateExport(AsSkeleton(), out bool _); return null; } } diff --git a/Capnp.Net.Runtime/Rpc/Interception/Interceptor.cs b/Capnp.Net.Runtime/Rpc/Interception/Interceptor.cs index aeee3f5..d2f241d 100644 --- a/Capnp.Net.Runtime/Rpc/Interception/Interceptor.cs +++ b/Capnp.Net.Runtime/Rpc/Interception/Interceptor.cs @@ -52,10 +52,9 @@ namespace Capnp.Rpc.Interception return (new CensorCapability(ccap, policy) as TCap)!; default: - return (Attach(policy, - (CapabilityReflection.CreateProxy( - LocalCapability.Create( - Skeleton.GetOrCreateSkeleton(cap, false))) as TCap)!)); + var temp = (CapabilityReflection.CreateProxy( + Skeleton.GetOrCreateSkeleton(cap, false).AsCapability())) as TCap; + return Attach(policy, temp!)!; } } diff --git a/Capnp.Net.Runtime/Rpc/LazyCapability.cs b/Capnp.Net.Runtime/Rpc/LazyCapability.cs index 90f91a8..9794e3f 100644 --- a/Capnp.Net.Runtime/Rpc/LazyCapability.cs +++ b/Capnp.Net.Runtime/Rpc/LazyCapability.cs @@ -4,22 +4,23 @@ using System.Threading.Tasks; namespace Capnp.Rpc { + class LazyCapability : RefCountingCapability, IResolvingCapability { public static LazyCapability CreateBrokenCap(string message) { - return new LazyCapability(Task.FromException(new RpcException(message))); + return new LazyCapability(Task.FromException(new RpcException(message))); } public static LazyCapability CreateCanceledCap(CancellationToken token) { - return new LazyCapability(Task.FromCanceled(token)); + return new LazyCapability(Task.FromCanceled(token)); } readonly Task? _proxyTask; - readonly Task _capTask; + readonly Task _capTask; - public LazyCapability(Task capabilityTask) + public LazyCapability(Task capabilityTask) { _capTask = capabilityTask; } @@ -28,7 +29,7 @@ namespace Capnp.Rpc { _proxyTask = proxyTask; - async Task AwaitCap() => (await _proxyTask!).ConsumedCap; + async Task AwaitCap() => (await _proxyTask!).ConsumedCap; _capTask = AwaitCap(); } @@ -68,7 +69,7 @@ namespace Capnp.Rpc { try { - return CapabilityReflection.CreateProxy(_capTask.Result) as T; + return (CapabilityReflection.CreateProxy(_capTask.Result) as T)!; } catch (AggregateException exception) { @@ -83,7 +84,7 @@ namespace Capnp.Rpc async Task CallImpl(ulong interfaceId, ushort methodId, DynamicSerializerState args, CancellationToken cancellationToken) { - ConsumedCapability? cap; + ConsumedCapability cap; try { cap = await _capTask; diff --git a/Capnp.Net.Runtime/Rpc/LocalCapability.cs b/Capnp.Net.Runtime/Rpc/LocalCapability.cs index 140fa50..4024f32 100644 --- a/Capnp.Net.Runtime/Rpc/LocalCapability.cs +++ b/Capnp.Net.Runtime/Rpc/LocalCapability.cs @@ -7,14 +7,6 @@ namespace Capnp.Rpc { class LocalCapability : ConsumedCapability { - public static ConsumedCapability Create(Skeleton skeleton) - { - if (skeleton is Vine vine) - return vine.Proxy.ConsumedCap!; - else - return new LocalCapability(skeleton); - } - static async Task AwaitAnswer(Task call) { var aorcq = await call; @@ -23,7 +15,9 @@ namespace Capnp.Rpc public Skeleton ProvidedCap { get; } - LocalCapability(Skeleton providedCap) + internal override Skeleton AsSkeleton() => ProvidedCap; + + public LocalCapability(Skeleton providedCap) { ProvidedCap = providedCap ?? throw new ArgumentNullException(nameof(providedCap)); } diff --git a/Capnp.Net.Runtime/Rpc/MemberAccessPath.cs b/Capnp.Net.Runtime/Rpc/MemberAccessPath.cs index 3b4cf20..1eaed67 100644 --- a/Capnp.Net.Runtime/Rpc/MemberAccessPath.cs +++ b/Capnp.Net.Runtime/Rpc/MemberAccessPath.cs @@ -169,7 +169,7 @@ namespace Capnp.Rpc /// The object (usually "params struct") on which to evaluate this path. /// Resulting low-level capability /// Evaluation of this path did not give a capability - public ConsumedCapability? Eval(DeserializerState rpcState) + public ConsumedCapability Eval(DeserializerState rpcState) { var cur = rpcState; @@ -181,7 +181,7 @@ namespace Capnp.Rpc switch (cur.Kind) { case ObjectKind.Nil: - return null; + return NullCapability.Instance; case ObjectKind.Capability: return rpcState.Caps![(int)cur.CapabilityIndex]; diff --git a/Capnp.Net.Runtime/Rpc/NullCapability.cs b/Capnp.Net.Runtime/Rpc/NullCapability.cs new file mode 100644 index 0000000..55e20d0 --- /dev/null +++ b/Capnp.Net.Runtime/Rpc/NullCapability.cs @@ -0,0 +1,54 @@ +using System; + +namespace Capnp.Rpc +{ + /// + /// Null capability + /// + public sealed class NullCapability : ConsumedCapability + { + /// + /// Singleton instance + /// + public static readonly NullCapability Instance = new NullCapability(); + + NullCapability() + { + } + + /// + /// Does nothing + /// + protected override void ReleaseRemotely() + { + } + + internal override void AddRef() + { + } + + internal override Skeleton AsSkeleton() => NullSkeleton.Instance; + + internal override IPromisedAnswer DoCall(ulong interfaceId, ushort methodId, DynamicSerializerState args) + { + args.Dispose(); + throw new InvalidOperationException("Cannot call null capability"); + } + + internal override Action? Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer) + { + writer.which = CapDescriptor.WHICH.None; + return null; + } + + internal override void Release() + { + } + + /// + /// String hint + /// + /// "Null capability" + public override string ToString() => "Null capability"; + } +} \ No newline at end of file diff --git a/Capnp.Net.Runtime/Rpc/NullSkeleton.cs b/Capnp.Net.Runtime/Rpc/NullSkeleton.cs new file mode 100644 index 0000000..17cd828 --- /dev/null +++ b/Capnp.Net.Runtime/Rpc/NullSkeleton.cs @@ -0,0 +1,41 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Capnp.Rpc +{ + /// + /// Null skeleton + /// + public sealed class NullSkeleton : Skeleton + { + /// + /// Singleton instance + /// + public static readonly NullSkeleton Instance = new NullSkeleton(); + + NullSkeleton() + { + } + + /// + /// Always throws an exception + /// + /// always thrown + public override Task Invoke(ulong interfaceId, ushort methodId, DeserializerState args, CancellationToken cancellationToken = default) + { + args.Dispose(); + throw new InvalidOperationException("Cannot call null capability"); + } + + internal override ConsumedCapability AsCapability() => NullCapability.Instance; + + internal override void Claim() + { + } + + internal override void Relinquish() + { + } + } +} \ No newline at end of file diff --git a/Capnp.Net.Runtime/Rpc/PendingAnswer.cs b/Capnp.Net.Runtime/Rpc/PendingAnswer.cs index a195b0c..8692f6f 100644 --- a/Capnp.Net.Runtime/Rpc/PendingAnswer.cs +++ b/Capnp.Net.Runtime/Rpc/PendingAnswer.cs @@ -88,7 +88,7 @@ namespace Capnp.Rpc } catch (System.Exception) { - throw new ArgumentOutOfRangeException("Illegal pointer field in transformation operation"); + throw new RpcException("Illegal pointer field in transformation operation"); } break; @@ -100,21 +100,20 @@ namespace Capnp.Rpc } } - Proxy proxy; - switch (cur.Kind) { case ObjectKind.Capability: try { - var cap = aorcq.Answer.Caps![(int)cur.CapabilityIndex]; - proxy = new Proxy(cap); + return new Proxy(aorcq.Answer.Caps![(int)cur.CapabilityIndex]); } catch (ArgumentOutOfRangeException) { - throw new ArgumentOutOfRangeException("Bad capability table in internal answer - internal error?"); + throw new RpcException("Capability index out of range"); } - return proxy; + + case ObjectKind.Nil: + return new Proxy(NullCapability.Instance); default: throw new ArgumentOutOfRangeException("Transformation did not result in a capability"); diff --git a/Capnp.Net.Runtime/Rpc/PendingQuestion.cs b/Capnp.Net.Runtime/Rpc/PendingQuestion.cs index 5d6aa50..c822893 100644 --- a/Capnp.Net.Runtime/Rpc/PendingQuestion.cs +++ b/Capnp.Net.Runtime/Rpc/PendingQuestion.cs @@ -58,7 +58,7 @@ namespace Capnp.Rpc SerializerState? _inParams; int _inhibitFinishCounter, _refCounter; - internal PendingQuestion(IRpcEndpoint ep, uint id, ConsumedCapability? target, SerializerState? inParams) + internal PendingQuestion(IRpcEndpoint ep, uint id, ConsumedCapability target, SerializerState? inParams) { RpcEndpoint = ep ?? throw new ArgumentNullException(nameof(ep)); _questionId = id; @@ -237,7 +237,7 @@ namespace Capnp.Rpc /// Access path /// Low-level capability /// The referenced member does not exist or does not resolve to a capability pointer. - public ConsumedCapability? Access(MemberAccessPath access) + public ConsumedCapability Access(MemberAccessPath access) { lock (ReentrancyBlocker) { @@ -268,7 +268,7 @@ namespace Capnp.Rpc /// promises the cap whose ownership is transferred to this object /// Low-level capability /// The referenced member does not exist or does not resolve to a capability pointer. - public ConsumedCapability? Access(MemberAccessPath access, Task task) + public ConsumedCapability Access(MemberAccessPath access, Task task) { var proxyTask = task.AsProxyTask(); return new RemoteAnswerCapability(this, access, proxyTask); diff --git a/Capnp.Net.Runtime/Rpc/PolySkeleton.cs b/Capnp.Net.Runtime/Rpc/PolySkeleton.cs index 002a75f..17def26 100644 --- a/Capnp.Net.Runtime/Rpc/PolySkeleton.cs +++ b/Capnp.Net.Runtime/Rpc/PolySkeleton.cs @@ -8,7 +8,7 @@ namespace Capnp.Rpc /// /// Combines multiple skeletons to represent objects which implement multiple interfaces. /// - public class PolySkeleton: Skeleton + public class PolySkeleton: RefCountingSkeleton { readonly Dictionary _ifmap = new Dictionary(); diff --git a/Capnp.Net.Runtime/Rpc/PromisedCapability.cs b/Capnp.Net.Runtime/Rpc/PromisedCapability.cs index 1f3a782..ad8db50 100644 --- a/Capnp.Net.Runtime/Rpc/PromisedCapability.cs +++ b/Capnp.Net.Runtime/Rpc/PromisedCapability.cs @@ -8,7 +8,7 @@ namespace Capnp.Rpc { readonly uint _remoteId; readonly object _reentrancyBlocker = new object(); - readonly TaskCompletionSource _resolvedCap = new TaskCompletionSource(); + readonly TaskCompletionSource _resolvedCap = new TaskCompletionSource(); readonly Task _whenResolvedProxy; bool _released; @@ -149,9 +149,10 @@ namespace Capnp.Rpc lock (_reentrancyBlocker) { #if DebugFinalizers - resolvedCap.ResolvingCap = this; + if (resolvedCap != null) + resolvedCap.ResolvingCap = this; #endif - _resolvedCap.SetResult(resolvedCap); + _resolvedCap.SetResult(resolvedCap!); if (_pendingCallsOnPromise == 0) { diff --git a/Capnp.Net.Runtime/Rpc/Proxy.cs b/Capnp.Net.Runtime/Rpc/Proxy.cs index bcfa0be..de67ab1 100644 --- a/Capnp.Net.Runtime/Rpc/Proxy.cs +++ b/Capnp.Net.Runtime/Rpc/Proxy.cs @@ -53,18 +53,18 @@ namespace Capnp.Rpc return CapabilityReflection.CreateProxy(ConsumedCap) as T; } - ConsumedCapability? _consumedCap; + ConsumedCapability _consumedCap = NullCapability.Instance; /// /// Underlying low-level capability /// - protected internal ConsumedCapability? ConsumedCap => _disposedValue ? + protected internal ConsumedCapability ConsumedCap => _disposedValue ? throw new ObjectDisposedException(nameof(Proxy)) : _consumedCap; /// /// Whether is this a broken capability. /// - public bool IsNull => _consumedCap == null; + public bool IsNull => _consumedCap == NullCapability.Instance; /// /// Whether was called on this Proxy. @@ -100,12 +100,6 @@ namespace Capnp.Rpc throw new ObjectDisposedException(nameof(Proxy)); } - if (ConsumedCap == null) - { - args.Dispose(); - throw new InvalidOperationException("Cannot call null capability"); - } - var answer = ConsumedCap.DoCall(interfaceId, methodId, args); if (cancellationToken.CanBeCanceled) @@ -126,20 +120,17 @@ namespace Capnp.Rpc #endif } - internal Proxy(ConsumedCapability? cap): this() + internal Proxy(ConsumedCapability cap): this() { Bind(cap); } - internal void Bind(ConsumedCapability? cap) + internal void Bind(ConsumedCapability cap) { - if (ConsumedCap != null) + if (ConsumedCap != NullCapability.Instance) throw new InvalidOperationException("Proxy was already bound"); - if (cap == null) - return; - - _consumedCap = cap; + _consumedCap = cap ?? throw new ArgumentNullException(nameof(cap)); cap.AddRef(); #if DebugFinalizers @@ -151,15 +142,7 @@ namespace Capnp.Rpc internal async Task GetProvider() { var unwrapped = await ConsumedCap.Unwrap(); - - switch (unwrapped) - { - case LocalCapability lcap: - return lcap.ProvidedCap; - - default: - return Vine.Create(unwrapped); - } + return unwrapped.AsSkeleton(); } /// diff --git a/Capnp.Net.Runtime/Rpc/RefCountingCapability.cs b/Capnp.Net.Runtime/Rpc/RefCountingCapability.cs index c88d523..f8ae439 100644 --- a/Capnp.Net.Runtime/Rpc/RefCountingCapability.cs +++ b/Capnp.Net.Runtime/Rpc/RefCountingCapability.cs @@ -7,6 +7,7 @@ namespace Capnp.Rpc abstract class RefCountingCapability: ConsumedCapability { readonly object _reentrancyBlocker = new object(); + Vine? _vine; // Note on reference counting: Works in analogy to COM. AddRef() adds a reference, // Release() removes it. When the reference count reaches zero, the capability must be @@ -122,5 +123,15 @@ namespace Capnp.Rpc } } } + + internal override Skeleton AsSkeleton() + { + lock (_reentrancyBlocker) + { + if (_vine == null) + _vine = new Vine(this); + return _vine; + } + } } } \ No newline at end of file diff --git a/Capnp.Net.Runtime/Rpc/RefCountingSkeleton.cs b/Capnp.Net.Runtime/Rpc/RefCountingSkeleton.cs new file mode 100644 index 0000000..7c1359a --- /dev/null +++ b/Capnp.Net.Runtime/Rpc/RefCountingSkeleton.cs @@ -0,0 +1,80 @@ +using System; +using System.Threading; + +namespace Capnp.Rpc +{ + /// + /// Skeleton with reference counting and dispose pattern + /// + public abstract class RefCountingSkeleton: Skeleton + { + int _refCount; + LocalCapability? _localCap; + + /// + /// Dispose pattern implementation + /// + protected virtual void Dispose(bool disposing) + { + } + + /// + /// Finalizer + /// + ~RefCountingSkeleton() + { + Dispose(false); + } + + internal sealed override void Claim() + { + int count, newCount; + + do + { + count = Volatile.Read(ref _refCount); + if (count < 0) + throw new ObjectDisposedException(nameof(RefCountingSkeleton)); + + newCount = count + 1; + + } while (Interlocked.CompareExchange(ref _refCount, newCount, count) != count); + } + + internal override void Relinquish() + { + int count, newCount; + + do + { + count = Volatile.Read(ref _refCount); + if (count < 0) + throw new ObjectDisposedException(nameof(RefCountingSkeleton)); + + newCount = count > 0 ? count - 1 : int.MinValue; + + } while (Interlocked.CompareExchange(ref _refCount, newCount, count) != count); + + if (newCount == 0) + { + Dispose(true); + GC.SuppressFinalize(this); + } + } + + /// + /// Whether this instance is in disposed state. + /// + public bool IsDisposed => Volatile.Read(ref _refCount) < 0; + + internal override ConsumedCapability AsCapability() + { + var cap = Volatile.Read(ref _localCap); + if (cap == null) + { + Interlocked.CompareExchange(ref _localCap, new LocalCapability(this), null); + } + return Volatile.Read(ref _localCap)!; + } + } +} \ No newline at end of file diff --git a/Capnp.Net.Runtime/Rpc/RemoteAnswerCapability.cs b/Capnp.Net.Runtime/Rpc/RemoteAnswerCapability.cs index b2be02f..2a49425 100644 --- a/Capnp.Net.Runtime/Rpc/RemoteAnswerCapability.cs +++ b/Capnp.Net.Runtime/Rpc/RemoteAnswerCapability.cs @@ -102,19 +102,6 @@ namespace Capnp.Rpc if (!_question.StateFlags.HasFlag(PendingQuestion.State.TailCall) && _question.StateFlags.HasFlag(PendingQuestion.State.Returned)) { - try - { - if (ResolvedCap == null) - { - throw new RpcException("Answer did not resolve to expected capability"); - } - } - catch - { - args.Dispose(); - throw; - } - return CallOnResolution(interfaceId, methodId, args); } else @@ -198,8 +185,7 @@ namespace Capnp.Rpc } else if (_question.IsTailCall) { - var vine = Vine.Create(this); - uint id = endpoint.AllocateExport(vine, out bool first); + uint id = endpoint.AllocateExport(AsSkeleton(), out bool first); writer.which = CapDescriptor.WHICH.SenderHosted; writer.SenderHosted = id; diff --git a/Capnp.Net.Runtime/Rpc/RemoteResolvingCapability.cs b/Capnp.Net.Runtime/Rpc/RemoteResolvingCapability.cs index 7ff6553..937b66b 100644 --- a/Capnp.Net.Runtime/Rpc/RemoteResolvingCapability.cs +++ b/Capnp.Net.Runtime/Rpc/RemoteResolvingCapability.cs @@ -36,7 +36,10 @@ namespace Capnp.Rpc try { - if (resolvedCap is RemoteCapability || + if (resolvedCap is NullCapability || + // Must not request disembargo on null cap + + resolvedCap is RemoteCapability || //# Note that in the case where Carol actually lives in Vat B (i.e., the same vat that the promise //# already pointed at), no embargo is needed, because the pipelined calls are delivered over the //# same path as the later direct calls. @@ -84,7 +87,7 @@ namespace Capnp.Rpc cancellationTokenSource.Token.ThrowIfCancellationRequested(); } - using var proxy = new Proxy(ResolvedCap); + using var proxy = new Proxy(resolvedCap); return proxy.Call(interfaceId, methodId, args, default); }, TaskContinuationOptions.ExecuteSynchronously); diff --git a/Capnp.Net.Runtime/Rpc/ResolvingCapabilityExtensions.cs b/Capnp.Net.Runtime/Rpc/ResolvingCapabilityExtensions.cs index 7fae258..3fb8cbc 100644 --- a/Capnp.Net.Runtime/Rpc/ResolvingCapabilityExtensions.cs +++ b/Capnp.Net.Runtime/Rpc/ResolvingCapabilityExtensions.cs @@ -1,11 +1,12 @@ using System; +using System.Threading; using System.Threading.Tasks; namespace Capnp.Rpc { static class ResolvingCapabilityExtensions { - public static async Task Unwrap(this ConsumedCapability? cap) + public static async Task Unwrap(this ConsumedCapability cap) { while (cap is IResolvingCapability resolving) { @@ -20,7 +21,7 @@ namespace Capnp.Rpc public static Action? ExportAsSenderPromise(this T cap, IRpcEndpoint endpoint, CapDescriptor.WRITER writer) where T: ConsumedCapability, IResolvingCapability { - var vine = Vine.Create(cap); + var vine = cap.AsSkeleton(); uint preliminaryId = endpoint.AllocateExport(vine, out bool first); writer.which = CapDescriptor.WHICH.SenderPromise; @@ -58,7 +59,10 @@ namespace Capnp.Rpc } catch (TaskCanceledException exception) { - return new Proxy(LazyCapability.CreateCanceledCap(exception.CancellationToken)); + var token = exception.CancellationToken; + if (!token.IsCancellationRequested) + token = new CancellationToken(true); + return new Proxy(LazyCapability.CreateCanceledCap(token)); } catch (System.Exception exception) { @@ -68,7 +72,7 @@ namespace Capnp.Rpc switch (obj) { case Proxy proxy: return proxy; - case null: return new Proxy(null); + case null: return new Proxy(NullCapability.Instance); default: return BareProxy.FromImpl(obj); } } diff --git a/Capnp.Net.Runtime/Rpc/RpcEngine.cs b/Capnp.Net.Runtime/Rpc/RpcEngine.cs index c271d53..3a32481 100644 --- a/Capnp.Net.Runtime/Rpc/RpcEngine.cs +++ b/Capnp.Net.Runtime/Rpc/RpcEngine.cs @@ -310,7 +310,7 @@ namespace Capnp.Rpc return AllocateExport(providedCapability, out first); } - PendingQuestion AllocateQuestion(ConsumedCapability? target, SerializerState? inParams) + PendingQuestion AllocateQuestion(ConsumedCapability target, SerializerState? inParams) { lock (_reentrancyBlocker) { @@ -359,7 +359,7 @@ namespace Capnp.Rpc if (bootstrapCap != null) { ret.which = Return.WHICH.Results; - bootstrap.SetCapability(bootstrap.ProvideCapability(LocalCapability.Create(bootstrapCap))); + bootstrap.SetCapability(bootstrap.ProvideCapability(bootstrapCap.AsCapability())); ret.Results!.Content = bootstrap; bootstrapTask = Task.FromResult(bootstrap); @@ -897,29 +897,20 @@ namespace Capnp.Rpc disembargo.Target.PromisedAnswer, async t => { - try - { - using var proxy = await t; + using var proxy = await t; - if (proxy.ConsumedCap is RemoteCapability remote && remote.Endpoint == this) - { + if (proxy.ConsumedCap is RemoteCapability remote && remote.Endpoint == this) + { #if DebugEmbargos - Logger.LogDebug($"Sender loopback disembargo. Thread = {Thread.CurrentThread.Name}"); + Logger.LogDebug($"Sender loopback disembargo. Thread = {Thread.CurrentThread.Name}"); #endif - Tx(mb.Frame); - } - else - { - Logger.LogWarning("Sender loopback request: Peer asked for disembargoing an answer which does not resolve back to the sender."); - - throw new RpcProtocolErrorException("'Disembargo': Answer does not resolve back to me"); - } + Tx(mb.Frame); } - catch (System.Exception exception) + else { - Logger.LogWarning($"Sender loopback request: Peer asked for disembargoing an answer which either has not yet returned, was canceled, or faulted: {exception.Message}"); + Logger.LogWarning("Sender loopback request: Peer asked for disembargoing an answer which does not resolve back to the sender."); - throw new RpcProtocolErrorException($"'Disembargo' failure: {exception}"); + throw new RpcProtocolErrorException("'Disembargo': Answer does not resolve back to me"); } }); } @@ -1177,7 +1168,7 @@ namespace Capnp.Rpc mb.InitCapTable(); var req = mb.BuildRoot(); req.which = Message.WHICH.Bootstrap; - var pendingBootstrap = AllocateQuestion(null, null); + var pendingBootstrap = AllocateQuestion(NullCapability.Instance, null); req.Bootstrap!.QuestionId = pendingBootstrap.QuestionId; Tx(mb.Frame); @@ -1315,7 +1306,7 @@ namespace Capnp.Rpc case CapDescriptor.WHICH.ReceiverHosted: if (_exportTable.TryGetValue(capDesc.ReceiverHosted, out var rc)) { - return LocalCapability.Create(rc.Cap); + return rc.Cap.AsCapability(); } else { @@ -1370,6 +1361,9 @@ namespace Capnp.Rpc return newCap; } + case CapDescriptor.WHICH.None: + return NullCapability.Instance; + default: Logger.LogWarning("Unknown capability descriptor category"); throw new RpcUnimplementedException(); diff --git a/Capnp.Net.Runtime/Rpc/Skeleton.cs b/Capnp.Net.Runtime/Rpc/Skeleton.cs index 32cdbaf..e6a0e83 100644 --- a/Capnp.Net.Runtime/Rpc/Skeleton.cs +++ b/Capnp.Net.Runtime/Rpc/Skeleton.cs @@ -26,10 +26,6 @@ namespace Capnp.Rpc } } -#if DEBUG_DISPOSE - const int NoDisposeFlag = 0x4000000; -#endif - static readonly ConditionalWeakTable _implMap = new ConditionalWeakTable(); @@ -63,31 +59,6 @@ namespace Capnp.Rpc return new SkeletonRelinquisher(GetOrCreateSkeleton(impl, true)); } -#if DEBUG_DISPOSE - /// - /// This DEBUG-only diagnostic method states that the Skeleton corresponding to a given capability is not expected to - /// be disposed until the next call to EndAssertNotDisposed(). - /// - /// Capability interface - /// Capability implementation - public static void BeginAssertNotDisposed(T impl) where T : class - { - GetOrCreateSkeleton(impl, false).BeginAssertNotDisposed(); - } - - /// - /// This DEBUG-only diagnostic method ends a non-disposal period started with BeginAssertNotDisposed. - /// - /// Capability interface - /// Capability implementation - public static void EndAssertNotDisposed(T impl) where T : class - { - GetOrCreateSkeleton(impl, false).EndAssertNotDisposed(); - } -#endif - - int _refCount = 0; - /// /// Calls an interface method of this capability. /// @@ -98,43 +69,8 @@ namespace Capnp.Rpc /// A Task which will resolve to the call result public abstract Task Invoke(ulong interfaceId, ushort methodId, DeserializerState args, CancellationToken cancellationToken = default); - internal void Claim() - { - Interlocked.Increment(ref _refCount); - } - -#if DEBUG_DISPOSE - internal void BeginAssertNotDisposed() - { - if ((Interlocked.Add(ref _refCount, NoDisposeFlag) & NoDisposeFlag) == 0) - { - throw new InvalidOperationException("Flag already set. State is now broken."); - } - } - internal void EndAssertNotDisposed() - { - if ((Interlocked.Add(ref _refCount, -NoDisposeFlag) & NoDisposeFlag) != 0) - { - throw new InvalidOperationException("Flag already cleared. State is now broken."); - } - } -#endif - - internal void Relinquish() - { - int count = Interlocked.Decrement(ref _refCount); - - if (0 == count) - { -#if DEBUG_DISPOSE - if ((count & NoDisposeFlag) != 0) - throw new InvalidOperationException("Unexpected Skeleton disposal"); -#endif - - Dispose(true); - GC.SuppressFinalize(this); - } - } + internal abstract void Claim(); + internal abstract void Relinquish(); internal void Relinquish(int count) { @@ -145,41 +81,21 @@ namespace Capnp.Rpc Relinquish(); } - /// - /// Dispose pattern implementation - /// - protected virtual void Dispose(bool disposing) - { - } - - /// - /// Finalizer - /// - ~Skeleton() - { - Dispose(false); - } - internal virtual void Bind(object impl) { - throw new NotSupportedException(); + throw new NotSupportedException("Cannot bind"); } + + internal abstract ConsumedCapability AsCapability(); } /// /// Skeleton for a specific capability interface. /// /// Capability interface - public abstract class Skeleton : Skeleton, IMonoSkeleton + public abstract class Skeleton : RefCountingSkeleton, IMonoSkeleton { -#if DebugEmbargos - ILogger Logger { get; } = Logging.CreateLogger>(); -#endif - Func>[] _methods = null!; - CancellationTokenSource? _disposed = new CancellationTokenSource(); - readonly object _reentrancyBlocker = new object(); - int _pendingCalls; /// /// Constructs an instance. @@ -225,45 +141,7 @@ namespace Capnp.Rpc if (methodId >= _methods.Length) throw new NotImplementedException("Wrong method id"); - lock (_reentrancyBlocker) - { - if (_disposed == null || _disposed.IsCancellationRequested) - { - throw new ObjectDisposedException(nameof(Skeleton)); - } - - ++_pendingCalls; - } - - var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(_disposed.Token, cancellationToken); - - try - { - return await _methods[methodId](args, linkedSource.Token); - } - catch (System.Exception) - { - throw; - } - finally - { - lock (_reentrancyBlocker) - { - --_pendingCalls; - } - - linkedSource.Dispose(); - CheckCtsDisposal(); - } - } - - void CheckCtsDisposal() - { - if (_pendingCalls == 0 && _disposed != null && _disposed.IsCancellationRequested) - { - _disposed.Dispose(); - _disposed = null; - } + return await _methods[methodId](args, cancellationToken); } /// @@ -271,16 +149,6 @@ namespace Capnp.Rpc /// protected override void Dispose(bool disposing) { - lock (_reentrancyBlocker) - { - if (_disposed == null || _disposed.IsCancellationRequested) - return; - - _disposed.Cancel(); - - CheckCtsDisposal(); - } - if (disposing && Impl is IDisposable disposable) { disposable.Dispose(); diff --git a/Capnp.Net.Runtime/Rpc/Vine.cs b/Capnp.Net.Runtime/Rpc/Vine.cs index 1c4f29c..c17c1be 100644 --- a/Capnp.Net.Runtime/Rpc/Vine.cs +++ b/Capnp.Net.Runtime/Rpc/Vine.cs @@ -5,66 +5,37 @@ using System.Threading.Tasks; namespace Capnp.Rpc { + class Vine : Skeleton { - public static Skeleton Create(ConsumedCapability? cap) + public Vine(ConsumedCapability consumedCap) { - if (cap is LocalCapability lcap) - return lcap.ProvidedCap; - else - return new Vine(cap); + Cap = consumedCap; } - Vine(ConsumedCapability? consumedCap) - { - Proxy = new Proxy(consumedCap); + public ConsumedCapability Cap { get; } -#if DebugFinalizers - CreatorStackTrace = Environment.StackTrace; -#endif + internal override ConsumedCapability AsCapability() => Cap; + + internal override void Claim() + { + Cap.AddRef(); } -#if DebugFinalizers - ~Vine() + internal override void Relinquish() { - Logger.LogWarning($"Caught orphaned Vine, created from here: {CreatorStackTrace}."); - - Dispose(false); + Cap.Release(); } - ILogger Logger { get; } = Logging.CreateLogger(); - string CreatorStackTrace { get; } -#endif - - internal override void Bind(object impl) - { - throw new NotImplementedException(); - } - - public Proxy Proxy { get; } - public async override Task Invoke( ulong interfaceId, ushort methodId, DeserializerState args, CancellationToken cancellationToken = default) { - var promisedAnswer = Proxy.Call(interfaceId, methodId, (DynamicSerializerState)args, default); + using var proxy = new Proxy(Cap); + var promisedAnswer = proxy.Call(interfaceId, methodId, (DynamicSerializerState)args, false, cancellationToken); if (promisedAnswer is PendingQuestion pendingQuestion && pendingQuestion.RpcEndpoint == Impatient.AskingEndpoint) { - async void SetupCancellation() - { - try - { - using var registration = cancellationToken.Register(promisedAnswer.Dispose); - await promisedAnswer.WhenReturned; - } - catch - { - } - } - - SetupCancellation(); - return pendingQuestion; } else @@ -73,16 +44,5 @@ namespace Capnp.Rpc return (DynamicSerializerState)await promisedAnswer.WhenReturned; } } - - protected override void Dispose(bool disposing) - { - if (disposing) - Proxy.Dispose(); - else - try { Proxy.Dispose(); } - catch { } - - base.Dispose(disposing); - } } } \ No newline at end of file diff --git a/Capnp.Net.Runtime/SerializerState.cs b/Capnp.Net.Runtime/SerializerState.cs index c717f15..30fc21e 100644 --- a/Capnp.Net.Runtime/SerializerState.cs +++ b/Capnp.Net.Runtime/SerializerState.cs @@ -237,33 +237,31 @@ namespace Capnp } } - internal Rpc.ConsumedCapability? DecodeCapPointer(int offset) + internal Rpc.ConsumedCapability DecodeCapPointer(int offset) { if (Caps == null) throw new InvalidOperationException("Capbility table not set"); if (!IsAllocated) { - return null; + return Rpc.NullCapability.Instance; } WirePointer pointer = RawData[offset]; if (pointer.IsNull) { - return null; + return Rpc.NullCapability.Instance; } if (pointer.Kind != PointerKind.Other) { - throw new Rpc.RpcException( - "Expected a capability pointer, but got something different"); + throw new Rpc.RpcException("Expected a capability pointer, but got something different"); } if (pointer.CapabilityIndex >= Caps.Count) { - throw new Rpc.RpcException( - "Capability index out of range"); + throw new Rpc.RpcException("Capability index out of range"); } return Caps[(int)pointer.CapabilityIndex]; @@ -1237,9 +1235,9 @@ namespace Capnp /// The low-level capability object to provide. /// Index of the given capability in the capability table, null if capability is null /// The underlying message builder was not configured for capability table support. - public uint? ProvideCapability(Rpc.ConsumedCapability? capability) + public uint? ProvideCapability(Rpc.ConsumedCapability capability) { - if (capability == null) + if (capability == null || capability == Rpc.NullCapability.Instance) return null; if (Caps == null) @@ -1251,7 +1249,7 @@ namespace Capnp { index = Caps.Count; Caps.Add(capability); - capability?.AddRef(); + capability.AddRef(); } return (uint)index; @@ -1263,9 +1261,9 @@ namespace Capnp /// The capability to provide, in terms of its skeleton. /// Index of the given capability in the capability table /// The underlying message builder was not configured for capability table support. - public uint ProvideCapability(Rpc.Skeleton capability) + public uint? ProvideCapability(Rpc.Skeleton capability) { - return ProvideCapability(Rpc.LocalCapability.Create(capability))!.Value; + return ProvideCapability(capability.AsCapability()); } /// @@ -1356,7 +1354,7 @@ namespace Capnp } } - internal Rpc.ConsumedCapability? StructReadRawCap(int index) + internal Rpc.ConsumedCapability StructReadRawCap(int index) { if (Kind != ObjectKind.Struct && Kind != ObjectKind.Nil) throw new InvalidOperationException("Allowed on structs only"); @@ -1376,10 +1374,10 @@ namespace Capnp /// is out of range. /// The desired interface does not qualify as capability interface () /// This state does not represent a struct. - public T? ReadCap(int slot) where T : class + public T ReadCap(int slot) where T : class { var cap = StructReadRawCap(slot); - return Rpc.CapabilityReflection.CreateProxy(cap) as T; + return (Rpc.CapabilityReflection.CreateProxy(cap) as T)!; } ///