diff --git a/Capnp.Net.Runtime.Tests/TcpRpcErrorHandling.cs b/Capnp.Net.Runtime.Tests/TcpRpcErrorHandling.cs index 163d85e..300320d 100644 --- a/Capnp.Net.Runtime.Tests/TcpRpcErrorHandling.cs +++ b/Capnp.Net.Runtime.Tests/TcpRpcErrorHandling.cs @@ -563,7 +563,7 @@ namespace Capnp.Net.Runtime.Tests _.Call.QuestionId = 42; _.Call.Target.which = MessageTarget.WHICH.ImportedCap; _.Call.Target.ImportedCap = bootCapId; - _.Call.InterfaceId = ((TypeIdAttribute)typeof(ITestPipeline).GetCustomAttributes(typeof(TypeIdAttribute), false)[0]).Id; + _.Call.InterfaceId = new TestPipeline_Skeleton().InterfaceId; _.Call.MethodId = 0; var wr = _.Call.Params.Content.Rewrap(); wr.InCap = null; @@ -588,7 +588,7 @@ namespace Capnp.Net.Runtime.Tests { _1.which = Message.WHICH.Unimplemented; _1.Unimplemented.which = Message.WHICH.Resolve; - Reserializing.DeepCopy(_, _1.Unimplemented.Resolve); + Reserializing.DeepCopy(_.Resolve, _1.Unimplemented.Resolve); }); Assert.IsFalse(impl.IsGrandsonCapDisposed); diff --git a/Capnp.Net.Runtime.Tests/test.cs b/Capnp.Net.Runtime.Tests/test.cs index e56b8ff..db23211 100644 --- a/Capnp.Net.Runtime.Tests/test.cs +++ b/Capnp.Net.Runtime.Tests/test.cs @@ -17255,7 +17255,11 @@ namespace Capnproto_test.Capnp.Test static readonly MemberAccessPath Path_capnproto_test_capnp_test_TestPipeline_getCap_OutBox_Cap = new MemberAccessPath(1U, 0U); public static Capnproto_test.Capnp.Test.ITestInterface OutBox_Cap(this Task<(string, Capnproto_test.Capnp.Test.TestPipeline.Box)> task) { - return (Capnproto_test.Capnp.Test.ITestInterface)CapabilityReflection.CreateProxy(Impatient.GetAnswer(task).Access(Path_capnproto_test_capnp_test_TestPipeline_getCap_OutBox_Cap)); + async Task AwaitProxy() => (await task).Item2.Cap; + + return (Capnproto_test.Capnp.Test.ITestInterface)CapabilityReflection.CreateProxy(Impatient.GetAnswer(task).Access( + Path_capnproto_test_capnp_test_TestPipeline_getCap_OutBox_Cap, + AwaitProxy())); } static readonly MemberAccessPath Path_capnproto_test_capnp_test_TestPipeline_getAnyCap_OutBox_Cap = new MemberAccessPath(1U, 0U); diff --git a/Capnp.Net.Runtime/DeserializerState.cs b/Capnp.Net.Runtime/DeserializerState.cs index 440df25..b420725 100644 --- a/Capnp.Net.Runtime/DeserializerState.cs +++ b/Capnp.Net.Runtime/DeserializerState.cs @@ -106,6 +106,11 @@ namespace Capnp case ObjectKind.ListOfStructs: case ObjectKind.Nil: case ObjectKind.Struct: + if (state.Caps != null) + { + foreach (var cap in state.Caps) + cap?.Release(true); + } return new DeserializerState(state.Allocator!.Segments) { CurrentSegmentIndex = state.SegmentIndex, diff --git a/Capnp.Net.Runtime/DynamicSerializerState.cs b/Capnp.Net.Runtime/DynamicSerializerState.cs index a58c688..edf3d15 100644 --- a/Capnp.Net.Runtime/DynamicSerializerState.cs +++ b/Capnp.Net.Runtime/DynamicSerializerState.cs @@ -42,7 +42,11 @@ namespace Capnp { var mb = MessageBuilder.Create(); if (state.Caps != null) + { mb.InitCapTable(); + foreach (var cap in state.Caps) + cap?.AddRef(); + } var sstate = mb.CreateObject(); Reserializing.DeepCopy(state, sstate); diff --git a/Capnp.Net.Runtime/ListOfPrimitivesSerializer.cs b/Capnp.Net.Runtime/ListOfPrimitivesSerializer.cs index cd86024..18477f4 100644 --- a/Capnp.Net.Runtime/ListOfPrimitivesSerializer.cs +++ b/Capnp.Net.Runtime/ListOfPrimitivesSerializer.cs @@ -92,19 +92,19 @@ namespace Capnp switch (items) { case T[] array: - array.CopyTo(Span); + array.CopyTo(Data); break; case ArraySegment segment: - segment.AsSpan().CopyTo(Span); + segment.AsSpan().CopyTo(Data); break; case ListOfPrimitivesDeserializer deser: - deser.Span.CopyTo(Span); + deser.Span.CopyTo(Data); break; case ListOfPrimitivesSerializer ser: - ser.Span.CopyTo(Span); + ser.Data.CopyTo(Data); break; default: @@ -116,12 +116,18 @@ namespace Capnp } } + IEnumerable Enumerate() + { + for (int i = 0; i < Data.Length; i++) + yield return Data[i]; + } + /// /// Implements . /// /// public IEnumerator GetEnumerator() => Enumerate().GetEnumerator(); - IEnumerator IEnumerable.GetEnumerator() => Span.ToArray().GetEnumerator(); + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); } } \ No newline at end of file diff --git a/Capnp.Net.Runtime/Rpc/ConsumedCapability.cs b/Capnp.Net.Runtime/Rpc/ConsumedCapability.cs index 1035f0b..1204fa0 100644 --- a/Capnp.Net.Runtime/Rpc/ConsumedCapability.cs +++ b/Capnp.Net.Runtime/Rpc/ConsumedCapability.cs @@ -19,6 +19,7 @@ internal abstract void AddRef(); internal abstract void Release( + bool keepAlive, [System.Runtime.CompilerServices.CallerMemberName] string methodName = "", [System.Runtime.CompilerServices.CallerFilePath] string filePath = "", [System.Runtime.CompilerServices.CallerLineNumber] int lineNumber = 0); diff --git a/Capnp.Net.Runtime/Rpc/IPromisedAnswer.cs b/Capnp.Net.Runtime/Rpc/IPromisedAnswer.cs index 250aa18..7b774f8 100644 --- a/Capnp.Net.Runtime/Rpc/IPromisedAnswer.cs +++ b/Capnp.Net.Runtime/Rpc/IPromisedAnswer.cs @@ -23,5 +23,7 @@ namespace Capnp.Rpc /// Path to the desired capability inside the result struct. /// Pipelined low-level capability ConsumedCapability? Access(MemberAccessPath access); + + ConsumedCapability? Access(MemberAccessPath access, Task proxyTask); } } \ No newline at end of file diff --git a/Capnp.Net.Runtime/Rpc/IResolvingCapability.cs b/Capnp.Net.Runtime/Rpc/IResolvingCapability.cs index 36f8bf0..cc26602 100644 --- a/Capnp.Net.Runtime/Rpc/IResolvingCapability.cs +++ b/Capnp.Net.Runtime/Rpc/IResolvingCapability.cs @@ -10,6 +10,6 @@ namespace Capnp.Rpc /// /// Will eventually give the resolved capability. /// - Task WhenResolved { get; } + Task WhenResolved { get; } } } \ No newline at end of file diff --git a/Capnp.Net.Runtime/Rpc/Interception/CallContext.cs b/Capnp.Net.Runtime/Rpc/Interception/CallContext.cs index c8ea190..36b95d6 100644 --- a/Capnp.Net.Runtime/Rpc/Interception/CallContext.cs +++ b/Capnp.Net.Runtime/Rpc/Interception/CallContext.cs @@ -25,10 +25,10 @@ namespace Capnp.Rpc.Interception public Task WhenReturned => _futureResult.Task; public CancellationToken CancelFromAlice => _cancelFromAlice.Token; - async Task AccessWhenReturned(MemberAccessPath access) + async Task AccessWhenReturned(MemberAccessPath access) { await WhenReturned; - return new Proxy(Access(access)); + return Access(access); } public ConsumedCapability? Access(MemberAccessPath access) @@ -50,6 +50,27 @@ namespace Capnp.Rpc.Interception } } + public ConsumedCapability? Access(MemberAccessPath _, Task task) + { + var proxyTask = task.AsProxyTask(); + + if (proxyTask.IsCompleted) + { + try + { + return proxyTask.Result?.ConsumedCap; + } + catch (AggregateException exception) + { + throw exception.InnerException!; + } + } + else + { + return new LazyCapability(proxyTask); + } + } + public void Dispose() { try diff --git a/Capnp.Net.Runtime/Rpc/Interception/CensorCapability.cs b/Capnp.Net.Runtime/Rpc/Interception/CensorCapability.cs index 89573ba..bd66f9b 100644 --- a/Capnp.Net.Runtime/Rpc/Interception/CensorCapability.cs +++ b/Capnp.Net.Runtime/Rpc/Interception/CensorCapability.cs @@ -16,7 +16,7 @@ protected override void ReleaseRemotely() { - InterceptedCapability.Release(); + InterceptedCapability.Release(false); } internal override IPromisedAnswer DoCall(ulong interfaceId, ushort methodId, DynamicSerializerState args) diff --git a/Capnp.Net.Runtime/Rpc/LazyCapability.cs b/Capnp.Net.Runtime/Rpc/LazyCapability.cs index 965d054..d56297c 100644 --- a/Capnp.Net.Runtime/Rpc/LazyCapability.cs +++ b/Capnp.Net.Runtime/Rpc/LazyCapability.cs @@ -8,32 +8,45 @@ namespace Capnp.Rpc { public static LazyCapability CreateBrokenCap(string message) { - var cap = new LazyCapability(Task.FromException(new RpcException(message))); + var cap = new LazyCapability(Task.FromException(new RpcException(message))); cap.AddRef(); // Instance shall be persistent return cap; } public static LazyCapability CreateCanceledCap(CancellationToken token) { - var cap = new LazyCapability(Task.FromCanceled(token)); + var cap = new LazyCapability(Task.FromCanceled(token)); cap.AddRef(); // Instance shall be persistent return cap; } public static LazyCapability Null { get; } = CreateBrokenCap("Null capability"); - public LazyCapability(Task capabilityTask) + readonly Task? _proxyTask; + + public LazyCapability(Task capabilityTask) { WhenResolved = capabilityTask; } + public LazyCapability(Task proxyTask) + { + _proxyTask = proxyTask; + + async Task AwaitCap() => (await _proxyTask!).ConsumedCap; + + WhenResolved = AwaitCap(); + } + internal override void Freeze(out IRpcEndpoint? boundEndpoint) { if (WhenResolved.IsCompleted) { + boundEndpoint = null; + try { - WhenResolved.Result.Freeze(out boundEndpoint); + WhenResolved.Result?.Freeze(out boundEndpoint); } catch (AggregateException exception) { @@ -54,7 +67,8 @@ namespace Capnp.Rpc { if (WhenResolved.ReplacementTaskIsCompletedSuccessfully()) { - WhenResolved.Result.Export(endpoint, writer); + using var proxy = new Proxy(WhenResolved.Result); + proxy.Export(endpoint, writer); } else { @@ -62,24 +76,21 @@ namespace Capnp.Rpc } } - async void DisposeProxyWhenResolved() - { - try - { - var cap = await WhenResolved; - if (cap != null) cap.Dispose(); - } - catch - { - } - } - protected override void ReleaseRemotely() { - DisposeProxyWhenResolved(); + if (_proxyTask != null) + { + async void DisposeProxyWhenResolved() + { + try { using var _ = await _proxyTask!; } + catch { } + } + + DisposeProxyWhenResolved(); + } } - public Task WhenResolved { get; } + public Task WhenResolved { get; } async Task CallImpl(ulong interfaceId, ushort methodId, DynamicSerializerState args, CancellationToken cancellationToken) { @@ -90,7 +101,8 @@ namespace Capnp.Rpc if (cap == null) throw new RpcException("Broken capability"); - var call = cap.Call(interfaceId, methodId, args, default); + using var proxy = new Proxy(cap); + var call = proxy.Call(interfaceId, methodId, args, default); var whenReturned = call.WhenReturned; using (var registration = cancellationToken.Register(call.Dispose)) diff --git a/Capnp.Net.Runtime/Rpc/LocalAnswer.cs b/Capnp.Net.Runtime/Rpc/LocalAnswer.cs index 8744cb6..a90166f 100644 --- a/Capnp.Net.Runtime/Rpc/LocalAnswer.cs +++ b/Capnp.Net.Runtime/Rpc/LocalAnswer.cs @@ -18,35 +18,27 @@ namespace Capnp.Rpc async void CleanupAfterReturn() { - try - { - await WhenReturned; - } - catch - { - } - finally - { - _cts.Dispose(); - } + try { await WhenReturned; } + catch { } + finally { _cts.Dispose(); } } public Task WhenReturned { get; } public ConsumedCapability Access(MemberAccessPath access) { - return new LocalAnswerCapability(WhenReturned, access); + return new LocalAnswerCapabilityDeprecated(WhenReturned, access); + } + + public ConsumedCapability Access(MemberAccessPath _, Task task) + { + return new LocalAnswerCapability(task.AsProxyTask()); } public void Dispose() { - try - { - _cts.Cancel(); - } - catch (ObjectDisposedException) - { - } + try { _cts.Cancel(); } + catch (ObjectDisposedException) { } } } } \ No newline at end of file diff --git a/Capnp.Net.Runtime/Rpc/LocalAnswerCapability.cs b/Capnp.Net.Runtime/Rpc/LocalAnswerCapability.cs index 195d139..fdd8ad4 100644 --- a/Capnp.Net.Runtime/Rpc/LocalAnswerCapability.cs +++ b/Capnp.Net.Runtime/Rpc/LocalAnswerCapability.cs @@ -4,15 +4,17 @@ using System.Threading.Tasks; namespace Capnp.Rpc { + class LocalAnswerCapability : RefCountingCapability, IResolvingCapability { - readonly Task _answer; - readonly MemberAccessPath _access; + readonly Task _whenResolvedProxy; - public LocalAnswerCapability(Task answer, MemberAccessPath access) + public LocalAnswerCapability(Task proxyTask) { - _answer = answer; - _access = access; + _whenResolvedProxy = proxyTask; + + async Task AwaitResolved() => (await _whenResolvedProxy).ConsumedCap; + WhenResolved = AwaitResolved(); } internal override void Freeze(out IRpcEndpoint? boundEndpoint) @@ -24,31 +26,20 @@ namespace Capnp.Rpc { } - async Task AwaitResolved() - { - var state = await _answer; - return new Proxy(_access.Eval(state)); - } - public Task WhenResolved => AwaitResolved(); + public Task WhenResolved { get; private set; } internal override void Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer) { - if (_answer.IsCompleted) + if (_whenResolvedProxy.IsCompleted) { - DeserializerState result; try { - result = _answer.Result; + _whenResolvedProxy.Result.Export(endpoint, writer); } catch (AggregateException exception) { - throw exception.InnerException!; - } - - using (var proxy = new Proxy(_access.Eval(result))) - { - proxy.Export(endpoint, writer); + throw exception.InnerException; } } else @@ -59,20 +50,18 @@ namespace Capnp.Rpc async Task CallImpl(ulong interfaceId, ushort methodId, DynamicSerializerState args, CancellationToken cancellationToken) { - var cap = await AwaitResolved(); + var proxy = await _whenResolvedProxy; cancellationToken.ThrowIfCancellationRequested(); - if (cap == null) + if (proxy.IsNull) throw new RpcException("Broken capability"); - var call = cap.Call(interfaceId, methodId, args, default); + var call = proxy.Call(interfaceId, methodId, args, default); var whenReturned = call.WhenReturned; - using (var registration = cancellationToken.Register(() => call.Dispose())) - { - return await whenReturned; - } + using var registration = cancellationToken.Register(() => call.Dispose()); + return await whenReturned; } internal override IPromisedAnswer DoCall(ulong interfaceId, ushort methodId, DynamicSerializerState args) @@ -81,9 +70,10 @@ namespace Capnp.Rpc return new LocalAnswer(cts, CallImpl(interfaceId, methodId, args, cts.Token)); } - protected override void ReleaseRemotely() + protected async override void ReleaseRemotely() { - this.DisposeWhenResolved(); + try { using var _ = await _whenResolvedProxy; } + catch { } } } } \ No newline at end of file diff --git a/Capnp.Net.Runtime/Rpc/LocalAnswerCapabilityDeprecated.cs b/Capnp.Net.Runtime/Rpc/LocalAnswerCapabilityDeprecated.cs new file mode 100644 index 0000000..f78788d --- /dev/null +++ b/Capnp.Net.Runtime/Rpc/LocalAnswerCapabilityDeprecated.cs @@ -0,0 +1,83 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Capnp.Rpc +{ + class LocalAnswerCapabilityDeprecated : RefCountingCapability, IResolvingCapability + { + readonly Task _answer; + readonly MemberAccessPath _access; + + public LocalAnswerCapabilityDeprecated(Task answer, MemberAccessPath access) + { + _answer = answer; + _access = access; + + async Task AwaitResolved() => access.Eval(await _answer); + WhenResolved = AwaitResolved(); + } + + internal override void Freeze(out IRpcEndpoint? boundEndpoint) + { + boundEndpoint = null; + } + + internal override void Unfreeze() + { + } + + + public Task WhenResolved { get; private set; } + + internal override void Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer) + { + if (_answer.IsCompleted) + { + DeserializerState result; + try + { + result = _answer.Result; + } + catch (AggregateException exception) + { + throw exception.InnerException!; + } + + using var proxy = new Proxy(_access.Eval(result)); + proxy.Export(endpoint, writer); + } + else + { + this.ExportAsSenderPromise(endpoint, writer); + } + } + + async Task CallImpl(ulong interfaceId, ushort methodId, DynamicSerializerState args, CancellationToken cancellationToken) + { + var cap = await WhenResolved; + + cancellationToken.ThrowIfCancellationRequested(); + + if (cap == null) + throw new RpcException("Broken capability"); + + using var proxy = new Proxy(cap); + var call = proxy.Call(interfaceId, methodId, args, default); + var whenReturned = call.WhenReturned; + + using var registration = cancellationToken.Register(() => call.Dispose()); + return await whenReturned; + } + + internal override IPromisedAnswer DoCall(ulong interfaceId, ushort methodId, DynamicSerializerState args) + { + var cts = new CancellationTokenSource(); + return new LocalAnswer(cts, CallImpl(interfaceId, methodId, args, cts.Token)); + } + + protected override void ReleaseRemotely() + { + } + } +} \ No newline at end of file diff --git a/Capnp.Net.Runtime/Rpc/LocalCapability.cs b/Capnp.Net.Runtime/Rpc/LocalCapability.cs index 23b0ced..d0dc90f 100644 --- a/Capnp.Net.Runtime/Rpc/LocalCapability.cs +++ b/Capnp.Net.Runtime/Rpc/LocalCapability.cs @@ -7,15 +7,12 @@ namespace Capnp.Rpc { class LocalCapability : ConsumedCapability { - static readonly ConditionalWeakTable _localCaps = - new ConditionalWeakTable(); - public static ConsumedCapability Create(Skeleton skeleton) { if (skeleton is Vine vine) return vine.Proxy.ConsumedCap!; else - return _localCaps.GetValue(skeleton, _ => new LocalCapability(_)); + return new LocalCapability(skeleton); } static async Task AwaitAnswer(Task call) @@ -25,6 +22,7 @@ namespace Capnp.Rpc } public Skeleton ProvidedCap { get; } + int _releaseFlag; LocalCapability(Skeleton providedCap) { @@ -33,15 +31,20 @@ namespace Capnp.Rpc internal override void AddRef() { - ProvidedCap.Claim(); + if (0 == Interlocked.CompareExchange(ref _releaseFlag, 0, 1)) + ProvidedCap.Claim(); } internal override void Release( + bool keepAlive, [System.Runtime.CompilerServices.CallerMemberName] string methodName = "", [System.Runtime.CompilerServices.CallerFilePath] string filePath = "", [System.Runtime.CompilerServices.CallerLineNumber] int lineNumber = 0) { - ProvidedCap.Relinquish(); + if (keepAlive) + Interlocked.Exchange(ref _releaseFlag, 1); + else + ProvidedCap.Relinquish(); } internal override IPromisedAnswer DoCall(ulong interfaceId, ushort methodId, DynamicSerializerState args) diff --git a/Capnp.Net.Runtime/Rpc/PendingAnswer.cs b/Capnp.Net.Runtime/Rpc/PendingAnswer.cs index 5959b7e..ed51a34 100644 --- a/Capnp.Net.Runtime/Rpc/PendingAnswer.cs +++ b/Capnp.Net.Runtime/Rpc/PendingAnswer.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -27,6 +28,8 @@ namespace Capnp.Rpc public CancellationToken CancellationToken => _cts?.Token ?? CancellationToken.None; + public IReadOnlyList CapTable { get; set; } + public void Cancel() { _cts?.Cancel(); @@ -96,7 +99,7 @@ namespace Capnp.Rpc else { var path = MemberAccessPath.Deserialize(rd); - var cap = new RemoteAnswerCapability(aorcq.Counterquestion!, path); + var cap = new RemoteAnswerCapabilityDeprecated(aorcq.Counterquestion!, path); return new Proxy(cap); } } diff --git a/Capnp.Net.Runtime/Rpc/PendingQuestion.cs b/Capnp.Net.Runtime/Rpc/PendingQuestion.cs index 227f189..cf7a900 100644 --- a/Capnp.Net.Runtime/Rpc/PendingQuestion.cs +++ b/Capnp.Net.Runtime/Rpc/PendingQuestion.cs @@ -252,7 +252,39 @@ namespace Capnp.Rpc } else { - return new RemoteAnswerCapability(this, access); + return new RemoteAnswerCapabilityDeprecated(this, access); + } + } + } + + /// + /// Refer to a (possibly nested) member of this question's (possibly future) result and return + /// it as a capability. + /// + /// promises the cap whose ownership is transferred to this object + /// Low-level capability + /// The referenced member does not exist or does not resolve to a capability pointer. + public ConsumedCapability? Access(MemberAccessPath access, Task task) + { + var proxyTask = task.AsProxyTask(); + + lock (ReentrancyBlocker) + { + if (proxyTask.IsCompleted && !StateFlags.HasFlag(State.TailCall)) + { + try + { + using var proxy = proxyTask.Result; + return proxy.ConsumedCap; + } + catch (AggregateException exception) + { + throw exception.InnerException!; + } + } + else + { + return new RemoteAnswerCapabilityDeprecated(this, access); } } } @@ -263,13 +295,13 @@ namespace Capnp.Rpc { foreach (var cap in inParams.Caps!) { - cap?.Release(); + cap?.Release(false); } } if (target != null) { - target.Release(); + target.Release(false); } } @@ -277,7 +309,7 @@ namespace Capnp.Rpc { foreach (var cap in outParams.Caps!) { - cap?.Release(); + cap?.Release(false); } } diff --git a/Capnp.Net.Runtime/Rpc/PromisedCapability.cs b/Capnp.Net.Runtime/Rpc/PromisedCapability.cs index 083a13d..27a23f4 100644 --- a/Capnp.Net.Runtime/Rpc/PromisedCapability.cs +++ b/Capnp.Net.Runtime/Rpc/PromisedCapability.cs @@ -8,15 +8,19 @@ namespace Capnp.Rpc { readonly uint _remoteId; readonly object _reentrancyBlocker = new object(); - readonly TaskCompletionSource _resolvedCap = new TaskCompletionSource(); + readonly TaskCompletionSource _resolvedCap = new TaskCompletionSource(); + readonly Task _whenResolvedProxy; bool _released; public PromisedCapability(IRpcEndpoint ep, uint remoteId): base(ep) { _remoteId = remoteId; + + async Task AwaitProxy() => new Proxy(await WhenResolved); + _whenResolvedProxy = AwaitProxy(); } - public override Task WhenResolved => _resolvedCap.Task; + public override Task WhenResolved => _resolvedCap.Task; internal override void Freeze(out IRpcEndpoint? boundEndpoint) { @@ -24,9 +28,11 @@ namespace Capnp.Rpc { if (_resolvedCap.Task.IsCompleted && _pendingCallsOnPromise == 0) { + boundEndpoint = null; + try { - _resolvedCap.Task.Result.Freeze(out boundEndpoint); + _resolvedCap.Task.Result?.Freeze(out boundEndpoint); } catch (AggregateException exception) { @@ -51,7 +57,7 @@ namespace Capnp.Rpc { if (_pendingCallsOnPromise == 0) { - _resolvedCap.Task.Result.Unfreeze(); + _resolvedCap.Task.Result?.Unfreeze(); } else { @@ -79,7 +85,8 @@ namespace Capnp.Rpc if (_resolvedCap.Task.ReplacementTaskIsCompletedSuccessfully()) { - _resolvedCap.Task.Result.Export(endpoint, writer); + using var proxy = new Proxy(_resolvedCap.Task.Result); + proxy.Export(endpoint, writer); } else { @@ -147,7 +154,7 @@ namespace Capnp.Rpc } } - protected override Proxy? ResolvedCap + protected override ConsumedCapability? ResolvedCap { get { @@ -194,7 +201,7 @@ namespace Capnp.Rpc lock (_reentrancyBlocker) { - _resolvedCap.SetResult(new Proxy(resolvedCap)); + _resolvedCap.SetResult(resolvedCap); if (_pendingCallsOnPromise == 0) { @@ -218,7 +225,7 @@ namespace Capnp.Rpc #if false _resolvedCap.SetException(new RpcException(message)); #else - _resolvedCap.SetResult(new Proxy(LazyCapability.CreateBrokenCap(message))); + _resolvedCap.SetResult(LazyCapability.CreateBrokenCap(message)); #endif if (_pendingCallsOnPromise == 0) @@ -234,7 +241,7 @@ namespace Capnp.Rpc } } - protected override void ReleaseRemotely() + protected async override void ReleaseRemotely() { if (!_released) { @@ -243,7 +250,8 @@ namespace Capnp.Rpc _ep.ReleaseImport(_remoteId); - this.DisposeWhenResolved(); + try { using var _ = await _whenResolvedProxy; } + catch { } } protected override Call.WRITER SetupMessage(DynamicSerializerState args, ulong interfaceId, ushort methodId) diff --git a/Capnp.Net.Runtime/Rpc/Proxy.cs b/Capnp.Net.Runtime/Rpc/Proxy.cs index ed0a976..18ab5f4 100644 --- a/Capnp.Net.Runtime/Rpc/Proxy.cs +++ b/Capnp.Net.Runtime/Rpc/Proxy.cs @@ -10,6 +10,14 @@ namespace Capnp.Rpc /// public class Proxy : IDisposable, IResolvingCapability { + public static T Share(T obj) where T: class + { + if (obj is Proxy proxy) + return proxy.Cast(false); + else + return BareProxy.FromImpl(obj).Cast(true); + } + #if DebugFinalizers ILogger Logger { get; } = Logging.CreateLogger(); #endif @@ -19,18 +27,13 @@ namespace Capnp.Rpc /// /// Will eventually give the resolved capability, if this is a promised capability. /// - public Task WhenResolved + public Task WhenResolved { get { - if (ConsumedCap is IResolvingCapability resolving) - { - return resolving.WhenResolved; - } - else - { - return Task.FromResult(this); - } + return ConsumedCap is IResolvingCapability resolving ? + resolving.WhenResolved : + Task.FromResult(ConsumedCap); } } @@ -139,20 +142,15 @@ namespace Capnp.Rpc { if (disposing) { - ConsumedCap?.Release(); + ConsumedCap?.Release(false); } else { // When called from the Finalizer, we must not throw. // But when reference counting goes wrong, ConsumedCapability.Release() will throw an InvalidOperationException. // The only option here is to suppress that exception. - try - { - ConsumedCap?.Release(); - } - catch - { - } + try { ConsumedCap?.Release(false); } + catch { } } _disposedValue = true; diff --git a/Capnp.Net.Runtime/Rpc/RefCountingCapability.cs b/Capnp.Net.Runtime/Rpc/RefCountingCapability.cs index e64378c..240fc9e 100644 --- a/Capnp.Net.Runtime/Rpc/RefCountingCapability.cs +++ b/Capnp.Net.Runtime/Rpc/RefCountingCapability.cs @@ -46,13 +46,8 @@ namespace Capnp.Rpc { if (disposing) { - try - { - ReleaseRemotely(); - } - catch - { - } + try { ReleaseRemotely(); } + catch { } } else { @@ -60,13 +55,8 @@ namespace Capnp.Rpc { Task.Run(() => { - try - { - ReleaseRemotely(); - } - catch - { - } + try { ReleaseRemotely(); } + catch { } }); } } @@ -76,7 +66,11 @@ namespace Capnp.Rpc { lock (_reentrancyBlocker) { - if (++_refCount <= 1) + if (_refCount == int.MinValue) + { + _refCount = 2; + } + else if (++_refCount <= 1) { --_refCount; @@ -91,6 +85,7 @@ namespace Capnp.Rpc } internal sealed override void Release( + bool keepAlive, [System.Runtime.CompilerServices.CallerMemberName] string methodName = "", [System.Runtime.CompilerServices.CallerFilePath] string filePath = "", [System.Runtime.CompilerServices.CallerLineNumber] int lineNumber = 0) @@ -99,6 +94,10 @@ namespace Capnp.Rpc { switch (_refCount) { + case 2 when keepAlive: + _refCount = int.MinValue; + break; + case 1: // initial state, actually ref. count 0 case 2: // actually ref. count 1 _refCount = 0; diff --git a/Capnp.Net.Runtime/Rpc/RemoteAnswerCapability.cs b/Capnp.Net.Runtime/Rpc/RemoteAnswerCapability.cs index e3269e6..328e8e3 100644 --- a/Capnp.Net.Runtime/Rpc/RemoteAnswerCapability.cs +++ b/Capnp.Net.Runtime/Rpc/RemoteAnswerCapability.cs @@ -16,14 +16,25 @@ namespace Capnp.Rpc readonly PendingQuestion _question; readonly MemberAccessPath _access; - Proxy? _resolvedCap; + readonly Task _whenResolvedProxy; - public RemoteAnswerCapability(PendingQuestion question, MemberAccessPath access): base(question.RpcEndpoint) + public RemoteAnswerCapability(PendingQuestion question, MemberAccessPath access, Task proxyTask) : base(question.RpcEndpoint) { _question = question ?? throw new ArgumentNullException(nameof(question)); _access = access ?? throw new ArgumentNullException(nameof(access)); + _whenResolvedProxy = proxyTask ?? throw new ArgumentNullException(nameof(proxyTask)); - _ = AwaitWhenResolved(); + async Task AwaitWhenResolved() + { + var proxy = await _whenResolvedProxy; + + if (_question.IsTailCall) + throw new InvalidOperationException("Question is a tail call, so won't resolve back."); + + return proxy.ConsumedCap; + } + + WhenResolved = AwaitWhenResolved(); } async void ReAllowFinishWhenDone(Task task) @@ -47,52 +58,32 @@ namespace Capnp.Rpc } } - protected override void Dispose(bool disposing) - { - base.Dispose(disposing); - - lock (_question.ReentrancyBlocker) - { - _resolvedCap?.Dispose(); - } - } - - protected override Proxy? ResolvedCap + protected override ConsumedCapability? ResolvedCap { get { lock (_question.ReentrancyBlocker) { - if (_resolvedCap == null && !_question.IsTailCall && _question.IsReturned) + if (!_question.IsTailCall && WhenResolved.IsCompleted) { - DeserializerState result; try { - result = _question.WhenReturned.Result; + return WhenResolved.Result; } catch (AggregateException exception) { throw exception.InnerException!; } - - _resolvedCap = new Proxy(_access.Eval(result)); } - return _resolvedCap; + else + { + return null; + } } } } - async Task AwaitWhenResolved() - { - await _question.WhenReturned; - - if (_question.IsTailCall) - throw new InvalidOperationException("Question is a tail call, so won't resolve back."); - - return ResolvedCap!; - } - - public override Task WhenResolved => AwaitWhenResolved(); + public override Task WhenResolved { get; } protected override void GetMessageTarget(MessageTarget.WRITER wr) { @@ -251,9 +242,10 @@ namespace Capnp.Rpc } } - protected override void ReleaseRemotely() + protected async override void ReleaseRemotely() { - this.DisposeWhenResolved(); + try { using var _ = await _whenResolvedProxy; } + catch { } } } } \ No newline at end of file diff --git a/Capnp.Net.Runtime/Rpc/RemoteAnswerCapabilityDeprecated.cs b/Capnp.Net.Runtime/Rpc/RemoteAnswerCapabilityDeprecated.cs new file mode 100644 index 0000000..51e8ee5 --- /dev/null +++ b/Capnp.Net.Runtime/Rpc/RemoteAnswerCapabilityDeprecated.cs @@ -0,0 +1,265 @@ +using System; +using System.Threading.Tasks; + +namespace Capnp.Rpc +{ + + class RemoteAnswerCapabilityDeprecated : RemoteResolvingCapability + { + // Set DebugEmbargos to true to get logging output for calls. RPC calls are expected to + // be on the critical path, hence very relevant for performance. We just can't afford + // additional stuff on this path. Even if the logger filters the outputs away, there is + // overhead for creating the Logger object, calling the Logger methods and deciding to + // filter the output. This justifies the precompiler switch. +#if DebugEmbargos + ILogger Logger { get; } = Logging.CreateLogger(); +#endif + + readonly PendingQuestion _question; + readonly MemberAccessPath _access; + readonly Task _whenResolvedProxy; + ConsumedCapability? _resolvedCap; + + public RemoteAnswerCapabilityDeprecated(PendingQuestion question, MemberAccessPath access): base(question.RpcEndpoint) + { + _question = question ?? throw new ArgumentNullException(nameof(question)); + _access = access ?? throw new ArgumentNullException(nameof(access)); + + async Task AwaitWhenResolved() + { + await _question.WhenReturned; + + if (_question.IsTailCall) + throw new InvalidOperationException("Question is a tail call, so won't resolve back."); + + return ResolvedCap!; + } + + WhenResolved = AwaitWhenResolved(); + + async Task AwaitProxy() => new Proxy(await WhenResolved); + _whenResolvedProxy = AwaitProxy(); + } + + async void ReAllowFinishWhenDone(Task task) + { + try + { + ++_pendingCallsOnPromise; + + await task; + } + catch + { + } + finally + { + lock (_question.ReentrancyBlocker) + { + --_pendingCallsOnPromise; + _question.AllowFinish(); + } + } + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + + lock (_question.ReentrancyBlocker) + { + using var _ = new Proxy(_resolvedCap); + } + } + + protected override ConsumedCapability? ResolvedCap + { + get + { + lock (_question.ReentrancyBlocker) + { + if (_resolvedCap == null && !_question.IsTailCall && _question.IsReturned) + { + DeserializerState result; + try + { + result = _question.WhenReturned.Result; + } + catch (AggregateException exception) + { + throw exception.InnerException!; + } + + _resolvedCap = _access.Eval(result); + } + return _resolvedCap; + } + } + } + + public override Task WhenResolved { get; } + + protected override void GetMessageTarget(MessageTarget.WRITER wr) + { + wr.which = MessageTarget.WHICH.PromisedAnswer; + wr.PromisedAnswer.QuestionId = _question.QuestionId; + _access.Serialize(wr.PromisedAnswer); + } + + internal override IPromisedAnswer DoCall(ulong interfaceId, ushort methodId, DynamicSerializerState args) + { + lock (_question.ReentrancyBlocker) + { + if (_question.StateFlags.HasFlag(PendingQuestion.State.Returned) && + !_question.StateFlags.HasFlag(PendingQuestion.State.TailCall)) + { + if (ResolvedCap == null) + { + throw new RpcException("Answer did not resolve to expected capability"); + } + + return CallOnResolution(interfaceId, methodId, args); + } + else + { +#if DebugEmbargos + Logger.LogDebug("Call by proxy"); +#endif + if (_question.StateFlags.HasFlag(PendingQuestion.State.Disposed)) + { + throw new ObjectDisposedException(nameof(PendingQuestion)); + } + + if (_question.StateFlags.HasFlag(PendingQuestion.State.FinishRequested)) + { + throw new InvalidOperationException("Finish request was already sent"); + } + + _question.DisallowFinish(); + ++_pendingCallsOnPromise; + var promisedAnswer = base.DoCall(interfaceId, methodId, args); + ReAllowFinishWhenDone(promisedAnswer.WhenReturned); + + async void DecrementPendingCallsOnPromiseWhenReturned() + { + try + { + await promisedAnswer.WhenReturned; + } + catch + { + } + finally + { + lock (_question.ReentrancyBlocker) + { + --_pendingCallsOnPromise; + } + } + } + + DecrementPendingCallsOnPromiseWhenReturned(); + return promisedAnswer; + } + } + } + + protected override Call.WRITER SetupMessage(DynamicSerializerState args, ulong interfaceId, ushort methodId) + { + var call = base.SetupMessage(args, interfaceId, methodId); + + call.Target.which = MessageTarget.WHICH.PromisedAnswer; + call.Target.PromisedAnswer.QuestionId = _question.QuestionId; + _access.Serialize(call.Target.PromisedAnswer); + + return call; + } + + internal override void Freeze(out IRpcEndpoint? boundEndpoint) + { + lock (_question.ReentrancyBlocker) + { + if (_question.StateFlags.HasFlag(PendingQuestion.State.Returned) && + _pendingCallsOnPromise == 0) + { + if (ResolvedCap == null) + { + throw new RpcException("Answer did not resolve to expected capability"); + } + + ResolvedCap.Freeze(out boundEndpoint); + } + else + { + ++_pendingCallsOnPromise; + _question.DisallowFinish(); + boundEndpoint = _ep; + } + } + } + + internal override void Unfreeze() + { + lock (_question.ReentrancyBlocker) + { + if (_pendingCallsOnPromise > 0) + { + --_pendingCallsOnPromise; + _question.AllowFinish(); + } + else + { + ResolvedCap?.Unfreeze(); + } + } + } + + internal override void Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer) + { + lock (_question.ReentrancyBlocker) + { + if (_question.StateFlags.HasFlag(PendingQuestion.State.Disposed)) + throw new ObjectDisposedException(nameof(PendingQuestion)); + + if (_question.StateFlags.HasFlag(PendingQuestion.State.Returned)) + { + ResolvedCap?.Export(endpoint, writer); + } + else + { + if (_question.StateFlags.HasFlag(PendingQuestion.State.FinishRequested)) + throw new InvalidOperationException("Finish request was already sent"); + + if (endpoint == _ep) + { + writer.which = CapDescriptor.WHICH.ReceiverAnswer; + _access.Serialize(writer.ReceiverAnswer); + writer.ReceiverAnswer.QuestionId = _question.QuestionId; + } + else if (_question.IsTailCall) + { + // FIXME: Resource management! We should prevent finishing this + // cap as long as it is exported. Unfortunately, we cannot determine + // when it gets removed from the export table. + + var vine = Vine.Create(this); + uint id = endpoint.AllocateExport(vine, out bool first); + + writer.which = CapDescriptor.WHICH.SenderHosted; + writer.SenderHosted = id; + } + else + { + this.ExportAsSenderPromise(endpoint, writer); + } + } + } + } + + protected async override void ReleaseRemotely() + { + try { using var _ = await _whenResolvedProxy; } + catch { } + } + } +} \ No newline at end of file diff --git a/Capnp.Net.Runtime/Rpc/RemoteResolvingCapability.cs b/Capnp.Net.Runtime/Rpc/RemoteResolvingCapability.cs index 225c8e1..868e775 100644 --- a/Capnp.Net.Runtime/Rpc/RemoteResolvingCapability.cs +++ b/Capnp.Net.Runtime/Rpc/RemoteResolvingCapability.cs @@ -16,7 +16,7 @@ namespace Capnp.Rpc ILogger Logger { get; } = Logging.CreateLogger(); #endif - public abstract Task WhenResolved { get; } + public abstract Task WhenResolved { get; } protected RemoteResolvingCapability(IRpcEndpoint ep) : base(ep) { @@ -25,7 +25,7 @@ namespace Capnp.Rpc protected int _pendingCallsOnPromise; Task? _disembargo; - protected abstract Proxy? ResolvedCap { get; } + protected abstract ConsumedCapability? ResolvedCap { get; } protected abstract void GetMessageTarget(MessageTarget.WRITER wr); @@ -46,7 +46,7 @@ namespace Capnp.Rpc throw new NotImplementedException("Sorry, level 3 RPC is not yet supported."); } - if (ResolvedCap.IsNull || + if (ResolvedCap == null || // If the capability resolves to null, disembargo must not be requested. // Take the direct path, well-knowing that the call will result in an exception. @@ -65,7 +65,8 @@ namespace Capnp.Rpc #if DebugEmbargos Logger.LogDebug("Direct call"); #endif - return ResolvedCap.Call(interfaceId, methodId, args, default); + using var proxy = new Proxy(ResolvedCap); + return proxy.Call(interfaceId, methodId, args, default); } else { @@ -93,7 +94,8 @@ namespace Capnp.Rpc cancellationTokenSource.Token.ThrowIfCancellationRequested(); - return ResolvedCap.Call(interfaceId, methodId, args, default); + using var proxy = new Proxy(ResolvedCap); + return proxy.Call(interfaceId, methodId, args, default); }, TaskContinuationOptions.ExecuteSynchronously); diff --git a/Capnp.Net.Runtime/Rpc/ResolvingCapabilityExtensions.cs b/Capnp.Net.Runtime/Rpc/ResolvingCapabilityExtensions.cs index 9b68f7e..36d0ae2 100644 --- a/Capnp.Net.Runtime/Rpc/ResolvingCapabilityExtensions.cs +++ b/Capnp.Net.Runtime/Rpc/ResolvingCapabilityExtensions.cs @@ -1,4 +1,7 @@ -namespace Capnp.Rpc +using System; +using System.Threading.Tasks; + +namespace Capnp.Rpc { static class ResolvingCapabilityExtensions { @@ -18,8 +21,7 @@ try { var resolvedCap = await cap.WhenResolved; - - endpoint.Resolve(preliminaryId, vine, () => resolvedCap.ConsumedCap!); + endpoint.Resolve(preliminaryId, vine, () => resolvedCap!); } catch (System.Exception exception) { @@ -30,15 +32,10 @@ } } - public static async void DisposeWhenResolved(this IResolvingCapability cap) + public static async Task AsProxyTask(this Task task) { - try - { - (await cap.WhenResolved)?.Dispose(); - } - catch - { - } + var obj = await task; + return obj is Proxy proxy ? proxy : BareProxy.FromImpl(obj); } } } \ No newline at end of file diff --git a/Capnp.Net.Runtime/Rpc/RpcEngine.cs b/Capnp.Net.Runtime/Rpc/RpcEngine.cs index 0e66b88..0b1ad55 100644 --- a/Capnp.Net.Runtime/Rpc/RpcEngine.cs +++ b/Capnp.Net.Runtime/Rpc/RpcEngine.cs @@ -433,6 +433,7 @@ namespace Capnp.Rpc ret.Results.Content = results.Rewrap(); ret.ReleaseParamCaps = releaseParamCaps; ExportCapTableAndSend(results, ret.Results); + pendingAnswer.CapTable = ret.Results.CapTable; break; case Call.sendResultsTo.WHICH.Yourself: @@ -592,11 +593,9 @@ namespace Capnp.Rpc { try { - using (var proxy = await t) - { - cap = proxy?.GetProvider(); - CreateAnswerAwaitItAndReply(); - } + using var proxy = await t; + cap = proxy?.GetProvider(); + CreateAnswerAwaitItAndReply(); } catch (TaskCanceledException) { @@ -825,31 +824,29 @@ namespace Capnp.Rpc { try { - using (var proxy = await t) - { - proxy.Freeze(out var boundEndpoint); + using var proxy = await t; + proxy.Freeze(out var boundEndpoint); - try + try + { + if (boundEndpoint == this) { - if (boundEndpoint == this) - { #if DebugEmbargos Logger.LogDebug($"Sender loopback disembargo. Thread = {Thread.CurrentThread.Name}"); #endif - Tx(mb.Frame); - } - else - { - Logger.LogWarning("Sender loopback request: Peer asked for disembargoing an answer which does not resolve back to the sender."); - - throw new RpcProtocolErrorException("'Disembargo': Answer does not resolve back to me"); - } + Tx(mb.Frame); } - finally + else { - proxy.Unfreeze(); + Logger.LogWarning("Sender loopback request: Peer asked for disembargoing an answer which does not resolve back to the sender."); + + throw new RpcProtocolErrorException("'Disembargo': Answer does not resolve back to me"); } } + finally + { + proxy.Unfreeze(); + } } catch (System.Exception exception) { @@ -933,15 +930,32 @@ namespace Capnp.Rpc try { var aorcq = await t; - var results = aorcq.Answer; + var caps = answer.CapTable; - if (results != null && results.Caps != null) + if (caps != null) { - foreach (var cap in results.Caps) + foreach (var capDesc in caps) { - cap?.Release(); + switch (capDesc.which) + { + case CapDescriptor.WHICH.SenderHosted: + ReleaseExport(capDesc.SenderHosted, 1); + break; + + case CapDescriptor.WHICH.SenderPromise: + ReleaseExport(capDesc.SenderPromise, 1); + break; + } } } + + //if (results != null && results.Caps != null) + //{ + // foreach (var cap in results.Caps) + // { + // cap?.Release(); + // } + //} } catch { @@ -1091,7 +1105,7 @@ namespace Capnp.Rpc Tx(mb.Frame); - var main = new RemoteAnswerCapability( + var main = new RemoteAnswerCapabilityDeprecated( pendingBootstrap, MemberAccessPath.BootstrapAccess); @@ -1360,7 +1374,7 @@ namespace Capnp.Rpc else { cap.Export(this, capDesc); - cap.Release(); + cap.Release(false); } } diff --git a/Capnp.Net.Runtime/Rpc/Vine.cs b/Capnp.Net.Runtime/Rpc/Vine.cs index 52f10c1..db63d0c 100644 --- a/Capnp.Net.Runtime/Rpc/Vine.cs +++ b/Capnp.Net.Runtime/Rpc/Vine.cs @@ -38,10 +38,8 @@ namespace Capnp.Rpc { try { - using (var registration = cancellationToken.Register(promisedAnswer.Dispose)) - { - await promisedAnswer.WhenReturned; - } + using var registration = cancellationToken.Register(promisedAnswer.Dispose); + await promisedAnswer.WhenReturned; } catch { @@ -54,10 +52,8 @@ namespace Capnp.Rpc } else { - using (var registration = cancellationToken.Register(promisedAnswer.Dispose)) - { - return (DynamicSerializerState)await promisedAnswer.WhenReturned; - } + using var registration = cancellationToken.Register(promisedAnswer.Dispose); + return (DynamicSerializerState)await promisedAnswer.WhenReturned; } } diff --git a/Capnp.Net.Runtime/SerializerState.cs b/Capnp.Net.Runtime/SerializerState.cs index f52cc18..7bdb9e2 100644 --- a/Capnp.Net.Runtime/SerializerState.cs +++ b/Capnp.Net.Runtime/SerializerState.cs @@ -1259,7 +1259,8 @@ namespace Capnp /// /// The capability, in one of the following forms: /// Low-level capability object (Rpc.ConsumedCapability) - /// Proxy object (Rpc.Proxy) + /// Proxy object (Rpc.Proxy). Note that the provision has "move semantics": SerializerState + /// takes ownership, so the Proxy object will be disposed. /// Skeleton object (Rpc.Skeleton) /// Capability interface implementation /// @@ -1267,16 +1268,19 @@ namespace Capnp /// The underlying message builder was not configured for capability table support. public uint ProvideCapability(object? obj) { - if (obj == null) - return ProvideCapability(default(Rpc.ConsumedCapability)); - else if (obj is Rpc.Proxy proxy) - return ProvideCapability(proxy.ConsumedCap); - else if (obj is Rpc.ConsumedCapability consumedCapability) - return ProvideCapability(consumedCapability); - else if (obj is Rpc.Skeleton providedCapability) - return ProvideCapability(providedCapability); - else - return ProvideCapability(Rpc.Skeleton.GetOrCreateSkeleton(obj, false)); + switch (obj) + { + case null: + return ProvideCapability(default(Rpc.ConsumedCapability)); + case Rpc.Proxy proxy: using (proxy) + return ProvideCapability(proxy.ConsumedCap); + case Rpc.ConsumedCapability consumedCapability: + return ProvideCapability(consumedCapability); + case Rpc.Skeleton providedCapability: + return ProvideCapability(providedCapability); + default: + return ProvideCapability(Rpc.Skeleton.GetOrCreateSkeleton(obj, false)); + } } ///