This commit is contained in:
Christian Köllner 2020-03-10 21:55:34 +01:00
parent 281a1c868f
commit f58464293b
27 changed files with 661 additions and 228 deletions

View File

@ -563,7 +563,7 @@ namespace Capnp.Net.Runtime.Tests
_.Call.QuestionId = 42;
_.Call.Target.which = MessageTarget.WHICH.ImportedCap;
_.Call.Target.ImportedCap = bootCapId;
_.Call.InterfaceId = ((TypeIdAttribute)typeof(ITestPipeline).GetCustomAttributes(typeof(TypeIdAttribute), false)[0]).Id;
_.Call.InterfaceId = new TestPipeline_Skeleton().InterfaceId;
_.Call.MethodId = 0;
var wr = _.Call.Params.Content.Rewrap<TestPipeline.Params_getCap.WRITER>();
wr.InCap = null;
@ -588,7 +588,7 @@ namespace Capnp.Net.Runtime.Tests
{
_1.which = Message.WHICH.Unimplemented;
_1.Unimplemented.which = Message.WHICH.Resolve;
Reserializing.DeepCopy(_, _1.Unimplemented.Resolve);
Reserializing.DeepCopy(_.Resolve, _1.Unimplemented.Resolve);
});
Assert.IsFalse(impl.IsGrandsonCapDisposed);

View File

@ -17255,7 +17255,11 @@ namespace Capnproto_test.Capnp.Test
static readonly MemberAccessPath Path_capnproto_test_capnp_test_TestPipeline_getCap_OutBox_Cap = new MemberAccessPath(1U, 0U);
public static Capnproto_test.Capnp.Test.ITestInterface OutBox_Cap(this Task<(string, Capnproto_test.Capnp.Test.TestPipeline.Box)> task)
{
return (Capnproto_test.Capnp.Test.ITestInterface)CapabilityReflection.CreateProxy<Capnproto_test.Capnp.Test.ITestInterface>(Impatient.GetAnswer(task).Access(Path_capnproto_test_capnp_test_TestPipeline_getCap_OutBox_Cap));
async Task<IDisposable> AwaitProxy() => (await task).Item2.Cap;
return (Capnproto_test.Capnp.Test.ITestInterface)CapabilityReflection.CreateProxy<Capnproto_test.Capnp.Test.ITestInterface>(Impatient.GetAnswer(task).Access(
Path_capnproto_test_capnp_test_TestPipeline_getCap_OutBox_Cap,
AwaitProxy()));
}
static readonly MemberAccessPath Path_capnproto_test_capnp_test_TestPipeline_getAnyCap_OutBox_Cap = new MemberAccessPath(1U, 0U);

View File

@ -106,6 +106,11 @@ namespace Capnp
case ObjectKind.ListOfStructs:
case ObjectKind.Nil:
case ObjectKind.Struct:
if (state.Caps != null)
{
foreach (var cap in state.Caps)
cap?.Release(true);
}
return new DeserializerState(state.Allocator!.Segments)
{
CurrentSegmentIndex = state.SegmentIndex,

View File

@ -42,7 +42,11 @@ namespace Capnp
{
var mb = MessageBuilder.Create();
if (state.Caps != null)
{
mb.InitCapTable();
foreach (var cap in state.Caps)
cap?.AddRef();
}
var sstate = mb.CreateObject<DynamicSerializerState>();
Reserializing.DeepCopy(state, sstate);

View File

@ -92,19 +92,19 @@ namespace Capnp
switch (items)
{
case T[] array:
array.CopyTo(Span);
array.CopyTo(Data);
break;
case ArraySegment<T> segment:
segment.AsSpan().CopyTo(Span);
segment.AsSpan().CopyTo(Data);
break;
case ListOfPrimitivesDeserializer<T> deser:
deser.Span.CopyTo(Span);
deser.Span.CopyTo(Data);
break;
case ListOfPrimitivesSerializer<T> ser:
ser.Span.CopyTo(Span);
ser.Data.CopyTo(Data);
break;
default:
@ -116,12 +116,18 @@ namespace Capnp
}
}
IEnumerable<T> Enumerate()
{
for (int i = 0; i < Data.Length; i++)
yield return Data[i];
}
/// <summary>
/// Implements <see cref="IEnumerable{T}"/>.
/// </summary>
/// <returns></returns>
public IEnumerator<T> GetEnumerator() => Enumerate().GetEnumerator();
IEnumerator IEnumerable.GetEnumerator() => Span.ToArray().GetEnumerator();
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
}

View File

@ -19,6 +19,7 @@
internal abstract void AddRef();
internal abstract void Release(
bool keepAlive,
[System.Runtime.CompilerServices.CallerMemberName] string methodName = "",
[System.Runtime.CompilerServices.CallerFilePath] string filePath = "",
[System.Runtime.CompilerServices.CallerLineNumber] int lineNumber = 0);

View File

@ -23,5 +23,7 @@ namespace Capnp.Rpc
/// <param name="access">Path to the desired capability inside the result struct.</param>
/// <returns>Pipelined low-level capability</returns>
ConsumedCapability? Access(MemberAccessPath access);
ConsumedCapability? Access(MemberAccessPath access, Task<IDisposable> proxyTask);
}
}

View File

@ -10,6 +10,6 @@ namespace Capnp.Rpc
/// <summary>
/// Will eventually give the resolved capability.
/// </summary>
Task<Proxy> WhenResolved { get; }
Task<ConsumedCapability?> WhenResolved { get; }
}
}

View File

@ -25,10 +25,10 @@ namespace Capnp.Rpc.Interception
public Task<DeserializerState> WhenReturned => _futureResult.Task;
public CancellationToken CancelFromAlice => _cancelFromAlice.Token;
async Task<Proxy> AccessWhenReturned(MemberAccessPath access)
async Task<ConsumedCapability?> AccessWhenReturned(MemberAccessPath access)
{
await WhenReturned;
return new Proxy(Access(access));
return Access(access);
}
public ConsumedCapability? Access(MemberAccessPath access)
@ -50,6 +50,27 @@ namespace Capnp.Rpc.Interception
}
}
public ConsumedCapability? Access(MemberAccessPath _, Task<IDisposable> task)
{
var proxyTask = task.AsProxyTask();
if (proxyTask.IsCompleted)
{
try
{
return proxyTask.Result?.ConsumedCap;
}
catch (AggregateException exception)
{
throw exception.InnerException!;
}
}
else
{
return new LazyCapability(proxyTask);
}
}
public void Dispose()
{
try

View File

@ -16,7 +16,7 @@
protected override void ReleaseRemotely()
{
InterceptedCapability.Release();
InterceptedCapability.Release(false);
}
internal override IPromisedAnswer DoCall(ulong interfaceId, ushort methodId, DynamicSerializerState args)

View File

@ -8,32 +8,45 @@ namespace Capnp.Rpc
{
public static LazyCapability CreateBrokenCap(string message)
{
var cap = new LazyCapability(Task.FromException<Proxy>(new RpcException(message)));
var cap = new LazyCapability(Task.FromException<ConsumedCapability?>(new RpcException(message)));
cap.AddRef(); // Instance shall be persistent
return cap;
}
public static LazyCapability CreateCanceledCap(CancellationToken token)
{
var cap = new LazyCapability(Task.FromCanceled<Proxy>(token));
var cap = new LazyCapability(Task.FromCanceled<ConsumedCapability?>(token));
cap.AddRef(); // Instance shall be persistent
return cap;
}
public static LazyCapability Null { get; } = CreateBrokenCap("Null capability");
public LazyCapability(Task<Proxy> capabilityTask)
readonly Task<Proxy>? _proxyTask;
public LazyCapability(Task<ConsumedCapability?> capabilityTask)
{
WhenResolved = capabilityTask;
}
public LazyCapability(Task<Proxy> proxyTask)
{
_proxyTask = proxyTask;
async Task<ConsumedCapability?> AwaitCap() => (await _proxyTask!).ConsumedCap;
WhenResolved = AwaitCap();
}
internal override void Freeze(out IRpcEndpoint? boundEndpoint)
{
if (WhenResolved.IsCompleted)
{
boundEndpoint = null;
try
{
WhenResolved.Result.Freeze(out boundEndpoint);
WhenResolved.Result?.Freeze(out boundEndpoint);
}
catch (AggregateException exception)
{
@ -54,7 +67,8 @@ namespace Capnp.Rpc
{
if (WhenResolved.ReplacementTaskIsCompletedSuccessfully())
{
WhenResolved.Result.Export(endpoint, writer);
using var proxy = new Proxy(WhenResolved.Result);
proxy.Export(endpoint, writer);
}
else
{
@ -62,24 +76,21 @@ namespace Capnp.Rpc
}
}
async void DisposeProxyWhenResolved()
{
try
{
var cap = await WhenResolved;
if (cap != null) cap.Dispose();
}
catch
{
}
}
protected override void ReleaseRemotely()
{
DisposeProxyWhenResolved();
if (_proxyTask != null)
{
async void DisposeProxyWhenResolved()
{
try { using var _ = await _proxyTask!; }
catch { }
}
DisposeProxyWhenResolved();
}
}
public Task<Proxy> WhenResolved { get; }
public Task<ConsumedCapability?> WhenResolved { get; }
async Task<DeserializerState> CallImpl(ulong interfaceId, ushort methodId, DynamicSerializerState args, CancellationToken cancellationToken)
{
@ -90,7 +101,8 @@ namespace Capnp.Rpc
if (cap == null)
throw new RpcException("Broken capability");
var call = cap.Call(interfaceId, methodId, args, default);
using var proxy = new Proxy(cap);
var call = proxy.Call(interfaceId, methodId, args, default);
var whenReturned = call.WhenReturned;
using (var registration = cancellationToken.Register(call.Dispose))

View File

@ -18,35 +18,27 @@ namespace Capnp.Rpc
async void CleanupAfterReturn()
{
try
{
await WhenReturned;
}
catch
{
}
finally
{
_cts.Dispose();
}
try { await WhenReturned; }
catch { }
finally { _cts.Dispose(); }
}
public Task<DeserializerState> WhenReturned { get; }
public ConsumedCapability Access(MemberAccessPath access)
{
return new LocalAnswerCapability(WhenReturned, access);
return new LocalAnswerCapabilityDeprecated(WhenReturned, access);
}
public ConsumedCapability Access(MemberAccessPath _, Task<IDisposable> task)
{
return new LocalAnswerCapability(task.AsProxyTask());
}
public void Dispose()
{
try
{
_cts.Cancel();
}
catch (ObjectDisposedException)
{
}
try { _cts.Cancel(); }
catch (ObjectDisposedException) { }
}
}
}

View File

@ -4,15 +4,17 @@ using System.Threading.Tasks;
namespace Capnp.Rpc
{
class LocalAnswerCapability : RefCountingCapability, IResolvingCapability
{
readonly Task<DeserializerState> _answer;
readonly MemberAccessPath _access;
readonly Task<Proxy> _whenResolvedProxy;
public LocalAnswerCapability(Task<DeserializerState> answer, MemberAccessPath access)
public LocalAnswerCapability(Task<Proxy> proxyTask)
{
_answer = answer;
_access = access;
_whenResolvedProxy = proxyTask;
async Task<ConsumedCapability?> AwaitResolved() => (await _whenResolvedProxy).ConsumedCap;
WhenResolved = AwaitResolved();
}
internal override void Freeze(out IRpcEndpoint? boundEndpoint)
@ -24,31 +26,20 @@ namespace Capnp.Rpc
{
}
async Task<Proxy> AwaitResolved()
{
var state = await _answer;
return new Proxy(_access.Eval(state));
}
public Task<Proxy> WhenResolved => AwaitResolved();
public Task<ConsumedCapability?> WhenResolved { get; private set; }
internal override void Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer)
{
if (_answer.IsCompleted)
if (_whenResolvedProxy.IsCompleted)
{
DeserializerState result;
try
{
result = _answer.Result;
_whenResolvedProxy.Result.Export(endpoint, writer);
}
catch (AggregateException exception)
{
throw exception.InnerException!;
}
using (var proxy = new Proxy(_access.Eval(result)))
{
proxy.Export(endpoint, writer);
throw exception.InnerException;
}
}
else
@ -59,20 +50,18 @@ namespace Capnp.Rpc
async Task<DeserializerState> CallImpl(ulong interfaceId, ushort methodId, DynamicSerializerState args, CancellationToken cancellationToken)
{
var cap = await AwaitResolved();
var proxy = await _whenResolvedProxy;
cancellationToken.ThrowIfCancellationRequested();
if (cap == null)
if (proxy.IsNull)
throw new RpcException("Broken capability");
var call = cap.Call(interfaceId, methodId, args, default);
var call = proxy.Call(interfaceId, methodId, args, default);
var whenReturned = call.WhenReturned;
using (var registration = cancellationToken.Register(() => call.Dispose()))
{
return await whenReturned;
}
using var registration = cancellationToken.Register(() => call.Dispose());
return await whenReturned;
}
internal override IPromisedAnswer DoCall(ulong interfaceId, ushort methodId, DynamicSerializerState args)
@ -81,9 +70,10 @@ namespace Capnp.Rpc
return new LocalAnswer(cts, CallImpl(interfaceId, methodId, args, cts.Token));
}
protected override void ReleaseRemotely()
protected async override void ReleaseRemotely()
{
this.DisposeWhenResolved();
try { using var _ = await _whenResolvedProxy; }
catch { }
}
}
}

View File

@ -0,0 +1,83 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Capnp.Rpc
{
class LocalAnswerCapabilityDeprecated : RefCountingCapability, IResolvingCapability
{
readonly Task<DeserializerState> _answer;
readonly MemberAccessPath _access;
public LocalAnswerCapabilityDeprecated(Task<DeserializerState> answer, MemberAccessPath access)
{
_answer = answer;
_access = access;
async Task<ConsumedCapability?> AwaitResolved() => access.Eval(await _answer);
WhenResolved = AwaitResolved();
}
internal override void Freeze(out IRpcEndpoint? boundEndpoint)
{
boundEndpoint = null;
}
internal override void Unfreeze()
{
}
public Task<ConsumedCapability?> WhenResolved { get; private set; }
internal override void Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer)
{
if (_answer.IsCompleted)
{
DeserializerState result;
try
{
result = _answer.Result;
}
catch (AggregateException exception)
{
throw exception.InnerException!;
}
using var proxy = new Proxy(_access.Eval(result));
proxy.Export(endpoint, writer);
}
else
{
this.ExportAsSenderPromise(endpoint, writer);
}
}
async Task<DeserializerState> CallImpl(ulong interfaceId, ushort methodId, DynamicSerializerState args, CancellationToken cancellationToken)
{
var cap = await WhenResolved;
cancellationToken.ThrowIfCancellationRequested();
if (cap == null)
throw new RpcException("Broken capability");
using var proxy = new Proxy(cap);
var call = proxy.Call(interfaceId, methodId, args, default);
var whenReturned = call.WhenReturned;
using var registration = cancellationToken.Register(() => call.Dispose());
return await whenReturned;
}
internal override IPromisedAnswer DoCall(ulong interfaceId, ushort methodId, DynamicSerializerState args)
{
var cts = new CancellationTokenSource();
return new LocalAnswer(cts, CallImpl(interfaceId, methodId, args, cts.Token));
}
protected override void ReleaseRemotely()
{
}
}
}

View File

@ -7,15 +7,12 @@ namespace Capnp.Rpc
{
class LocalCapability : ConsumedCapability
{
static readonly ConditionalWeakTable<Skeleton, LocalCapability> _localCaps =
new ConditionalWeakTable<Skeleton, LocalCapability>();
public static ConsumedCapability Create(Skeleton skeleton)
{
if (skeleton is Vine vine)
return vine.Proxy.ConsumedCap!;
else
return _localCaps.GetValue(skeleton, _ => new LocalCapability(_));
return new LocalCapability(skeleton);
}
static async Task<DeserializerState> AwaitAnswer(Task<AnswerOrCounterquestion> call)
@ -25,6 +22,7 @@ namespace Capnp.Rpc
}
public Skeleton ProvidedCap { get; }
int _releaseFlag;
LocalCapability(Skeleton providedCap)
{
@ -33,15 +31,20 @@ namespace Capnp.Rpc
internal override void AddRef()
{
ProvidedCap.Claim();
if (0 == Interlocked.CompareExchange(ref _releaseFlag, 0, 1))
ProvidedCap.Claim();
}
internal override void Release(
bool keepAlive,
[System.Runtime.CompilerServices.CallerMemberName] string methodName = "",
[System.Runtime.CompilerServices.CallerFilePath] string filePath = "",
[System.Runtime.CompilerServices.CallerLineNumber] int lineNumber = 0)
{
ProvidedCap.Relinquish();
if (keepAlive)
Interlocked.Exchange(ref _releaseFlag, 1);
else
ProvidedCap.Relinquish();
}
internal override IPromisedAnswer DoCall(ulong interfaceId, ushort methodId, DynamicSerializerState args)

View File

@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
@ -27,6 +28,8 @@ namespace Capnp.Rpc
public CancellationToken CancellationToken => _cts?.Token ?? CancellationToken.None;
public IReadOnlyList<CapDescriptor.WRITER> CapTable { get; set; }
public void Cancel()
{
_cts?.Cancel();
@ -96,7 +99,7 @@ namespace Capnp.Rpc
else
{
var path = MemberAccessPath.Deserialize(rd);
var cap = new RemoteAnswerCapability(aorcq.Counterquestion!, path);
var cap = new RemoteAnswerCapabilityDeprecated(aorcq.Counterquestion!, path);
return new Proxy(cap);
}
}

View File

@ -252,7 +252,39 @@ namespace Capnp.Rpc
}
else
{
return new RemoteAnswerCapability(this, access);
return new RemoteAnswerCapabilityDeprecated(this, access);
}
}
}
/// <summary>
/// Refer to a (possibly nested) member of this question's (possibly future) result and return
/// it as a capability.
/// </summary>
/// <param name="task">promises the cap whose ownership is transferred to this object</param>
/// <returns>Low-level capability</returns>
/// <exception cref="DeserializationException">The referenced member does not exist or does not resolve to a capability pointer.</exception>
public ConsumedCapability? Access(MemberAccessPath access, Task<IDisposable> task)
{
var proxyTask = task.AsProxyTask();
lock (ReentrancyBlocker)
{
if (proxyTask.IsCompleted && !StateFlags.HasFlag(State.TailCall))
{
try
{
using var proxy = proxyTask.Result;
return proxy.ConsumedCap;
}
catch (AggregateException exception)
{
throw exception.InnerException!;
}
}
else
{
return new RemoteAnswerCapabilityDeprecated(this, access);
}
}
}
@ -263,13 +295,13 @@ namespace Capnp.Rpc
{
foreach (var cap in inParams.Caps!)
{
cap?.Release();
cap?.Release(false);
}
}
if (target != null)
{
target.Release();
target.Release(false);
}
}
@ -277,7 +309,7 @@ namespace Capnp.Rpc
{
foreach (var cap in outParams.Caps!)
{
cap?.Release();
cap?.Release(false);
}
}

View File

@ -8,15 +8,19 @@ namespace Capnp.Rpc
{
readonly uint _remoteId;
readonly object _reentrancyBlocker = new object();
readonly TaskCompletionSource<Proxy> _resolvedCap = new TaskCompletionSource<Proxy>();
readonly TaskCompletionSource<ConsumedCapability?> _resolvedCap = new TaskCompletionSource<ConsumedCapability?>();
readonly Task<Proxy> _whenResolvedProxy;
bool _released;
public PromisedCapability(IRpcEndpoint ep, uint remoteId): base(ep)
{
_remoteId = remoteId;
async Task<Proxy> AwaitProxy() => new Proxy(await WhenResolved);
_whenResolvedProxy = AwaitProxy();
}
public override Task<Proxy> WhenResolved => _resolvedCap.Task;
public override Task<ConsumedCapability?> WhenResolved => _resolvedCap.Task;
internal override void Freeze(out IRpcEndpoint? boundEndpoint)
{
@ -24,9 +28,11 @@ namespace Capnp.Rpc
{
if (_resolvedCap.Task.IsCompleted && _pendingCallsOnPromise == 0)
{
boundEndpoint = null;
try
{
_resolvedCap.Task.Result.Freeze(out boundEndpoint);
_resolvedCap.Task.Result?.Freeze(out boundEndpoint);
}
catch (AggregateException exception)
{
@ -51,7 +57,7 @@ namespace Capnp.Rpc
{
if (_pendingCallsOnPromise == 0)
{
_resolvedCap.Task.Result.Unfreeze();
_resolvedCap.Task.Result?.Unfreeze();
}
else
{
@ -79,7 +85,8 @@ namespace Capnp.Rpc
if (_resolvedCap.Task.ReplacementTaskIsCompletedSuccessfully())
{
_resolvedCap.Task.Result.Export(endpoint, writer);
using var proxy = new Proxy(_resolvedCap.Task.Result);
proxy.Export(endpoint, writer);
}
else
{
@ -147,7 +154,7 @@ namespace Capnp.Rpc
}
}
protected override Proxy? ResolvedCap
protected override ConsumedCapability? ResolvedCap
{
get
{
@ -194,7 +201,7 @@ namespace Capnp.Rpc
lock (_reentrancyBlocker)
{
_resolvedCap.SetResult(new Proxy(resolvedCap));
_resolvedCap.SetResult(resolvedCap);
if (_pendingCallsOnPromise == 0)
{
@ -218,7 +225,7 @@ namespace Capnp.Rpc
#if false
_resolvedCap.SetException(new RpcException(message));
#else
_resolvedCap.SetResult(new Proxy(LazyCapability.CreateBrokenCap(message)));
_resolvedCap.SetResult(LazyCapability.CreateBrokenCap(message));
#endif
if (_pendingCallsOnPromise == 0)
@ -234,7 +241,7 @@ namespace Capnp.Rpc
}
}
protected override void ReleaseRemotely()
protected async override void ReleaseRemotely()
{
if (!_released)
{
@ -243,7 +250,8 @@ namespace Capnp.Rpc
_ep.ReleaseImport(_remoteId);
this.DisposeWhenResolved();
try { using var _ = await _whenResolvedProxy; }
catch { }
}
protected override Call.WRITER SetupMessage(DynamicSerializerState args, ulong interfaceId, ushort methodId)

View File

@ -10,6 +10,14 @@ namespace Capnp.Rpc
/// </summary>
public class Proxy : IDisposable, IResolvingCapability
{
public static T Share<T>(T obj) where T: class
{
if (obj is Proxy proxy)
return proxy.Cast<T>(false);
else
return BareProxy.FromImpl(obj).Cast<T>(true);
}
#if DebugFinalizers
ILogger Logger { get; } = Logging.CreateLogger<Proxy>();
#endif
@ -19,18 +27,13 @@ namespace Capnp.Rpc
/// <summary>
/// Will eventually give the resolved capability, if this is a promised capability.
/// </summary>
public Task<Proxy> WhenResolved
public Task<ConsumedCapability?> WhenResolved
{
get
{
if (ConsumedCap is IResolvingCapability resolving)
{
return resolving.WhenResolved;
}
else
{
return Task.FromResult(this);
}
return ConsumedCap is IResolvingCapability resolving ?
resolving.WhenResolved :
Task.FromResult(ConsumedCap);
}
}
@ -139,20 +142,15 @@ namespace Capnp.Rpc
{
if (disposing)
{
ConsumedCap?.Release();
ConsumedCap?.Release(false);
}
else
{
// When called from the Finalizer, we must not throw.
// But when reference counting goes wrong, ConsumedCapability.Release() will throw an InvalidOperationException.
// The only option here is to suppress that exception.
try
{
ConsumedCap?.Release();
}
catch
{
}
try { ConsumedCap?.Release(false); }
catch { }
}
_disposedValue = true;

View File

@ -46,13 +46,8 @@ namespace Capnp.Rpc
{
if (disposing)
{
try
{
ReleaseRemotely();
}
catch
{
}
try { ReleaseRemotely(); }
catch { }
}
else
{
@ -60,13 +55,8 @@ namespace Capnp.Rpc
{
Task.Run(() =>
{
try
{
ReleaseRemotely();
}
catch
{
}
try { ReleaseRemotely(); }
catch { }
});
}
}
@ -76,7 +66,11 @@ namespace Capnp.Rpc
{
lock (_reentrancyBlocker)
{
if (++_refCount <= 1)
if (_refCount == int.MinValue)
{
_refCount = 2;
}
else if (++_refCount <= 1)
{
--_refCount;
@ -91,6 +85,7 @@ namespace Capnp.Rpc
}
internal sealed override void Release(
bool keepAlive,
[System.Runtime.CompilerServices.CallerMemberName] string methodName = "",
[System.Runtime.CompilerServices.CallerFilePath] string filePath = "",
[System.Runtime.CompilerServices.CallerLineNumber] int lineNumber = 0)
@ -99,6 +94,10 @@ namespace Capnp.Rpc
{
switch (_refCount)
{
case 2 when keepAlive:
_refCount = int.MinValue;
break;
case 1: // initial state, actually ref. count 0
case 2: // actually ref. count 1
_refCount = 0;

View File

@ -16,14 +16,25 @@ namespace Capnp.Rpc
readonly PendingQuestion _question;
readonly MemberAccessPath _access;
Proxy? _resolvedCap;
readonly Task<Proxy> _whenResolvedProxy;
public RemoteAnswerCapability(PendingQuestion question, MemberAccessPath access): base(question.RpcEndpoint)
public RemoteAnswerCapability(PendingQuestion question, MemberAccessPath access, Task<Proxy> proxyTask) : base(question.RpcEndpoint)
{
_question = question ?? throw new ArgumentNullException(nameof(question));
_access = access ?? throw new ArgumentNullException(nameof(access));
_whenResolvedProxy = proxyTask ?? throw new ArgumentNullException(nameof(proxyTask));
_ = AwaitWhenResolved();
async Task<ConsumedCapability?> AwaitWhenResolved()
{
var proxy = await _whenResolvedProxy;
if (_question.IsTailCall)
throw new InvalidOperationException("Question is a tail call, so won't resolve back.");
return proxy.ConsumedCap;
}
WhenResolved = AwaitWhenResolved();
}
async void ReAllowFinishWhenDone(Task task)
@ -47,52 +58,32 @@ namespace Capnp.Rpc
}
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
lock (_question.ReentrancyBlocker)
{
_resolvedCap?.Dispose();
}
}
protected override Proxy? ResolvedCap
protected override ConsumedCapability? ResolvedCap
{
get
{
lock (_question.ReentrancyBlocker)
{
if (_resolvedCap == null && !_question.IsTailCall && _question.IsReturned)
if (!_question.IsTailCall && WhenResolved.IsCompleted)
{
DeserializerState result;
try
{
result = _question.WhenReturned.Result;
return WhenResolved.Result;
}
catch (AggregateException exception)
{
throw exception.InnerException!;
}
_resolvedCap = new Proxy(_access.Eval(result));
}
return _resolvedCap;
else
{
return null;
}
}
}
}
async Task<Proxy> AwaitWhenResolved()
{
await _question.WhenReturned;
if (_question.IsTailCall)
throw new InvalidOperationException("Question is a tail call, so won't resolve back.");
return ResolvedCap!;
}
public override Task<Proxy> WhenResolved => AwaitWhenResolved();
public override Task<ConsumedCapability?> WhenResolved { get; }
protected override void GetMessageTarget(MessageTarget.WRITER wr)
{
@ -251,9 +242,10 @@ namespace Capnp.Rpc
}
}
protected override void ReleaseRemotely()
protected async override void ReleaseRemotely()
{
this.DisposeWhenResolved();
try { using var _ = await _whenResolvedProxy; }
catch { }
}
}
}

View File

@ -0,0 +1,265 @@
using System;
using System.Threading.Tasks;
namespace Capnp.Rpc
{
class RemoteAnswerCapabilityDeprecated : RemoteResolvingCapability
{
// Set DebugEmbargos to true to get logging output for calls. RPC calls are expected to
// be on the critical path, hence very relevant for performance. We just can't afford
// additional stuff on this path. Even if the logger filters the outputs away, there is
// overhead for creating the Logger object, calling the Logger methods and deciding to
// filter the output. This justifies the precompiler switch.
#if DebugEmbargos
ILogger Logger { get; } = Logging.CreateLogger<RemoteAnswerCapability>();
#endif
readonly PendingQuestion _question;
readonly MemberAccessPath _access;
readonly Task<Proxy> _whenResolvedProxy;
ConsumedCapability? _resolvedCap;
public RemoteAnswerCapabilityDeprecated(PendingQuestion question, MemberAccessPath access): base(question.RpcEndpoint)
{
_question = question ?? throw new ArgumentNullException(nameof(question));
_access = access ?? throw new ArgumentNullException(nameof(access));
async Task<ConsumedCapability?> AwaitWhenResolved()
{
await _question.WhenReturned;
if (_question.IsTailCall)
throw new InvalidOperationException("Question is a tail call, so won't resolve back.");
return ResolvedCap!;
}
WhenResolved = AwaitWhenResolved();
async Task<Proxy> AwaitProxy() => new Proxy(await WhenResolved);
_whenResolvedProxy = AwaitProxy();
}
async void ReAllowFinishWhenDone(Task task)
{
try
{
++_pendingCallsOnPromise;
await task;
}
catch
{
}
finally
{
lock (_question.ReentrancyBlocker)
{
--_pendingCallsOnPromise;
_question.AllowFinish();
}
}
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
lock (_question.ReentrancyBlocker)
{
using var _ = new Proxy(_resolvedCap);
}
}
protected override ConsumedCapability? ResolvedCap
{
get
{
lock (_question.ReentrancyBlocker)
{
if (_resolvedCap == null && !_question.IsTailCall && _question.IsReturned)
{
DeserializerState result;
try
{
result = _question.WhenReturned.Result;
}
catch (AggregateException exception)
{
throw exception.InnerException!;
}
_resolvedCap = _access.Eval(result);
}
return _resolvedCap;
}
}
}
public override Task<ConsumedCapability?> WhenResolved { get; }
protected override void GetMessageTarget(MessageTarget.WRITER wr)
{
wr.which = MessageTarget.WHICH.PromisedAnswer;
wr.PromisedAnswer.QuestionId = _question.QuestionId;
_access.Serialize(wr.PromisedAnswer);
}
internal override IPromisedAnswer DoCall(ulong interfaceId, ushort methodId, DynamicSerializerState args)
{
lock (_question.ReentrancyBlocker)
{
if (_question.StateFlags.HasFlag(PendingQuestion.State.Returned) &&
!_question.StateFlags.HasFlag(PendingQuestion.State.TailCall))
{
if (ResolvedCap == null)
{
throw new RpcException("Answer did not resolve to expected capability");
}
return CallOnResolution(interfaceId, methodId, args);
}
else
{
#if DebugEmbargos
Logger.LogDebug("Call by proxy");
#endif
if (_question.StateFlags.HasFlag(PendingQuestion.State.Disposed))
{
throw new ObjectDisposedException(nameof(PendingQuestion));
}
if (_question.StateFlags.HasFlag(PendingQuestion.State.FinishRequested))
{
throw new InvalidOperationException("Finish request was already sent");
}
_question.DisallowFinish();
++_pendingCallsOnPromise;
var promisedAnswer = base.DoCall(interfaceId, methodId, args);
ReAllowFinishWhenDone(promisedAnswer.WhenReturned);
async void DecrementPendingCallsOnPromiseWhenReturned()
{
try
{
await promisedAnswer.WhenReturned;
}
catch
{
}
finally
{
lock (_question.ReentrancyBlocker)
{
--_pendingCallsOnPromise;
}
}
}
DecrementPendingCallsOnPromiseWhenReturned();
return promisedAnswer;
}
}
}
protected override Call.WRITER SetupMessage(DynamicSerializerState args, ulong interfaceId, ushort methodId)
{
var call = base.SetupMessage(args, interfaceId, methodId);
call.Target.which = MessageTarget.WHICH.PromisedAnswer;
call.Target.PromisedAnswer.QuestionId = _question.QuestionId;
_access.Serialize(call.Target.PromisedAnswer);
return call;
}
internal override void Freeze(out IRpcEndpoint? boundEndpoint)
{
lock (_question.ReentrancyBlocker)
{
if (_question.StateFlags.HasFlag(PendingQuestion.State.Returned) &&
_pendingCallsOnPromise == 0)
{
if (ResolvedCap == null)
{
throw new RpcException("Answer did not resolve to expected capability");
}
ResolvedCap.Freeze(out boundEndpoint);
}
else
{
++_pendingCallsOnPromise;
_question.DisallowFinish();
boundEndpoint = _ep;
}
}
}
internal override void Unfreeze()
{
lock (_question.ReentrancyBlocker)
{
if (_pendingCallsOnPromise > 0)
{
--_pendingCallsOnPromise;
_question.AllowFinish();
}
else
{
ResolvedCap?.Unfreeze();
}
}
}
internal override void Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer)
{
lock (_question.ReentrancyBlocker)
{
if (_question.StateFlags.HasFlag(PendingQuestion.State.Disposed))
throw new ObjectDisposedException(nameof(PendingQuestion));
if (_question.StateFlags.HasFlag(PendingQuestion.State.Returned))
{
ResolvedCap?.Export(endpoint, writer);
}
else
{
if (_question.StateFlags.HasFlag(PendingQuestion.State.FinishRequested))
throw new InvalidOperationException("Finish request was already sent");
if (endpoint == _ep)
{
writer.which = CapDescriptor.WHICH.ReceiverAnswer;
_access.Serialize(writer.ReceiverAnswer);
writer.ReceiverAnswer.QuestionId = _question.QuestionId;
}
else if (_question.IsTailCall)
{
// FIXME: Resource management! We should prevent finishing this
// cap as long as it is exported. Unfortunately, we cannot determine
// when it gets removed from the export table.
var vine = Vine.Create(this);
uint id = endpoint.AllocateExport(vine, out bool first);
writer.which = CapDescriptor.WHICH.SenderHosted;
writer.SenderHosted = id;
}
else
{
this.ExportAsSenderPromise(endpoint, writer);
}
}
}
}
protected async override void ReleaseRemotely()
{
try { using var _ = await _whenResolvedProxy; }
catch { }
}
}
}

View File

@ -16,7 +16,7 @@ namespace Capnp.Rpc
ILogger Logger { get; } = Logging.CreateLogger<RemoteResolvingCapability>();
#endif
public abstract Task<Proxy> WhenResolved { get; }
public abstract Task<ConsumedCapability?> WhenResolved { get; }
protected RemoteResolvingCapability(IRpcEndpoint ep) : base(ep)
{
@ -25,7 +25,7 @@ namespace Capnp.Rpc
protected int _pendingCallsOnPromise;
Task? _disembargo;
protected abstract Proxy? ResolvedCap { get; }
protected abstract ConsumedCapability? ResolvedCap { get; }
protected abstract void GetMessageTarget(MessageTarget.WRITER wr);
@ -46,7 +46,7 @@ namespace Capnp.Rpc
throw new NotImplementedException("Sorry, level 3 RPC is not yet supported.");
}
if (ResolvedCap.IsNull ||
if (ResolvedCap == null ||
// If the capability resolves to null, disembargo must not be requested.
// Take the direct path, well-knowing that the call will result in an exception.
@ -65,7 +65,8 @@ namespace Capnp.Rpc
#if DebugEmbargos
Logger.LogDebug("Direct call");
#endif
return ResolvedCap.Call(interfaceId, methodId, args, default);
using var proxy = new Proxy(ResolvedCap);
return proxy.Call(interfaceId, methodId, args, default);
}
else
{
@ -93,7 +94,8 @@ namespace Capnp.Rpc
cancellationTokenSource.Token.ThrowIfCancellationRequested();
return ResolvedCap.Call(interfaceId, methodId, args, default);
using var proxy = new Proxy(ResolvedCap);
return proxy.Call(interfaceId, methodId, args, default);
}, TaskContinuationOptions.ExecuteSynchronously);

View File

@ -1,4 +1,7 @@
namespace Capnp.Rpc
using System;
using System.Threading.Tasks;
namespace Capnp.Rpc
{
static class ResolvingCapabilityExtensions
{
@ -18,8 +21,7 @@
try
{
var resolvedCap = await cap.WhenResolved;
endpoint.Resolve(preliminaryId, vine, () => resolvedCap.ConsumedCap!);
endpoint.Resolve(preliminaryId, vine, () => resolvedCap!);
}
catch (System.Exception exception)
{
@ -30,15 +32,10 @@
}
}
public static async void DisposeWhenResolved(this IResolvingCapability cap)
public static async Task<Proxy> AsProxyTask(this Task<IDisposable> task)
{
try
{
(await cap.WhenResolved)?.Dispose();
}
catch
{
}
var obj = await task;
return obj is Proxy proxy ? proxy : BareProxy.FromImpl(obj);
}
}
}

View File

@ -433,6 +433,7 @@ namespace Capnp.Rpc
ret.Results.Content = results.Rewrap<DynamicSerializerState>();
ret.ReleaseParamCaps = releaseParamCaps;
ExportCapTableAndSend(results, ret.Results);
pendingAnswer.CapTable = ret.Results.CapTable;
break;
case Call.sendResultsTo.WHICH.Yourself:
@ -592,11 +593,9 @@ namespace Capnp.Rpc
{
try
{
using (var proxy = await t)
{
cap = proxy?.GetProvider();
CreateAnswerAwaitItAndReply();
}
using var proxy = await t;
cap = proxy?.GetProvider();
CreateAnswerAwaitItAndReply();
}
catch (TaskCanceledException)
{
@ -825,31 +824,29 @@ namespace Capnp.Rpc
{
try
{
using (var proxy = await t)
{
proxy.Freeze(out var boundEndpoint);
using var proxy = await t;
proxy.Freeze(out var boundEndpoint);
try
try
{
if (boundEndpoint == this)
{
if (boundEndpoint == this)
{
#if DebugEmbargos
Logger.LogDebug($"Sender loopback disembargo. Thread = {Thread.CurrentThread.Name}");
#endif
Tx(mb.Frame);
}
else
{
Logger.LogWarning("Sender loopback request: Peer asked for disembargoing an answer which does not resolve back to the sender.");
throw new RpcProtocolErrorException("'Disembargo': Answer does not resolve back to me");
}
Tx(mb.Frame);
}
finally
else
{
proxy.Unfreeze();
Logger.LogWarning("Sender loopback request: Peer asked for disembargoing an answer which does not resolve back to the sender.");
throw new RpcProtocolErrorException("'Disembargo': Answer does not resolve back to me");
}
}
finally
{
proxy.Unfreeze();
}
}
catch (System.Exception exception)
{
@ -933,15 +930,32 @@ namespace Capnp.Rpc
try
{
var aorcq = await t;
var results = aorcq.Answer;
var caps = answer.CapTable;
if (results != null && results.Caps != null)
if (caps != null)
{
foreach (var cap in results.Caps)
foreach (var capDesc in caps)
{
cap?.Release();
switch (capDesc.which)
{
case CapDescriptor.WHICH.SenderHosted:
ReleaseExport(capDesc.SenderHosted, 1);
break;
case CapDescriptor.WHICH.SenderPromise:
ReleaseExport(capDesc.SenderPromise, 1);
break;
}
}
}
//if (results != null && results.Caps != null)
//{
// foreach (var cap in results.Caps)
// {
// cap?.Release();
// }
//}
}
catch
{
@ -1091,7 +1105,7 @@ namespace Capnp.Rpc
Tx(mb.Frame);
var main = new RemoteAnswerCapability(
var main = new RemoteAnswerCapabilityDeprecated(
pendingBootstrap,
MemberAccessPath.BootstrapAccess);
@ -1360,7 +1374,7 @@ namespace Capnp.Rpc
else
{
cap.Export(this, capDesc);
cap.Release();
cap.Release(false);
}
}

View File

@ -38,10 +38,8 @@ namespace Capnp.Rpc
{
try
{
using (var registration = cancellationToken.Register(promisedAnswer.Dispose))
{
await promisedAnswer.WhenReturned;
}
using var registration = cancellationToken.Register(promisedAnswer.Dispose);
await promisedAnswer.WhenReturned;
}
catch
{
@ -54,10 +52,8 @@ namespace Capnp.Rpc
}
else
{
using (var registration = cancellationToken.Register(promisedAnswer.Dispose))
{
return (DynamicSerializerState)await promisedAnswer.WhenReturned;
}
using var registration = cancellationToken.Register(promisedAnswer.Dispose);
return (DynamicSerializerState)await promisedAnswer.WhenReturned;
}
}

View File

@ -1259,7 +1259,8 @@ namespace Capnp
/// </summary>
/// <param name="obj">The capability, in one of the following forms:<list type="bullet">
/// <item><description>Low-level capability object (<code>Rpc.ConsumedCapability</code>)</description></item>
/// <item><description>Proxy object (<code>Rpc.Proxy</code>)</description></item>
/// <item><description>Proxy object (<code>Rpc.Proxy</code>). Note that the provision has "move semantics": SerializerState
/// takes ownership, so the Proxy object will be disposed.</description></item>
/// <item><description>Skeleton object (<code>Rpc.Skeleton</code>)</description></item>
/// <item><description>Capability interface implementation</description></item>
/// </list></param>
@ -1267,16 +1268,19 @@ namespace Capnp
/// <exception cref="InvalidOperationException">The underlying message builder was not configured for capability table support.</exception>
public uint ProvideCapability(object? obj)
{
if (obj == null)
return ProvideCapability(default(Rpc.ConsumedCapability));
else if (obj is Rpc.Proxy proxy)
return ProvideCapability(proxy.ConsumedCap);
else if (obj is Rpc.ConsumedCapability consumedCapability)
return ProvideCapability(consumedCapability);
else if (obj is Rpc.Skeleton providedCapability)
return ProvideCapability(providedCapability);
else
return ProvideCapability(Rpc.Skeleton.GetOrCreateSkeleton(obj, false));
switch (obj)
{
case null:
return ProvideCapability(default(Rpc.ConsumedCapability));
case Rpc.Proxy proxy: using (proxy)
return ProvideCapability(proxy.ConsumedCap);
case Rpc.ConsumedCapability consumedCapability:
return ProvideCapability(consumedCapability);
case Rpc.Skeleton providedCapability:
return ProvideCapability(providedCapability);
default:
return ProvideCapability(Rpc.Skeleton.GetOrCreateSkeleton(obj, false));
}
}
/// <summary>