using Capnp.Util;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Capnp.Rpc
{
    /// <summary>
    /// A capability whose underlying <see cref="ConsumedCapability"/> is produced by a task.
    /// Calls made through <see cref="DoCall"/> first await that task and are then forwarded
    /// to the resulting capability.
    /// </summary>
    class LazyCapability : RefCountingCapability, IResolvingCapability
    {
        /// <summary>
        /// Creates a capability whose resolution task is already faulted with an
        /// <see cref="RpcException"/> carrying <paramref name="message"/>.
        /// </summary>
        /// <param name="message">Text of the exception the capability resolves to.</param>
        /// <returns>A new instance backed by the faulted task.</returns>
        public static LazyCapability CreateBrokenCap(string message)
        {
            return new LazyCapability(Task.FromException<ConsumedCapability>(new RpcException(message)));
        }

        /// <summary>
        /// Creates a capability whose resolution task is already canceled by <paramref name="token"/>.
        /// </summary>
        /// <param name="token">Cancellation token the canceled task is associated with.</param>
        /// <returns>A new instance backed by the canceled task.</returns>
        /// <exception cref="ArgumentOutOfRangeException">
        /// Thrown by <c>Task.FromCanceled</c> when cancellation has not been requested on <paramref name="token"/>.
        /// </exception>
        public static LazyCapability CreateCanceledCap(CancellationToken token)
        {
            return new LazyCapability(Task.FromCanceled<ConsumedCapability>(token));
        }

        // Only set by the proxy-based constructor; the proxy it yields is disposed in ReleaseRemotely.
        readonly StrictlyOrderedAwaitTask<Proxy>? _proxyTask;

        // Always set; yields the capability that calls are forwarded to.
        readonly StrictlyOrderedAwaitTask<ConsumedCapability> _capTask;

        /// <summary>
        /// Wraps a task that eventually yields the capability itself.
        /// </summary>
        /// <param name="capabilityTask">Task producing the capability to forward calls to.</param>
        public LazyCapability(Task<ConsumedCapability> capabilityTask)
        {
            _capTask = capabilityTask.EnforceAwaitOrder();
        }

        /// <summary>
        /// Wraps a task that eventually yields a proxy. The capability is taken from the proxy's
        /// <c>ConsumedCap</c> once the task completes, and the proxy is disposed by
        /// <see cref="ReleaseRemotely"/>.
        /// </summary>
        /// <param name="proxyTask">Task producing the proxy whose capability calls are forwarded to.</param>
        public LazyCapability(Task<Proxy> proxyTask)
        {
            _proxyTask = proxyTask.EnforceAwaitOrder();

            async Task<ConsumedCapability> AwaitCap() => (await _proxyTask!).ConsumedCap;

            _capTask = AwaitCap().EnforceAwaitOrder();
        }

        /// <summary>
        /// Writes this capability into <paramref name="writer"/>. When resolution has already
        /// completed successfully, the resolved capability is exported through a temporary proxy;
        /// otherwise this instance is exported via <c>ExportAsSenderPromise</c>.
        /// </summary>
        /// <param name="endpoint">Endpoint the capability is exported to.</param>
        /// <param name="writer">Descriptor to fill in.</param>
        /// <returns>Whatever the delegated export call returns.</returns>
        internal override Action? Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer)
        {
            if (WhenResolved.IsCompleted && WhenResolved.WrappedTask.ReplacementTaskIsCompletedSuccessfully())
            {
                // NOTE(review): the type argument was missing in the reviewed text; BareProxy is
                // assumed as the untyped proxy class - confirm.
                using var proxy = GetResolvedCapability<BareProxy>()!;
                return proxy.Export(endpoint, writer);
            }
            else
            {
                return this.ExportAsSenderPromise(endpoint, writer);
            }
        }

        /// <summary>
        /// If this instance was created from a proxy task, disposes that proxy once the task
        /// has completed. Does nothing otherwise.
        /// </summary>
        protected override void ReleaseRemotely()
        {
            if (_proxyTask != null)
            {
                // Fire-and-forget: this method is synchronous, so the proxy is disposed whenever
                // its task completes. Failures are swallowed on purpose - if the task faulted or
                // was canceled there is no proxy to dispose, and nothing may escape an async void.
                async void DisposeProxyWhenResolved()
                {
                    try { using var _ = await _proxyTask!; }
                    catch { }
                }

                DisposeProxyWhenResolved();
            }
        }

        /// <summary>
        /// The task that completes when the underlying capability is available.
        /// </summary>
        public StrictlyOrderedAwaitTask WhenResolved => _capTask;

        /// <summary>
        /// Returns a proxy of type <typeparamref name="T"/> for the resolved capability, or
        /// <c>null</c> while the capability task has not completed yet.
        /// </summary>
        /// <typeparam name="T">Proxy type to create.</typeparam>
        /// <returns>The proxy, or <c>null</c> if resolution is still pending.</returns>
        /// <remarks>
        /// If the capability task faulted or was canceled, the underlying exception is rethrown
        /// instead of the <see cref="AggregateException"/> wrapper.
        /// </remarks>
        public T? GetResolvedCapability<T>() where T: class
        {
            if (_capTask.WrappedTask.IsCompleted)
            {
                try
                {
                    return (CapabilityReflection.CreateProxy<T>(_capTask.Result) as T)!;
                }
                catch (AggregateException exception)
                {
                    // Unwrap to the real cause, but keep its original stack trace; a plain
                    // "throw inner" would reset the trace to this line.
                    System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(exception.InnerException!).Throw();
                    throw; // Not reached; only satisfies the compiler's return-path analysis.
                }
            }
            else
            {
                return null;
            }
        }

        /// <summary>
        /// Awaits the capability, then forwards the call to it and awaits the answer.
        /// </summary>
        /// <param name="interfaceId">Interface ID of the method to call.</param>
        /// <param name="methodId">Method ID within the interface.</param>
        /// <param name="args">Call arguments. Disposed here if the call is never issued.</param>
        /// <param name="cancellationToken">
        /// Aborts before the call is issued, or disposes the pending call afterwards.
        /// </param>
        /// <returns>The result awaited from the forwarded call's <c>WhenReturned</c>.</returns>
        // NOTE(review): the result type argument was missing in the reviewed text;
        // DeserializerState is assumed - confirm against LocalAnswer's constructor.
        async Task<DeserializerState> CallImpl(ulong interfaceId, ushort methodId, DynamicSerializerState args, CancellationToken cancellationToken)
        {
            ConsumedCapability cap;
            try
            {
                cap = await _capTask;
            }
            catch
            {
                // Resolution failed, so the call is never issued: release the arguments here.
                args.Dispose();
                throw;
            }

            if (cancellationToken.IsCancellationRequested)
            {
                // Canceled while waiting for resolution: release the arguments, then throw.
                args.Dispose();
                cancellationToken.ThrowIfCancellationRequested();
            }

            using var proxy = new Proxy(cap);
            // The token is not passed on (default); cancellation is applied below by disposing the call.
            var call = proxy.Call(interfaceId, methodId, args, default);
            var whenReturned = call.WhenReturned;

            using (cancellationToken.Register(call.Dispose))
            {
                return await whenReturned;
            }
        }

        /// <summary>
        /// Starts a call on the (possibly still unresolved) capability.
        /// </summary>
        /// <param name="interfaceId">Interface ID of the method to call.</param>
        /// <param name="methodId">Method ID within the interface.</param>
        /// <param name="args">Call arguments.</param>
        /// <returns>
        /// A <see cref="LocalAnswer"/> built from the pending call and the cancellation source
        /// whose token is passed to that call.
        /// </returns>
        internal override IPromisedAnswer DoCall(ulong interfaceId, ushort methodId, DynamicSerializerState args)
        {
            var cts = new CancellationTokenSource();
            return new LocalAnswer(cts, CallImpl(interfaceId, methodId, args, cts.Token));
        }
    }
}