From e3b6fd5c62d98cc39bd3669870ec58c0bc4e2cc1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Mon, 6 Apr 2020 21:12:08 +0200 Subject: [PATCH] factoring out Freeze/Unfreeze --- Capnp.Net.Runtime/Rpc/ConsumedCapability.cs | 3 +- Capnp.Net.Runtime/Rpc/ImportedCapability.cs | 9 +- .../Rpc/Interception/CensorCapability.cs | 9 +- Capnp.Net.Runtime/Rpc/LazyCapability.cs | 25 +--- .../Rpc/LocalAnswerCapability.cs | 10 +- Capnp.Net.Runtime/Rpc/LocalCapability.cs | 9 +- Capnp.Net.Runtime/Rpc/PromisedCapability.cs | 56 +------- Capnp.Net.Runtime/Rpc/Proxy.cs | 17 --- .../Rpc/RemoteAnswerCapability.cs | 40 +----- .../Rpc/RemoteResolvingCapability.cs | 131 ++++++++---------- Capnp.Net.Runtime/Rpc/RpcEngine.cs | 20 +-- 11 files changed, 73 insertions(+), 256 deletions(-) diff --git a/Capnp.Net.Runtime/Rpc/ConsumedCapability.cs b/Capnp.Net.Runtime/Rpc/ConsumedCapability.cs index 3faa492..675fa97 100644 --- a/Capnp.Net.Runtime/Rpc/ConsumedCapability.cs +++ b/Capnp.Net.Runtime/Rpc/ConsumedCapability.cs @@ -16,8 +16,7 @@ namespace Capnp.Rpc /// protected abstract void ReleaseRemotely(); internal abstract Action? Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer); - internal abstract void Freeze(out IRpcEndpoint? boundEndpoint); - internal abstract void Unfreeze(); + internal abstract IRpcEndpoint? Endpoint { get; } internal abstract void AddRef(); internal abstract void Release(); diff --git a/Capnp.Net.Runtime/Rpc/ImportedCapability.cs b/Capnp.Net.Runtime/Rpc/ImportedCapability.cs index 5564351..ca9d75a 100644 --- a/Capnp.Net.Runtime/Rpc/ImportedCapability.cs +++ b/Capnp.Net.Runtime/Rpc/ImportedCapability.cs @@ -28,14 +28,7 @@ namespace Capnp.Rpc return call; } - internal override void Freeze(out IRpcEndpoint boundEndpoint) - { - boundEndpoint = _ep; - } - - internal override void Unfreeze() - { - } + internal override IRpcEndpoint? Endpoint => _ep; internal override Action? Export(IRpcEndpoint endpoint, CapDescriptor.WRITER capDesc) { diff --git a/Capnp.Net.Runtime/Rpc/Interception/CensorCapability.cs b/Capnp.Net.Runtime/Rpc/Interception/CensorCapability.cs index 0889c87..d6f1fbd 100644 --- a/Capnp.Net.Runtime/Rpc/Interception/CensorCapability.cs +++ b/Capnp.Net.Runtime/Rpc/Interception/CensorCapability.cs @@ -35,13 +35,6 @@ namespace Capnp.Rpc.Interception return null; } - internal override void Freeze(out IRpcEndpoint? boundEndpoint) - { - boundEndpoint = null; - } - - internal override void Unfreeze() - { - } + internal override IRpcEndpoint? Endpoint => null; } } \ No newline at end of file diff --git a/Capnp.Net.Runtime/Rpc/LazyCapability.cs b/Capnp.Net.Runtime/Rpc/LazyCapability.cs index 3e11953..1289280 100644 --- a/Capnp.Net.Runtime/Rpc/LazyCapability.cs +++ b/Capnp.Net.Runtime/Rpc/LazyCapability.cs @@ -33,30 +33,7 @@ namespace Capnp.Rpc _capTask = AwaitCap(); } - internal override void Freeze(out IRpcEndpoint? boundEndpoint) - { - if (WhenResolved.IsCompleted) - { - boundEndpoint = null; - - try - { - _capTask.Result?.Freeze(out boundEndpoint); - } - catch (AggregateException exception) - { - throw exception.InnerException!; - } - } - else - { - boundEndpoint = null; - } - } - - internal override void Unfreeze() - { - } + internal override IRpcEndpoint? Endpoint => null; internal override Action? Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer) { diff --git a/Capnp.Net.Runtime/Rpc/LocalAnswerCapability.cs b/Capnp.Net.Runtime/Rpc/LocalAnswerCapability.cs index e29a1b3..bfd9208 100644 --- a/Capnp.Net.Runtime/Rpc/LocalAnswerCapability.cs +++ b/Capnp.Net.Runtime/Rpc/LocalAnswerCapability.cs @@ -29,15 +29,7 @@ namespace Capnp.Rpc } - internal override void Freeze(out IRpcEndpoint? boundEndpoint) - { - boundEndpoint = null; - } - - internal override void Unfreeze() - { - } - + internal override IRpcEndpoint? Endpoint => null; public Task WhenResolved => _whenResolvedProxy; diff --git a/Capnp.Net.Runtime/Rpc/LocalCapability.cs b/Capnp.Net.Runtime/Rpc/LocalCapability.cs index 3e1954a..c3ab966 100644 --- a/Capnp.Net.Runtime/Rpc/LocalCapability.cs +++ b/Capnp.Net.Runtime/Rpc/LocalCapability.cs @@ -52,14 +52,7 @@ namespace Capnp.Rpc return null; } - internal override void Freeze(out IRpcEndpoint? boundEndpoint) - { - boundEndpoint = null; - } - - internal override void Unfreeze() - { - } + internal override IRpcEndpoint? Endpoint => null; protected override void ReleaseRemotely() { diff --git a/Capnp.Net.Runtime/Rpc/PromisedCapability.cs b/Capnp.Net.Runtime/Rpc/PromisedCapability.cs index 0b5bfbf..779f5ab 100644 --- a/Capnp.Net.Runtime/Rpc/PromisedCapability.cs +++ b/Capnp.Net.Runtime/Rpc/PromisedCapability.cs @@ -23,61 +23,7 @@ namespace Capnp.Rpc public override Task WhenResolved => _resolvedCap.Task; public override T? GetResolvedCapability() where T: class => _whenResolvedProxy.GetResolvedCapability(); - internal override void Freeze(out IRpcEndpoint? boundEndpoint) - { - lock (_reentrancyBlocker) - { - if (_resolvedCap.Task.IsCompleted && _pendingCallsOnPromise == 0) - { - boundEndpoint = null; - - try - { - _resolvedCap.Task.Result?.Freeze(out boundEndpoint); - } - catch (AggregateException exception) - { - throw exception.InnerException!; - } - } - else - { - Debug.Assert(!_released); - ++_pendingCallsOnPromise; - - boundEndpoint = _ep; - } - } - } - - internal override void Unfreeze() - { - bool release = false; - - lock (_reentrancyBlocker) - { - if (_pendingCallsOnPromise == 0) - { - _resolvedCap.Task.Result?.Unfreeze(); - } - else - { - Debug.Assert(_pendingCallsOnPromise > 0); - Debug.Assert(!_released); - - if (--_pendingCallsOnPromise == 0 && _resolvedCap.Task.IsCompleted) - { - release = true; - _released = true; - } - } - } - - if (release) - { - _ep.ReleaseImport(_remoteId); - } - } + internal override IRpcEndpoint? Endpoint => _ep; internal override Action? Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer) { diff --git a/Capnp.Net.Runtime/Rpc/Proxy.cs b/Capnp.Net.Runtime/Rpc/Proxy.cs index 34f09c1..419826e 100644 --- a/Capnp.Net.Runtime/Rpc/Proxy.cs +++ b/Capnp.Net.Runtime/Rpc/Proxy.cs @@ -238,23 +238,6 @@ namespace Capnp.Rpc } } - internal void Freeze(out IRpcEndpoint? boundEndpoint) - { - if (_disposedValue) - throw new ObjectDisposedException(nameof(Proxy)); - - boundEndpoint = null; - ConsumedCap?.Freeze(out boundEndpoint); - } - - internal void Unfreeze() - { - if (_disposedValue) - throw new ObjectDisposedException(nameof(Proxy)); - - ConsumedCap?.Unfreeze(); - } - #if DebugFinalizers string CreatorStackTrace { get; set; } #endif diff --git a/Capnp.Net.Runtime/Rpc/RemoteAnswerCapability.cs b/Capnp.Net.Runtime/Rpc/RemoteAnswerCapability.cs index 38aaa6b..214a077 100644 --- a/Capnp.Net.Runtime/Rpc/RemoteAnswerCapability.cs +++ b/Capnp.Net.Runtime/Rpc/RemoteAnswerCapability.cs @@ -174,45 +174,7 @@ namespace Capnp.Rpc return call; } - internal override void Freeze(out IRpcEndpoint? boundEndpoint) - { - lock (_question.ReentrancyBlocker) - { - if ( _question.StateFlags.HasFlag(PendingQuestion.State.Returned) && - !_question.StateFlags.HasFlag(PendingQuestion.State.TailCall) && - _pendingCallsOnPromise == 0) - { - if (ResolvedCap == null) - { - throw new RpcException("Answer did not resolve to expected capability"); - } - - ResolvedCap.Freeze(out boundEndpoint); - } - else - { - ++_pendingCallsOnPromise; - _question.DisallowFinish(); - boundEndpoint = _ep; - } - } - } - - internal override void Unfreeze() - { - lock (_question.ReentrancyBlocker) - { - if (_pendingCallsOnPromise > 0) - { - --_pendingCallsOnPromise; - _question.AllowFinish(); - } - else - { - ResolvedCap?.Unfreeze(); - } - } - } + internal override IRpcEndpoint Endpoint => _ep; internal override Action? Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer) { diff --git a/Capnp.Net.Runtime/Rpc/RemoteResolvingCapability.cs b/Capnp.Net.Runtime/Rpc/RemoteResolvingCapability.cs index 6f3ba0b..c8f4108 100644 --- a/Capnp.Net.Runtime/Rpc/RemoteResolvingCapability.cs +++ b/Capnp.Net.Runtime/Rpc/RemoteResolvingCapability.cs @@ -37,91 +37,78 @@ namespace Capnp.Rpc try { - ResolvedCap.Freeze(out var resolvedCapEndpoint); - - try + if (ResolvedCap.Endpoint!= null && ResolvedCap.Endpoint != _ep) { - if (resolvedCapEndpoint != null && resolvedCapEndpoint != _ep) - { - // Carol lives in a different Vat C. - throw new NotImplementedException("Sorry, level 3 RPC is not yet supported."); - } + // Carol lives in a different Vat C. + throw new NotImplementedException("Sorry, level 3 RPC is not yet supported."); + } - if (ResolvedCap == null || - // If the capability resolves to null, disembargo must not be requested. - // Take the direct path, well-knowing that the call will result in an exception. + if (ResolvedCap.Endpoint != null || + //# Note that in the case where Carol actually lives in Vat B (i.e., the same vat that the promise + //# already pointed at), no embargo is needed, because the pipelined calls are delivered over the + //# same path as the later direct calls. - resolvedCapEndpoint != null || - //# Note that in the case where Carol actually lives in Vat B (i.e., the same vat that the promise - //# already pointed at), no embargo is needed, because the pipelined calls are delivered over the - //# same path as the later direct calls. + (_disembargo == null && _pendingCallsOnPromise == 0) || + // No embargo is needed since all outstanding replies have returned - (_disembargo == null && _pendingCallsOnPromise == 0) || - // No embargo is needed since all outstanding replies have returned - - _disembargo?.IsCompleted == true - // Disembargo has returned - ) + _disembargo?.IsCompleted == true + // Disembargo has returned + ) + { +#if DebugEmbargos + Logger.LogDebug("Direct call"); +#endif + using var proxy = new Proxy(ResolvedCap); + return proxy.Call(interfaceId, methodId, args, default); + } + else + { + if (_disembargo == null) { #if DebugEmbargos - Logger.LogDebug("Direct call"); + Logger.LogDebug("Requesting disembargo"); #endif - using var proxy = new Proxy(ResolvedCap); - return proxy.Call(interfaceId, methodId, args, default); + _disembargo = _ep.RequestSenderLoopback(GetMessageTarget); } else { - if (_disembargo == null) - { #if DebugEmbargos - Logger.LogDebug("Requesting disembargo"); + Logger.LogDebug("Waiting for requested disembargo"); #endif - _disembargo = _ep.RequestSenderLoopback(GetMessageTarget); - } - else - { -#if DebugEmbargos - Logger.LogDebug("Waiting for requested disembargo"); -#endif - } - - var cancellationTokenSource = new CancellationTokenSource(); - - var callAfterDisembargo = _disembargo.ContinueWith(_ => - { - // Two reasons for ignoring exceptions on the previous task (i.e. not _.Wait()ing): - // 1. A faulting predecessor, especially due to cancellation, must not have any impact on this one. - // 2. A faulting disembargo request would imply that the other side cannot send pending requests anyway. - - if (cancellationTokenSource.Token.IsCancellationRequested) - { - args.Dispose(); - cancellationTokenSource.Token.ThrowIfCancellationRequested(); - } - - using var proxy = new Proxy(ResolvedCap); - return proxy.Call(interfaceId, methodId, args, default); - - }, TaskContinuationOptions.ExecuteSynchronously); - - _disembargo = callAfterDisembargo; - - async Task AwaitAnswer() - { - var promisedAnswer = await callAfterDisembargo; - - using (cancellationTokenSource.Token.Register(promisedAnswer.Dispose)) - { - return await promisedAnswer.WhenReturned; - } - } - - return new LocalAnswer(cancellationTokenSource, AwaitAnswer()); } - } - finally - { - ResolvedCap.Unfreeze(); + + var cancellationTokenSource = new CancellationTokenSource(); + + var callAfterDisembargo = _disembargo.ContinueWith(_ => + { + // Two reasons for ignoring exceptions on the previous task (i.e. not _.Wait()ing): + // 1. A faulting predecessor, especially due to cancellation, must not have any impact on this one. + // 2. A faulting disembargo request would imply that the other side cannot send pending requests anyway. + + if (cancellationTokenSource.Token.IsCancellationRequested) + { + args.Dispose(); + cancellationTokenSource.Token.ThrowIfCancellationRequested(); + } + + using var proxy = new Proxy(ResolvedCap); + return proxy.Call(interfaceId, methodId, args, default); + + }, TaskContinuationOptions.ExecuteSynchronously); + + _disembargo = callAfterDisembargo; + + async Task AwaitAnswer() + { + var promisedAnswer = await callAfterDisembargo; + + using (cancellationTokenSource.Token.Register(promisedAnswer.Dispose)) + { + return await promisedAnswer.WhenReturned; + } + } + + return new LocalAnswer(cancellationTokenSource, AwaitAnswer()); } } catch (System.Exception exception) diff --git a/Capnp.Net.Runtime/Rpc/RpcEngine.cs b/Capnp.Net.Runtime/Rpc/RpcEngine.cs index 021c8ec..ed13068 100644 --- a/Capnp.Net.Runtime/Rpc/RpcEngine.cs +++ b/Capnp.Net.Runtime/Rpc/RpcEngine.cs @@ -890,27 +890,19 @@ namespace Capnp.Rpc try { using var proxy = await t; - proxy.Freeze(out var boundEndpoint); - try + if (proxy.ConsumedCap?.Endpoint == this) { - if (boundEndpoint == this) - { #if DebugEmbargos Logger.LogDebug($"Sender loopback disembargo. Thread = {Thread.CurrentThread.Name}"); #endif - Tx(mb.Frame); - } - else - { - Logger.LogWarning("Sender loopback request: Peer asked for disembargoing an answer which does not resolve back to the sender."); - - throw new RpcProtocolErrorException("'Disembargo': Answer does not resolve back to me"); - } + Tx(mb.Frame); } - finally + else { - proxy.Unfreeze(); + Logger.LogWarning("Sender loopback request: Peer asked for disembargoing an answer which does not resolve back to the sender."); + + throw new RpcProtocolErrorException("'Disembargo': Answer does not resolve back to me"); } } catch (System.Exception exception)