redesigned ConsumedCapability <> Skeleton conversion

This commit is contained in:
Christian Köllner 2020-04-10 18:29:06 +02:00
parent 5a04f2f3da
commit 3d0683288a
33 changed files with 330 additions and 343 deletions

View File

@ -4,7 +4,7 @@ using Capnp.Rpc;
namespace Capnp.Net.Runtime.Tests namespace Capnp.Net.Runtime.Tests
{ {
class ProvidedCapabilityMock : Skeleton class ProvidedCapabilityMock : RefCountingSkeleton
{ {
readonly TaskCompletionSource<(ulong, ushort, DeserializerState, CancellationToken)> readonly TaskCompletionSource<(ulong, ushort, DeserializerState, CancellationToken)>
_call = new TaskCompletionSource<(ulong, ushort, DeserializerState, CancellationToken)>(); _call = new TaskCompletionSource<(ulong, ushort, DeserializerState, CancellationToken)>();

View File

@ -6,7 +6,7 @@ using Microsoft.VisualStudio.TestTools.UnitTesting;
namespace Capnp.Net.Runtime.Tests namespace Capnp.Net.Runtime.Tests
{ {
class ProvidedCapabilityMultiCallMock : Skeleton class ProvidedCapabilityMultiCallMock : RefCountingSkeleton
{ {
readonly BufferBlock<TestCallContext> _ccs = new BufferBlock<TestCallContext>(); readonly BufferBlock<TestCallContext> _ccs = new BufferBlock<TestCallContext>();

View File

@ -389,7 +389,7 @@ namespace Capnp.Net.Runtime.Tests
var result = DynamicSerializerState.CreateForRpc(); var result = DynamicSerializerState.CreateForRpc();
result.SetStruct(1, 2); result.SetStruct(1, 2);
result.WriteData(0, 654321); result.WriteData(0, 654321);
uint id = result.ProvideCapability(mock2); uint id = result.ProvideCapability(mock2).Value;
result.LinkToCapability(1, id); result.LinkToCapability(1, id);
mock.Return.SetResult(result); mock.Return.SetResult(result);
@ -453,7 +453,7 @@ namespace Capnp.Net.Runtime.Tests
var result = DynamicSerializerState.CreateForRpc(); var result = DynamicSerializerState.CreateForRpc();
result.SetStruct(1, 2); result.SetStruct(1, 2);
result.WriteData(0, 654321); result.WriteData(0, 654321);
uint id = result.ProvideCapability(mock2); uint id = result.ProvideCapability(mock2).Value;
result.LinkToCapability(1, id); result.LinkToCapability(1, id);
mock.Return.SetResult(result); mock.Return.SetResult(result);
@ -543,7 +543,7 @@ namespace Capnp.Net.Runtime.Tests
var result = DynamicSerializerState.CreateForRpc(); var result = DynamicSerializerState.CreateForRpc();
result.SetStruct(1, 2); result.SetStruct(1, 2);
result.WriteData(0, 654321); result.WriteData(0, 654321);
uint id = result.ProvideCapability(mock2); uint id = result.ProvideCapability(mock2).Value;
result.LinkToCapability(1, id); result.LinkToCapability(1, id);
mock.Return.SetResult(result); mock.Return.SetResult(result);
@ -707,7 +707,7 @@ namespace Capnp.Net.Runtime.Tests
var result = DynamicSerializerState.CreateForRpc(); var result = DynamicSerializerState.CreateForRpc();
result.SetStruct(1, 2); result.SetStruct(1, 2);
result.WriteData(0, 654321); result.WriteData(0, 654321);
uint id = result.ProvideCapability(mock2); uint id = result.ProvideCapability(mock2).Value;
result.LinkToCapability(1, id); result.LinkToCapability(1, id);
mock.Return.SetResult(result); mock.Return.SetResult(result);

View File

@ -677,7 +677,7 @@ namespace Capnp.Net.Runtime.Tests
} }
} }
class ThrowingSkeleton : Skeleton class ThrowingSkeleton : RefCountingSkeleton
{ {
public bool WasCalled { get; private set; } public bool WasCalled { get; private set; }

View File

@ -386,7 +386,7 @@ namespace Capnp
/// <exception cref="ArgumentOutOfRangeException">offset negative or out of range</exception> /// <exception cref="ArgumentOutOfRangeException">offset negative or out of range</exception>
/// <exception cref="InvalidOperationException">capability table not set</exception> /// <exception cref="InvalidOperationException">capability table not set</exception>
/// <exception cref="Rpc.RpcException">not a capability pointer or invalid capability index</exception> /// <exception cref="Rpc.RpcException">not a capability pointer or invalid capability index</exception>
internal Rpc.ConsumedCapability? DecodeCapPointer(int offset) internal Rpc.ConsumedCapability DecodeCapPointer(int offset)
{ {
if (offset < 0) if (offset < 0)
{ {
@ -404,7 +404,7 @@ namespace Capnp
{ {
// Despite this behavior is not officially specified, // Despite this behavior is not officially specified,
// the official C++ implementation seems to send null pointers for null caps. // the official C++ implementation seems to send null pointers for null caps.
return null; return Rpc.NullCapability.Instance;
} }
if (pointer.Kind != PointerKind.Other) if (pointer.Kind != PointerKind.Other)
@ -496,13 +496,13 @@ namespace Capnp
return state; return state;
} }
internal Rpc.ConsumedCapability? StructReadRawCap(int index) internal Rpc.ConsumedCapability StructReadRawCap(int index)
{ {
if (Kind != ObjectKind.Struct && Kind != ObjectKind.Nil) if (Kind != ObjectKind.Struct && Kind != ObjectKind.Nil)
throw new InvalidOperationException("Allowed on structs only"); throw new InvalidOperationException("Allowed on structs only");
if (index >= StructPtrCount) if (index >= StructPtrCount)
return null; return Rpc.NullCapability.Instance;
return DecodeCapPointer(index + StructDataCount); return DecodeCapPointer(index + StructDataCount);
} }

View File

@ -19,7 +19,7 @@
/// <exception cref="System.TypeLoadException">Problem with building the Skeleton type, or problem with loading some dependent class.</exception> /// <exception cref="System.TypeLoadException">Problem with building the Skeleton type, or problem with loading some dependent class.</exception>
public static BareProxy FromImpl(object impl) public static BareProxy FromImpl(object impl)
{ {
return new BareProxy(LocalCapability.Create(CapabilityReflection.CreateSkeleton(impl))); return new BareProxy(CapabilityReflection.CreateSkeleton(impl).AsCapability());
} }
/// <summary> /// <summary>
@ -33,7 +33,7 @@
/// Constructs an instance and binds it to the given low-level capability. /// Constructs an instance and binds it to the given low-level capability.
/// </summary> /// </summary>
/// <param name="cap">low-level capability</param> /// <param name="cap">low-level capability</param>
public BareProxy(ConsumedCapability? cap): base(cap) public BareProxy(ConsumedCapability cap): base(cap)
{ {
} }

View File

@ -259,7 +259,7 @@ namespace Capnp.Rpc
/// <exception cref="System.Reflection.TargetInvocationException">Problem with instatiating the Proxy (constructor threw exception).</exception> /// <exception cref="System.Reflection.TargetInvocationException">Problem with instatiating the Proxy (constructor threw exception).</exception>
/// <exception cref="MemberAccessException">Caller does not have permission to invoke the Proxy constructor.</exception> /// <exception cref="MemberAccessException">Caller does not have permission to invoke the Proxy constructor.</exception>
/// <exception cref="TypeLoadException">Problem with building the Proxy type, or problem with loading some dependent class.</exception> /// <exception cref="TypeLoadException">Problem with building the Proxy type, or problem with loading some dependent class.</exception>
public static Proxy CreateProxy<TInterface>(ConsumedCapability? cap) public static Proxy CreateProxy<TInterface>(ConsumedCapability cap)
{ {
var factory = GetProxyFactory(typeof(TInterface)); var factory = GetProxyFactory(typeof(TInterface));
var proxy = factory.NewProxy(); var proxy = factory.NewProxy();

View File

@ -18,6 +18,7 @@ namespace Capnp.Rpc
internal abstract Action? Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer); internal abstract Action? Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer);
internal abstract void AddRef(); internal abstract void AddRef();
internal abstract void Release(); internal abstract void Release();
internal abstract Skeleton AsSkeleton();
#if DebugFinalizers #if DebugFinalizers
internal Proxy? OwningProxy { get; set; } internal Proxy? OwningProxy { get; set; }

View File

@ -22,7 +22,7 @@ namespace Capnp.Rpc
/// </summary> /// </summary>
/// <param name="access">Path to the desired capability inside the result struct.</param> /// <param name="access">Path to the desired capability inside the result struct.</param>
/// <returns>Pipelined low-level capability</returns> /// <returns>Pipelined low-level capability</returns>
ConsumedCapability? Access(MemberAccessPath access); ConsumedCapability Access(MemberAccessPath access);
/// <summary> /// <summary>
/// ///
@ -30,7 +30,7 @@ namespace Capnp.Rpc
/// <param name="access">Creates a low-level capability for promise pipelining.</param> /// <param name="access">Creates a low-level capability for promise pipelining.</param>
/// <param name="proxyTask">Task returning the proxy whose ownership will be taken over</param> /// <param name="proxyTask">Task returning the proxy whose ownership will be taken over</param>
/// <returns></returns> /// <returns></returns>
ConsumedCapability? Access(MemberAccessPath access, Task<IDisposable?> proxyTask); ConsumedCapability Access(MemberAccessPath access, Task<IDisposable?> proxyTask);
/// <summary> /// <summary>
/// Whether the question was asked as tail call /// Whether the question was asked as tail call

View File

@ -76,7 +76,7 @@ namespace Capnp.Rpc
/// <param name="access">path to the desired capability</param> /// <param name="access">path to the desired capability</param>
/// <param name="proxyTask">task returning a proxy to the desired capability</param> /// <param name="proxyTask">task returning a proxy to the desired capability</param>
/// <returns>Pipelined low-level capability</returns> /// <returns>Pipelined low-level capability</returns>
public static ConsumedCapability? Access(Task task, MemberAccessPath access, Task<IDisposable?> proxyTask) public static ConsumedCapability Access(Task task, MemberAccessPath access, Task<IDisposable?> proxyTask)
{ {
var answer = TryGetAnswer(task); var answer = TryGetAnswer(task);
if (answer != null) return answer.Access(access, proxyTask); if (answer != null) return answer.Access(access, proxyTask);
@ -164,7 +164,7 @@ namespace Capnp.Rpc
return cap; return cap;
var unwrapped = await proxy.ConsumedCap.Unwrap(); var unwrapped = await proxy.ConsumedCap.Unwrap();
if (unwrapped == null) if (unwrapped == null || unwrapped == NullCapability.Instance)
return null; return null;
return ((CapabilityReflection.CreateProxy<TInterface>(unwrapped)) as TInterface)!; return ((CapabilityReflection.CreateProxy<TInterface>(unwrapped)) as TInterface)!;

View File

@ -38,7 +38,7 @@ namespace Capnp.Rpc
else else
{ {
capDesc.which = CapDescriptor.WHICH.SenderHosted; capDesc.which = CapDescriptor.WHICH.SenderHosted;
capDesc.SenderHosted = endpoint.AllocateExport(Vine.Create(this), out var _); capDesc.SenderHosted = endpoint.AllocateExport(AsSkeleton(), out var _);
} }
return null; return null;
} }

View File

@ -25,12 +25,12 @@ namespace Capnp.Rpc.Interception
public Task<DeserializerState> WhenReturned => _futureResult.Task; public Task<DeserializerState> WhenReturned => _futureResult.Task;
public CancellationToken CancelFromAlice => _cancelFromAlice.Token; public CancellationToken CancelFromAlice => _cancelFromAlice.Token;
public ConsumedCapability? Access(MemberAccessPath access) public ConsumedCapability Access(MemberAccessPath access)
{ {
return new LocalAnswerCapability(_futureResult.Task, access); return new LocalAnswerCapability(_futureResult.Task, access);
} }
public ConsumedCapability? Access(MemberAccessPath _, Task<IDisposable?> task) public ConsumedCapability Access(MemberAccessPath _, Task<IDisposable?> task)
{ {
var proxyTask = task.AsProxyTask(); var proxyTask = task.AsProxyTask();
return new LocalAnswerCapability(proxyTask); return new LocalAnswerCapability(proxyTask);
@ -165,12 +165,18 @@ namespace Capnp.Rpc.Interception
BobProxy = proxy.Cast<BareProxy>(false); BobProxy = proxy.Cast<BareProxy>(false);
break; break;
case ConsumedCapability cap: using (var temp = CapabilityReflection.CreateProxy<object>(cap)) { case ConsumedCapability cap:
Bob = temp; } using (var temp = CapabilityReflection.CreateProxy<object>(cap))
{
Bob = temp;
}
break; break;
case Skeleton skeleton: case Skeleton skeleton:
Bob = LocalCapability.Create(skeleton); using (var nullProxy = new Proxy())
{
Bob = (object?)skeleton.AsCapability() ?? nullProxy;
}
break; break;
default: default:
@ -213,12 +219,15 @@ namespace Capnp.Rpc.Interception
for (int i = 0; i < state.Caps.Count; i++) for (int i = 0; i < state.Caps.Count; i++)
{ {
var cap = state.Caps[i]; var cap = state.Caps[i];
if (cap != null)
{
cap = policy.Attach(cap); cap = policy.Attach(cap);
state.Caps[i] = cap; state.Caps[i] = cap;
cap.AddRef(); cap.AddRef();
} }
} }
} }
}
static void UninterceptCaps(DeserializerState state, IInterceptionPolicy policy) static void UninterceptCaps(DeserializerState state, IInterceptionPolicy policy)
{ {
@ -227,12 +236,15 @@ namespace Capnp.Rpc.Interception
for (int i = 0; i < state.Caps.Count; i++) for (int i = 0; i < state.Caps.Count; i++)
{ {
var cap = state.Caps[i]; var cap = state.Caps[i];
if (cap != null)
{
cap = policy.Detach(cap); cap = policy.Detach(cap);
state.Caps[i] = cap; state.Caps[i] = cap;
cap.AddRef(); cap.AddRef();
} }
} }
} }
}
/// <summary> /// <summary>
/// Intercepts all capabilies inside the input arguments /// Intercepts all capabilies inside the input arguments

View File

@ -9,12 +9,10 @@ namespace Capnp.Rpc.Interception
InterceptedCapability = interceptedCapability; InterceptedCapability = interceptedCapability;
interceptedCapability.AddRef(); interceptedCapability.AddRef();
Policy = policy; Policy = policy;
MyVine = Vine.Create(this);
} }
public ConsumedCapability InterceptedCapability { get; } public ConsumedCapability InterceptedCapability { get; }
public IInterceptionPolicy Policy { get; } public IInterceptionPolicy Policy { get; }
internal Skeleton MyVine { get; }
protected override void ReleaseRemotely() protected override void ReleaseRemotely()
{ {
@ -31,7 +29,7 @@ namespace Capnp.Rpc.Interception
internal override Action? Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer) internal override Action? Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer)
{ {
writer.which = CapDescriptor.WHICH.SenderHosted; writer.which = CapDescriptor.WHICH.SenderHosted;
writer.SenderHosted = endpoint.AllocateExport(MyVine, out bool _); writer.SenderHosted = endpoint.AllocateExport(AsSkeleton(), out bool _);
return null; return null;
} }
} }

View File

@ -52,10 +52,9 @@ namespace Capnp.Rpc.Interception
return (new CensorCapability(ccap, policy) as TCap)!; return (new CensorCapability(ccap, policy) as TCap)!;
default: default:
return (Attach(policy, var temp = (CapabilityReflection.CreateProxy<TCap>(
(CapabilityReflection.CreateProxy<TCap>( Skeleton.GetOrCreateSkeleton(cap, false).AsCapability())) as TCap;
LocalCapability.Create( return Attach(policy, temp!)!;
Skeleton.GetOrCreateSkeleton(cap, false))) as TCap)!));
} }
} }

View File

@ -4,22 +4,23 @@ using System.Threading.Tasks;
namespace Capnp.Rpc namespace Capnp.Rpc
{ {
class LazyCapability : RefCountingCapability, IResolvingCapability class LazyCapability : RefCountingCapability, IResolvingCapability
{ {
public static LazyCapability CreateBrokenCap(string message) public static LazyCapability CreateBrokenCap(string message)
{ {
return new LazyCapability(Task.FromException<ConsumedCapability?>(new RpcException(message))); return new LazyCapability(Task.FromException<ConsumedCapability>(new RpcException(message)));
} }
public static LazyCapability CreateCanceledCap(CancellationToken token) public static LazyCapability CreateCanceledCap(CancellationToken token)
{ {
return new LazyCapability(Task.FromCanceled<ConsumedCapability?>(token)); return new LazyCapability(Task.FromCanceled<ConsumedCapability>(token));
} }
readonly Task<Proxy>? _proxyTask; readonly Task<Proxy>? _proxyTask;
readonly Task<ConsumedCapability?> _capTask; readonly Task<ConsumedCapability> _capTask;
public LazyCapability(Task<ConsumedCapability?> capabilityTask) public LazyCapability(Task<ConsumedCapability> capabilityTask)
{ {
_capTask = capabilityTask; _capTask = capabilityTask;
} }
@ -28,7 +29,7 @@ namespace Capnp.Rpc
{ {
_proxyTask = proxyTask; _proxyTask = proxyTask;
async Task<ConsumedCapability?> AwaitCap() => (await _proxyTask!).ConsumedCap; async Task<ConsumedCapability> AwaitCap() => (await _proxyTask!).ConsumedCap;
_capTask = AwaitCap(); _capTask = AwaitCap();
} }
@ -68,7 +69,7 @@ namespace Capnp.Rpc
{ {
try try
{ {
return CapabilityReflection.CreateProxy<T>(_capTask.Result) as T; return (CapabilityReflection.CreateProxy<T>(_capTask.Result) as T)!;
} }
catch (AggregateException exception) catch (AggregateException exception)
{ {
@ -83,7 +84,7 @@ namespace Capnp.Rpc
async Task<DeserializerState> CallImpl(ulong interfaceId, ushort methodId, DynamicSerializerState args, CancellationToken cancellationToken) async Task<DeserializerState> CallImpl(ulong interfaceId, ushort methodId, DynamicSerializerState args, CancellationToken cancellationToken)
{ {
ConsumedCapability? cap; ConsumedCapability cap;
try try
{ {
cap = await _capTask; cap = await _capTask;

View File

@ -7,14 +7,6 @@ namespace Capnp.Rpc
{ {
class LocalCapability : ConsumedCapability class LocalCapability : ConsumedCapability
{ {
public static ConsumedCapability Create(Skeleton skeleton)
{
if (skeleton is Vine vine)
return vine.Proxy.ConsumedCap!;
else
return new LocalCapability(skeleton);
}
static async Task<DeserializerState> AwaitAnswer(Task<AnswerOrCounterquestion> call) static async Task<DeserializerState> AwaitAnswer(Task<AnswerOrCounterquestion> call)
{ {
var aorcq = await call; var aorcq = await call;
@ -23,7 +15,9 @@ namespace Capnp.Rpc
public Skeleton ProvidedCap { get; } public Skeleton ProvidedCap { get; }
LocalCapability(Skeleton providedCap) internal override Skeleton AsSkeleton() => ProvidedCap;
public LocalCapability(Skeleton providedCap)
{ {
ProvidedCap = providedCap ?? throw new ArgumentNullException(nameof(providedCap)); ProvidedCap = providedCap ?? throw new ArgumentNullException(nameof(providedCap));
} }

View File

@ -169,7 +169,7 @@ namespace Capnp.Rpc
/// <param name="rpcState">The object (usually "params struct") on which to evaluate this path.</param> /// <param name="rpcState">The object (usually "params struct") on which to evaluate this path.</param>
/// <returns>Resulting low-level capability</returns> /// <returns>Resulting low-level capability</returns>
/// <exception cref="DeserializationException">Evaluation of this path did not give a capability</exception> /// <exception cref="DeserializationException">Evaluation of this path did not give a capability</exception>
public ConsumedCapability? Eval(DeserializerState rpcState) public ConsumedCapability Eval(DeserializerState rpcState)
{ {
var cur = rpcState; var cur = rpcState;
@ -181,7 +181,7 @@ namespace Capnp.Rpc
switch (cur.Kind) switch (cur.Kind)
{ {
case ObjectKind.Nil: case ObjectKind.Nil:
return null; return NullCapability.Instance;
case ObjectKind.Capability: case ObjectKind.Capability:
return rpcState.Caps![(int)cur.CapabilityIndex]; return rpcState.Caps![(int)cur.CapabilityIndex];

View File

@ -0,0 +1,54 @@
using System;
namespace Capnp.Rpc
{
/// <summary>
/// Null capability
/// </summary>
public sealed class NullCapability : ConsumedCapability
{
/// <summary>
/// Singleton instance
/// </summary>
public static readonly NullCapability Instance = new NullCapability();
NullCapability()
{
}
/// <summary>
/// Does nothing
/// </summary>
protected override void ReleaseRemotely()
{
}
internal override void AddRef()
{
}
internal override Skeleton AsSkeleton() => NullSkeleton.Instance;
internal override IPromisedAnswer DoCall(ulong interfaceId, ushort methodId, DynamicSerializerState args)
{
args.Dispose();
throw new InvalidOperationException("Cannot call null capability");
}
internal override Action? Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer)
{
writer.which = CapDescriptor.WHICH.None;
return null;
}
internal override void Release()
{
}
/// <summary>
/// String hint
/// </summary>
/// <returns>"Null capability"</returns>
public override string ToString() => "Null capability";
}
}

View File

@ -0,0 +1,41 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Capnp.Rpc
{
/// <summary>
/// Null skeleton
/// </summary>
public sealed class NullSkeleton : Skeleton
{
/// <summary>
/// Singleton instance
/// </summary>
public static readonly NullSkeleton Instance = new NullSkeleton();
NullSkeleton()
{
}
/// <summary>
/// Always throws an exception
/// </summary>
/// <exception cref="InvalidOperationException">always thrown</exception>
public override Task<AnswerOrCounterquestion> Invoke(ulong interfaceId, ushort methodId, DeserializerState args, CancellationToken cancellationToken = default)
{
args.Dispose();
throw new InvalidOperationException("Cannot call null capability");
}
internal override ConsumedCapability AsCapability() => NullCapability.Instance;
internal override void Claim()
{
}
internal override void Relinquish()
{
}
}
}

View File

@ -88,7 +88,7 @@ namespace Capnp.Rpc
} }
catch (System.Exception) catch (System.Exception)
{ {
throw new ArgumentOutOfRangeException("Illegal pointer field in transformation operation"); throw new RpcException("Illegal pointer field in transformation operation");
} }
break; break;
@ -100,21 +100,20 @@ namespace Capnp.Rpc
} }
} }
Proxy proxy;
switch (cur.Kind) switch (cur.Kind)
{ {
case ObjectKind.Capability: case ObjectKind.Capability:
try try
{ {
var cap = aorcq.Answer.Caps![(int)cur.CapabilityIndex]; return new Proxy(aorcq.Answer.Caps![(int)cur.CapabilityIndex]);
proxy = new Proxy(cap);
} }
catch (ArgumentOutOfRangeException) catch (ArgumentOutOfRangeException)
{ {
throw new ArgumentOutOfRangeException("Bad capability table in internal answer - internal error?"); throw new RpcException("Capability index out of range");
} }
return proxy;
case ObjectKind.Nil:
return new Proxy(NullCapability.Instance);
default: default:
throw new ArgumentOutOfRangeException("Transformation did not result in a capability"); throw new ArgumentOutOfRangeException("Transformation did not result in a capability");

View File

@ -58,7 +58,7 @@ namespace Capnp.Rpc
SerializerState? _inParams; SerializerState? _inParams;
int _inhibitFinishCounter, _refCounter; int _inhibitFinishCounter, _refCounter;
internal PendingQuestion(IRpcEndpoint ep, uint id, ConsumedCapability? target, SerializerState? inParams) internal PendingQuestion(IRpcEndpoint ep, uint id, ConsumedCapability target, SerializerState? inParams)
{ {
RpcEndpoint = ep ?? throw new ArgumentNullException(nameof(ep)); RpcEndpoint = ep ?? throw new ArgumentNullException(nameof(ep));
_questionId = id; _questionId = id;
@ -237,7 +237,7 @@ namespace Capnp.Rpc
/// <param name="access">Access path</param> /// <param name="access">Access path</param>
/// <returns>Low-level capability</returns> /// <returns>Low-level capability</returns>
/// <exception cref="DeserializationException">The referenced member does not exist or does not resolve to a capability pointer.</exception> /// <exception cref="DeserializationException">The referenced member does not exist or does not resolve to a capability pointer.</exception>
public ConsumedCapability? Access(MemberAccessPath access) public ConsumedCapability Access(MemberAccessPath access)
{ {
lock (ReentrancyBlocker) lock (ReentrancyBlocker)
{ {
@ -268,7 +268,7 @@ namespace Capnp.Rpc
/// <param name="task">promises the cap whose ownership is transferred to this object</param> /// <param name="task">promises the cap whose ownership is transferred to this object</param>
/// <returns>Low-level capability</returns> /// <returns>Low-level capability</returns>
/// <exception cref="DeserializationException">The referenced member does not exist or does not resolve to a capability pointer.</exception> /// <exception cref="DeserializationException">The referenced member does not exist or does not resolve to a capability pointer.</exception>
public ConsumedCapability? Access(MemberAccessPath access, Task<IDisposable?> task) public ConsumedCapability Access(MemberAccessPath access, Task<IDisposable?> task)
{ {
var proxyTask = task.AsProxyTask(); var proxyTask = task.AsProxyTask();
return new RemoteAnswerCapability(this, access, proxyTask); return new RemoteAnswerCapability(this, access, proxyTask);

View File

@ -8,7 +8,7 @@ namespace Capnp.Rpc
/// <summary> /// <summary>
/// Combines multiple skeletons to represent objects which implement multiple interfaces. /// Combines multiple skeletons to represent objects which implement multiple interfaces.
/// </summary> /// </summary>
public class PolySkeleton: Skeleton public class PolySkeleton: RefCountingSkeleton
{ {
readonly Dictionary<ulong, Skeleton> _ifmap = new Dictionary<ulong, Skeleton>(); readonly Dictionary<ulong, Skeleton> _ifmap = new Dictionary<ulong, Skeleton>();

View File

@ -8,7 +8,7 @@ namespace Capnp.Rpc
{ {
readonly uint _remoteId; readonly uint _remoteId;
readonly object _reentrancyBlocker = new object(); readonly object _reentrancyBlocker = new object();
readonly TaskCompletionSource<ConsumedCapability?> _resolvedCap = new TaskCompletionSource<ConsumedCapability?>(); readonly TaskCompletionSource<ConsumedCapability> _resolvedCap = new TaskCompletionSource<ConsumedCapability>();
readonly Task<Proxy> _whenResolvedProxy; readonly Task<Proxy> _whenResolvedProxy;
bool _released; bool _released;
@ -149,9 +149,10 @@ namespace Capnp.Rpc
lock (_reentrancyBlocker) lock (_reentrancyBlocker)
{ {
#if DebugFinalizers #if DebugFinalizers
if (resolvedCap != null)
resolvedCap.ResolvingCap = this; resolvedCap.ResolvingCap = this;
#endif #endif
_resolvedCap.SetResult(resolvedCap); _resolvedCap.SetResult(resolvedCap!);
if (_pendingCallsOnPromise == 0) if (_pendingCallsOnPromise == 0)
{ {

View File

@ -53,18 +53,18 @@ namespace Capnp.Rpc
return CapabilityReflection.CreateProxy<T>(ConsumedCap) as T; return CapabilityReflection.CreateProxy<T>(ConsumedCap) as T;
} }
ConsumedCapability? _consumedCap; ConsumedCapability _consumedCap = NullCapability.Instance;
/// <summary> /// <summary>
/// Underlying low-level capability /// Underlying low-level capability
/// </summary> /// </summary>
protected internal ConsumedCapability? ConsumedCap => _disposedValue ? protected internal ConsumedCapability ConsumedCap => _disposedValue ?
throw new ObjectDisposedException(nameof(Proxy)) : _consumedCap; throw new ObjectDisposedException(nameof(Proxy)) : _consumedCap;
/// <summary> /// <summary>
/// Whether is this a broken capability. /// Whether is this a broken capability.
/// </summary> /// </summary>
public bool IsNull => _consumedCap == null; public bool IsNull => _consumedCap == NullCapability.Instance;
/// <summary> /// <summary>
/// Whether <see cref="Dispose()"/> was called on this Proxy. /// Whether <see cref="Dispose()"/> was called on this Proxy.
@ -100,12 +100,6 @@ namespace Capnp.Rpc
throw new ObjectDisposedException(nameof(Proxy)); throw new ObjectDisposedException(nameof(Proxy));
} }
if (ConsumedCap == null)
{
args.Dispose();
throw new InvalidOperationException("Cannot call null capability");
}
var answer = ConsumedCap.DoCall(interfaceId, methodId, args); var answer = ConsumedCap.DoCall(interfaceId, methodId, args);
if (cancellationToken.CanBeCanceled) if (cancellationToken.CanBeCanceled)
@ -126,20 +120,17 @@ namespace Capnp.Rpc
#endif #endif
} }
internal Proxy(ConsumedCapability? cap): this() internal Proxy(ConsumedCapability cap): this()
{ {
Bind(cap); Bind(cap);
} }
internal void Bind(ConsumedCapability? cap) internal void Bind(ConsumedCapability cap)
{ {
if (ConsumedCap != null) if (ConsumedCap != NullCapability.Instance)
throw new InvalidOperationException("Proxy was already bound"); throw new InvalidOperationException("Proxy was already bound");
if (cap == null) _consumedCap = cap ?? throw new ArgumentNullException(nameof(cap));
return;
_consumedCap = cap;
cap.AddRef(); cap.AddRef();
#if DebugFinalizers #if DebugFinalizers
@ -151,15 +142,7 @@ namespace Capnp.Rpc
internal async Task<Skeleton> GetProvider() internal async Task<Skeleton> GetProvider()
{ {
var unwrapped = await ConsumedCap.Unwrap(); var unwrapped = await ConsumedCap.Unwrap();
return unwrapped.AsSkeleton();
switch (unwrapped)
{
case LocalCapability lcap:
return lcap.ProvidedCap;
default:
return Vine.Create(unwrapped);
}
} }
/// <summary> /// <summary>

View File

@ -7,6 +7,7 @@ namespace Capnp.Rpc
abstract class RefCountingCapability: ConsumedCapability abstract class RefCountingCapability: ConsumedCapability
{ {
readonly object _reentrancyBlocker = new object(); readonly object _reentrancyBlocker = new object();
Vine? _vine;
// Note on reference counting: Works in analogy to COM. AddRef() adds a reference, // Note on reference counting: Works in analogy to COM. AddRef() adds a reference,
// Release() removes it. When the reference count reaches zero, the capability must be // Release() removes it. When the reference count reaches zero, the capability must be
@ -122,5 +123,15 @@ namespace Capnp.Rpc
} }
} }
} }
internal override Skeleton AsSkeleton()
{
lock (_reentrancyBlocker)
{
if (_vine == null)
_vine = new Vine(this);
return _vine;
}
}
} }
} }

View File

@ -0,0 +1,80 @@
using System;
using System.Threading;
namespace Capnp.Rpc
{
/// <summary>
/// Skeleton with reference counting and dispose pattern
/// </summary>
public abstract class RefCountingSkeleton: Skeleton
{
int _refCount;
LocalCapability? _localCap;
/// <summary>
/// Dispose pattern implementation
/// </summary>
protected virtual void Dispose(bool disposing)
{
}
/// <summary>
/// Finalizer
/// </summary>
~RefCountingSkeleton()
{
Dispose(false);
}
internal sealed override void Claim()
{
int count, newCount;
do
{
count = Volatile.Read(ref _refCount);
if (count < 0)
throw new ObjectDisposedException(nameof(RefCountingSkeleton));
newCount = count + 1;
} while (Interlocked.CompareExchange(ref _refCount, newCount, count) != count);
}
internal override void Relinquish()
{
int count, newCount;
do
{
count = Volatile.Read(ref _refCount);
if (count < 0)
throw new ObjectDisposedException(nameof(RefCountingSkeleton));
newCount = count > 0 ? count - 1 : int.MinValue;
} while (Interlocked.CompareExchange(ref _refCount, newCount, count) != count);
if (newCount == 0)
{
Dispose(true);
GC.SuppressFinalize(this);
}
}
/// <summary>
/// Whether this instance is in disposed state.
/// </summary>
public bool IsDisposed => Volatile.Read(ref _refCount) < 0;
internal override ConsumedCapability AsCapability()
{
var cap = Volatile.Read(ref _localCap);
if (cap == null)
{
Interlocked.CompareExchange(ref _localCap, new LocalCapability(this), null);
}
return Volatile.Read(ref _localCap)!;
}
}
}

View File

@ -102,19 +102,6 @@ namespace Capnp.Rpc
if (!_question.StateFlags.HasFlag(PendingQuestion.State.TailCall) && if (!_question.StateFlags.HasFlag(PendingQuestion.State.TailCall) &&
_question.StateFlags.HasFlag(PendingQuestion.State.Returned)) _question.StateFlags.HasFlag(PendingQuestion.State.Returned))
{ {
try
{
if (ResolvedCap == null)
{
throw new RpcException("Answer did not resolve to expected capability");
}
}
catch
{
args.Dispose();
throw;
}
return CallOnResolution(interfaceId, methodId, args); return CallOnResolution(interfaceId, methodId, args);
} }
else else
@ -198,8 +185,7 @@ namespace Capnp.Rpc
} }
else if (_question.IsTailCall) else if (_question.IsTailCall)
{ {
var vine = Vine.Create(this); uint id = endpoint.AllocateExport(AsSkeleton(), out bool first);
uint id = endpoint.AllocateExport(vine, out bool first);
writer.which = CapDescriptor.WHICH.SenderHosted; writer.which = CapDescriptor.WHICH.SenderHosted;
writer.SenderHosted = id; writer.SenderHosted = id;

View File

@ -36,7 +36,10 @@ namespace Capnp.Rpc
try try
{ {
if (resolvedCap is RemoteCapability || if (resolvedCap is NullCapability ||
// Must not request disembargo on null cap
resolvedCap is RemoteCapability ||
//# Note that in the case where Carol actually lives in Vat B (i.e., the same vat that the promise //# Note that in the case where Carol actually lives in Vat B (i.e., the same vat that the promise
//# already pointed at), no embargo is needed, because the pipelined calls are delivered over the //# already pointed at), no embargo is needed, because the pipelined calls are delivered over the
//# same path as the later direct calls. //# same path as the later direct calls.
@ -84,7 +87,7 @@ namespace Capnp.Rpc
cancellationTokenSource.Token.ThrowIfCancellationRequested(); cancellationTokenSource.Token.ThrowIfCancellationRequested();
} }
using var proxy = new Proxy(ResolvedCap); using var proxy = new Proxy(resolvedCap);
return proxy.Call(interfaceId, methodId, args, default); return proxy.Call(interfaceId, methodId, args, default);
}, TaskContinuationOptions.ExecuteSynchronously); }, TaskContinuationOptions.ExecuteSynchronously);

View File

@ -1,11 +1,12 @@
using System; using System;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Capnp.Rpc namespace Capnp.Rpc
{ {
static class ResolvingCapabilityExtensions static class ResolvingCapabilityExtensions
{ {
public static async Task<ConsumedCapability?> Unwrap(this ConsumedCapability? cap) public static async Task<ConsumedCapability> Unwrap(this ConsumedCapability cap)
{ {
while (cap is IResolvingCapability resolving) while (cap is IResolvingCapability resolving)
{ {
@ -20,7 +21,7 @@ namespace Capnp.Rpc
public static Action? ExportAsSenderPromise<T>(this T cap, IRpcEndpoint endpoint, CapDescriptor.WRITER writer) public static Action? ExportAsSenderPromise<T>(this T cap, IRpcEndpoint endpoint, CapDescriptor.WRITER writer)
where T: ConsumedCapability, IResolvingCapability where T: ConsumedCapability, IResolvingCapability
{ {
var vine = Vine.Create(cap); var vine = cap.AsSkeleton();
uint preliminaryId = endpoint.AllocateExport(vine, out bool first); uint preliminaryId = endpoint.AllocateExport(vine, out bool first);
writer.which = CapDescriptor.WHICH.SenderPromise; writer.which = CapDescriptor.WHICH.SenderPromise;
@ -58,7 +59,10 @@ namespace Capnp.Rpc
} }
catch (TaskCanceledException exception) catch (TaskCanceledException exception)
{ {
return new Proxy(LazyCapability.CreateCanceledCap(exception.CancellationToken)); var token = exception.CancellationToken;
if (!token.IsCancellationRequested)
token = new CancellationToken(true);
return new Proxy(LazyCapability.CreateCanceledCap(token));
} }
catch (System.Exception exception) catch (System.Exception exception)
{ {
@ -68,7 +72,7 @@ namespace Capnp.Rpc
switch (obj) switch (obj)
{ {
case Proxy proxy: return proxy; case Proxy proxy: return proxy;
case null: return new Proxy(null); case null: return new Proxy(NullCapability.Instance);
default: return BareProxy.FromImpl(obj); default: return BareProxy.FromImpl(obj);
} }
} }

View File

@ -310,7 +310,7 @@ namespace Capnp.Rpc
return AllocateExport(providedCapability, out first); return AllocateExport(providedCapability, out first);
} }
PendingQuestion AllocateQuestion(ConsumedCapability? target, SerializerState? inParams) PendingQuestion AllocateQuestion(ConsumedCapability target, SerializerState? inParams)
{ {
lock (_reentrancyBlocker) lock (_reentrancyBlocker)
{ {
@ -359,7 +359,7 @@ namespace Capnp.Rpc
if (bootstrapCap != null) if (bootstrapCap != null)
{ {
ret.which = Return.WHICH.Results; ret.which = Return.WHICH.Results;
bootstrap.SetCapability(bootstrap.ProvideCapability(LocalCapability.Create(bootstrapCap))); bootstrap.SetCapability(bootstrap.ProvideCapability(bootstrapCap.AsCapability()));
ret.Results!.Content = bootstrap; ret.Results!.Content = bootstrap;
bootstrapTask = Task.FromResult<AnswerOrCounterquestion>(bootstrap); bootstrapTask = Task.FromResult<AnswerOrCounterquestion>(bootstrap);
@ -896,8 +896,6 @@ namespace Capnp.Rpc
previousAnswer.Chain( previousAnswer.Chain(
disembargo.Target.PromisedAnswer, disembargo.Target.PromisedAnswer,
async t => async t =>
{
try
{ {
using var proxy = await t; using var proxy = await t;
@ -914,13 +912,6 @@ namespace Capnp.Rpc
throw new RpcProtocolErrorException("'Disembargo': Answer does not resolve back to me"); throw new RpcProtocolErrorException("'Disembargo': Answer does not resolve back to me");
} }
}
catch (System.Exception exception)
{
Logger.LogWarning($"Sender loopback request: Peer asked for disembargoing an answer which either has not yet returned, was canceled, or faulted: {exception.Message}");
throw new RpcProtocolErrorException($"'Disembargo' failure: {exception}");
}
}); });
} }
else else
@ -1177,7 +1168,7 @@ namespace Capnp.Rpc
mb.InitCapTable(); mb.InitCapTable();
var req = mb.BuildRoot<Message.WRITER>(); var req = mb.BuildRoot<Message.WRITER>();
req.which = Message.WHICH.Bootstrap; req.which = Message.WHICH.Bootstrap;
var pendingBootstrap = AllocateQuestion(null, null); var pendingBootstrap = AllocateQuestion(NullCapability.Instance, null);
req.Bootstrap!.QuestionId = pendingBootstrap.QuestionId; req.Bootstrap!.QuestionId = pendingBootstrap.QuestionId;
Tx(mb.Frame); Tx(mb.Frame);
@ -1315,7 +1306,7 @@ namespace Capnp.Rpc
case CapDescriptor.WHICH.ReceiverHosted: case CapDescriptor.WHICH.ReceiverHosted:
if (_exportTable.TryGetValue(capDesc.ReceiverHosted, out var rc)) if (_exportTable.TryGetValue(capDesc.ReceiverHosted, out var rc))
{ {
return LocalCapability.Create(rc.Cap); return rc.Cap.AsCapability();
} }
else else
{ {
@ -1370,6 +1361,9 @@ namespace Capnp.Rpc
return newCap; return newCap;
} }
case CapDescriptor.WHICH.None:
return NullCapability.Instance;
default: default:
Logger.LogWarning("Unknown capability descriptor category"); Logger.LogWarning("Unknown capability descriptor category");
throw new RpcUnimplementedException(); throw new RpcUnimplementedException();

View File

@ -26,10 +26,6 @@ namespace Capnp.Rpc
} }
} }
#if DEBUG_DISPOSE
const int NoDisposeFlag = 0x4000000;
#endif
static readonly ConditionalWeakTable<object, Skeleton> _implMap = static readonly ConditionalWeakTable<object, Skeleton> _implMap =
new ConditionalWeakTable<object, Skeleton>(); new ConditionalWeakTable<object, Skeleton>();
@ -63,31 +59,6 @@ namespace Capnp.Rpc
return new SkeletonRelinquisher(GetOrCreateSkeleton(impl, true)); return new SkeletonRelinquisher(GetOrCreateSkeleton(impl, true));
} }
#if DEBUG_DISPOSE
/// <summary>
/// This DEBUG-only diagnostic method states that the Skeleton corresponding to a given capability is not expected to
/// be disposed until the next call to EndAssertNotDisposed().
/// </summary>
/// <typeparam name="T">Capability interface</typeparam>
/// <param name="impl">Capability implementation</param>
public static void BeginAssertNotDisposed<T>(T impl) where T : class
{
GetOrCreateSkeleton(impl, false).BeginAssertNotDisposed();
}
/// <summary>
/// This DEBUG-only diagnostic method ends a non-disposal period started with BeginAssertNotDisposed.
/// </summary>
/// <typeparam name="T">Capability interface</typeparam>
/// <param name="impl">Capability implementation</param>
public static void EndAssertNotDisposed<T>(T impl) where T : class
{
GetOrCreateSkeleton(impl, false).EndAssertNotDisposed();
}
#endif
int _refCount = 0;
/// <summary> /// <summary>
/// Calls an interface method of this capability. /// Calls an interface method of this capability.
/// </summary> /// </summary>
@ -98,43 +69,8 @@ namespace Capnp.Rpc
/// <returns>A Task which will resolve to the call result</returns> /// <returns>A Task which will resolve to the call result</returns>
public abstract Task<AnswerOrCounterquestion> Invoke(ulong interfaceId, ushort methodId, DeserializerState args, CancellationToken cancellationToken = default); public abstract Task<AnswerOrCounterquestion> Invoke(ulong interfaceId, ushort methodId, DeserializerState args, CancellationToken cancellationToken = default);
internal void Claim() internal abstract void Claim();
{ internal abstract void Relinquish();
Interlocked.Increment(ref _refCount);
}
#if DEBUG_DISPOSE
internal void BeginAssertNotDisposed()
{
if ((Interlocked.Add(ref _refCount, NoDisposeFlag) & NoDisposeFlag) == 0)
{
throw new InvalidOperationException("Flag already set. State is now broken.");
}
}
internal void EndAssertNotDisposed()
{
if ((Interlocked.Add(ref _refCount, -NoDisposeFlag) & NoDisposeFlag) != 0)
{
throw new InvalidOperationException("Flag already cleared. State is now broken.");
}
}
#endif
internal void Relinquish()
{
int count = Interlocked.Decrement(ref _refCount);
if (0 == count)
{
#if DEBUG_DISPOSE
if ((count & NoDisposeFlag) != 0)
throw new InvalidOperationException("Unexpected Skeleton disposal");
#endif
Dispose(true);
GC.SuppressFinalize(this);
}
}
internal void Relinquish(int count) internal void Relinquish(int count)
{ {
@ -145,41 +81,21 @@ namespace Capnp.Rpc
Relinquish(); Relinquish();
} }
/// <summary>
/// Dispose pattern implementation
/// </summary>
protected virtual void Dispose(bool disposing)
{
}
/// <summary>
/// Finalizer
/// </summary>
~Skeleton()
{
Dispose(false);
}
internal virtual void Bind(object impl) internal virtual void Bind(object impl)
{ {
throw new NotSupportedException(); throw new NotSupportedException("Cannot bind");
} }
internal abstract ConsumedCapability AsCapability();
} }
/// <summary> /// <summary>
/// Skeleton for a specific capability interface. /// Skeleton for a specific capability interface.
/// </summary> /// </summary>
/// <typeparam name="T">Capability interface</typeparam> /// <typeparam name="T">Capability interface</typeparam>
public abstract class Skeleton<T> : Skeleton, IMonoSkeleton public abstract class Skeleton<T> : RefCountingSkeleton, IMonoSkeleton
{ {
#if DebugEmbargos
ILogger Logger { get; } = Logging.CreateLogger<Skeleton<T>>();
#endif
Func<DeserializerState, CancellationToken, Task<AnswerOrCounterquestion>>[] _methods = null!; Func<DeserializerState, CancellationToken, Task<AnswerOrCounterquestion>>[] _methods = null!;
CancellationTokenSource? _disposed = new CancellationTokenSource();
readonly object _reentrancyBlocker = new object();
int _pendingCalls;
/// <summary> /// <summary>
/// Constructs an instance. /// Constructs an instance.
@ -225,45 +141,7 @@ namespace Capnp.Rpc
if (methodId >= _methods.Length) if (methodId >= _methods.Length)
throw new NotImplementedException("Wrong method id"); throw new NotImplementedException("Wrong method id");
lock (_reentrancyBlocker) return await _methods[methodId](args, cancellationToken);
{
if (_disposed == null || _disposed.IsCancellationRequested)
{
throw new ObjectDisposedException(nameof(Skeleton<T>));
}
++_pendingCalls;
}
var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(_disposed.Token, cancellationToken);
try
{
return await _methods[methodId](args, linkedSource.Token);
}
catch (System.Exception)
{
throw;
}
finally
{
lock (_reentrancyBlocker)
{
--_pendingCalls;
}
linkedSource.Dispose();
CheckCtsDisposal();
}
}
void CheckCtsDisposal()
{
if (_pendingCalls == 0 && _disposed != null && _disposed.IsCancellationRequested)
{
_disposed.Dispose();
_disposed = null;
}
} }
/// <summary> /// <summary>
@ -271,16 +149,6 @@ namespace Capnp.Rpc
/// </summary> /// </summary>
protected override void Dispose(bool disposing) protected override void Dispose(bool disposing)
{ {
lock (_reentrancyBlocker)
{
if (_disposed == null || _disposed.IsCancellationRequested)
return;
_disposed.Cancel();
CheckCtsDisposal();
}
if (disposing && Impl is IDisposable disposable) if (disposing && Impl is IDisposable disposable)
{ {
disposable.Dispose(); disposable.Dispose();

View File

@ -5,66 +5,37 @@ using System.Threading.Tasks;
namespace Capnp.Rpc namespace Capnp.Rpc
{ {
class Vine : Skeleton class Vine : Skeleton
{ {
public static Skeleton Create(ConsumedCapability? cap) public Vine(ConsumedCapability consumedCap)
{ {
if (cap is LocalCapability lcap) Cap = consumedCap;
return lcap.ProvidedCap;
else
return new Vine(cap);
} }
Vine(ConsumedCapability? consumedCap) public ConsumedCapability Cap { get; }
{
Proxy = new Proxy(consumedCap);
#if DebugFinalizers internal override ConsumedCapability AsCapability() => Cap;
CreatorStackTrace = Environment.StackTrace;
#endif internal override void Claim()
{
Cap.AddRef();
} }
#if DebugFinalizers internal override void Relinquish()
~Vine()
{ {
Logger.LogWarning($"Caught orphaned Vine, created from here: {CreatorStackTrace}."); Cap.Release();
Dispose(false);
} }
ILogger Logger { get; } = Logging.CreateLogger<Vine>();
string CreatorStackTrace { get; }
#endif
internal override void Bind(object impl)
{
throw new NotImplementedException();
}
public Proxy Proxy { get; }
public async override Task<AnswerOrCounterquestion> Invoke( public async override Task<AnswerOrCounterquestion> Invoke(
ulong interfaceId, ushort methodId, DeserializerState args, ulong interfaceId, ushort methodId, DeserializerState args,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {
var promisedAnswer = Proxy.Call(interfaceId, methodId, (DynamicSerializerState)args, default); using var proxy = new Proxy(Cap);
var promisedAnswer = proxy.Call(interfaceId, methodId, (DynamicSerializerState)args, false, cancellationToken);
if (promisedAnswer is PendingQuestion pendingQuestion && pendingQuestion.RpcEndpoint == Impatient.AskingEndpoint) if (promisedAnswer is PendingQuestion pendingQuestion && pendingQuestion.RpcEndpoint == Impatient.AskingEndpoint)
{ {
async void SetupCancellation()
{
try
{
using var registration = cancellationToken.Register(promisedAnswer.Dispose);
await promisedAnswer.WhenReturned;
}
catch
{
}
}
SetupCancellation();
return pendingQuestion; return pendingQuestion;
} }
else else
@ -73,16 +44,5 @@ namespace Capnp.Rpc
return (DynamicSerializerState)await promisedAnswer.WhenReturned; return (DynamicSerializerState)await promisedAnswer.WhenReturned;
} }
} }
protected override void Dispose(bool disposing)
{
if (disposing)
Proxy.Dispose();
else
try { Proxy.Dispose(); }
catch { }
base.Dispose(disposing);
}
} }
} }

View File

@ -237,33 +237,31 @@ namespace Capnp
} }
} }
internal Rpc.ConsumedCapability? DecodeCapPointer(int offset) internal Rpc.ConsumedCapability DecodeCapPointer(int offset)
{ {
if (Caps == null) if (Caps == null)
throw new InvalidOperationException("Capbility table not set"); throw new InvalidOperationException("Capbility table not set");
if (!IsAllocated) if (!IsAllocated)
{ {
return null; return Rpc.NullCapability.Instance;
} }
WirePointer pointer = RawData[offset]; WirePointer pointer = RawData[offset];
if (pointer.IsNull) if (pointer.IsNull)
{ {
return null; return Rpc.NullCapability.Instance;
} }
if (pointer.Kind != PointerKind.Other) if (pointer.Kind != PointerKind.Other)
{ {
throw new Rpc.RpcException( throw new Rpc.RpcException("Expected a capability pointer, but got something different");
"Expected a capability pointer, but got something different");
} }
if (pointer.CapabilityIndex >= Caps.Count) if (pointer.CapabilityIndex >= Caps.Count)
{ {
throw new Rpc.RpcException( throw new Rpc.RpcException("Capability index out of range");
"Capability index out of range");
} }
return Caps[(int)pointer.CapabilityIndex]; return Caps[(int)pointer.CapabilityIndex];
@ -1237,9 +1235,9 @@ namespace Capnp
/// <param name="capability">The low-level capability object to provide.</param> /// <param name="capability">The low-level capability object to provide.</param>
/// <returns>Index of the given capability in the capability table, null if capability is null</returns> /// <returns>Index of the given capability in the capability table, null if capability is null</returns>
/// <exception cref="InvalidOperationException">The underlying message builder was not configured for capability table support.</exception> /// <exception cref="InvalidOperationException">The underlying message builder was not configured for capability table support.</exception>
public uint? ProvideCapability(Rpc.ConsumedCapability? capability) public uint? ProvideCapability(Rpc.ConsumedCapability capability)
{ {
if (capability == null) if (capability == null || capability == Rpc.NullCapability.Instance)
return null; return null;
if (Caps == null) if (Caps == null)
@ -1251,7 +1249,7 @@ namespace Capnp
{ {
index = Caps.Count; index = Caps.Count;
Caps.Add(capability); Caps.Add(capability);
capability?.AddRef(); capability.AddRef();
} }
return (uint)index; return (uint)index;
@ -1263,9 +1261,9 @@ namespace Capnp
/// <param name="capability">The capability to provide, in terms of its skeleton.</param> /// <param name="capability">The capability to provide, in terms of its skeleton.</param>
/// <returns>Index of the given capability in the capability table</returns> /// <returns>Index of the given capability in the capability table</returns>
/// <exception cref="InvalidOperationException">The underlying message builder was not configured for capability table support.</exception> /// <exception cref="InvalidOperationException">The underlying message builder was not configured for capability table support.</exception>
public uint ProvideCapability(Rpc.Skeleton capability) public uint? ProvideCapability(Rpc.Skeleton capability)
{ {
return ProvideCapability(Rpc.LocalCapability.Create(capability))!.Value; return ProvideCapability(capability.AsCapability());
} }
/// <summary> /// <summary>
@ -1356,7 +1354,7 @@ namespace Capnp
} }
} }
internal Rpc.ConsumedCapability? StructReadRawCap(int index) internal Rpc.ConsumedCapability StructReadRawCap(int index)
{ {
if (Kind != ObjectKind.Struct && Kind != ObjectKind.Nil) if (Kind != ObjectKind.Struct && Kind != ObjectKind.Nil)
throw new InvalidOperationException("Allowed on structs only"); throw new InvalidOperationException("Allowed on structs only");
@ -1376,10 +1374,10 @@ namespace Capnp
/// <exception cref="ArgumentOutOfRangeException"><paramref name="slot"/> is out of range.</exception> /// <exception cref="ArgumentOutOfRangeException"><paramref name="slot"/> is out of range.</exception>
/// <exception cref="ArgumentException">The desired interface does not qualify as capability interface (<see cref="Rpc.ProxyAttribute"/>)</exception> /// <exception cref="ArgumentException">The desired interface does not qualify as capability interface (<see cref="Rpc.ProxyAttribute"/>)</exception>
/// <exception cref="InvalidOperationException">This state does not represent a struct.</exception> /// <exception cref="InvalidOperationException">This state does not represent a struct.</exception>
public T? ReadCap<T>(int slot) where T : class public T ReadCap<T>(int slot) where T : class
{ {
var cap = StructReadRawCap(slot); var cap = StructReadRawCap(slot);
return Rpc.CapabilityReflection.CreateProxy<T>(cap) as T; return (Rpc.CapabilityReflection.CreateProxy<T>(cap) as T)!;
} }
/// <summary> /// <summary>