using System; using System.Collections.Generic; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; namespace Capnp.Rpc { /// /// Provides support for promise pipelining. /// public static class Impatient { static readonly ConditionalWeakTable _taskTable = new ConditionalWeakTable(); static readonly ThreadLocal> _askingEndpoint = new ThreadLocal>(() => new Stack()); /// /// Attaches a continuation to the given promise and registers the resulting task for pipelining. /// /// Task result type /// The promise /// The continuation /// Task representing the future answer /// or is null. /// The pomise was already registered. public static Task MakePipelineAware(IPromisedAnswer promise, Func then) { async Task AwaitAnswer() { var result = await promise.WhenReturned; if (promise.IsTailCall) throw new TailCallNoDataException(); return then(result); } var rtask = AwaitAnswer(); // Rare situation: .NET maintains a cache of some pre-computed tasks for standard results (such as (int)0, (object)null). // AwaitAnswer() might indeed have chosen a fast-path optimization, such that rtask is a cached object instead of a new instance. // Once this happens the second time, and we return the same rtask for a different promise. GetAnswer()/TryGetAnswer() may return the "wrong" // promise! Fortunately, this does not really matter, since the "wrong" promise is guaranteed to return exactly the same answer. :-) _taskTable.GetValue(rtask, _ => promise); return rtask; } /// /// Looks up the underlying promise which was previously registered for the given Task using MakePipelineAware. /// /// /// The underlying promise /// is null. /// The task was not registered using MakePipelineAware. [Obsolete("Please re-generate capnp code-behind. GetAnswer(task).Access(...) was replaced by Access(task, ...)")] public static IPromisedAnswer GetAnswer(Task task) { if (!_taskTable.TryGetValue(task, out var answer)) { throw new ArgumentException("Unknown task"); } return answer; } internal static IPromisedAnswer? TryGetAnswer(Task task) { _taskTable.TryGetValue(task, out var answer); return answer; } /// /// Returns a promise-pipelined capability for a remote method invocation Task. /// /// remote method invocation task /// path to the desired capability /// task returning a proxy to the desired capability /// Pipelined low-level capability public static ConsumedCapability Access(Task task, MemberAccessPath access, Task proxyTask) { var answer = TryGetAnswer(task); if (answer != null) return answer.Access(access, proxyTask); return new LazyCapability(proxyTask.AsProxyTask()); } /// /// Returns a local "lazy" proxy for a given Task. /// This is not real promise pipelining and will probably be removed. /// /// Capability interface type /// The task /// A proxy for the given task. /// is null. /// did not /// quality as capability interface. [Obsolete("Call Eager(task, true) instead")] public static TInterface PseudoEager(this Task task) where TInterface : class, IDisposable { var lazyCap = new LazyCapability(task.AsProxyTask()); return (CapabilityReflection.CreateProxy(lazyCap) as TInterface)!; } static readonly MemberAccessPath Path_OneAndOnly = new MemberAccessPath(0U); /// /// Returns a promise-pipelined Proxy for a remote method invocation Task. /// /// Capability interface type /// Task returning an interface /// If this flag is 'false', the MUST have been returned from a remote /// method invocation on a generated Proxy interface. Since this is the prerequisite for promise pipelining to work, the method throws an /// exception if the requirement is not met (i.e. the passed some Task instance was constructed "somewhere else"). Setting this flag to 'true' /// prevents such an exception. The method falls back to a local "lazy" proxy for the given Task. It is fully usable, but does not perform /// any promise pipelining (as specified for Cap'n Proto). /// A proxy for the given future. /// is null. /// did not qualify as capability interface. /// The task was not returned from a remote method invocation. Promise pipelining won't work. /// Setting > to 'true' prevents this exception. /// OR: Mismatch between generic type arguments (if capability interface is generic). /// Mismatch between generic type arguments (if capability interface is generic). /// Problem with instatiating the Proxy (constructor threw exception). /// Caller does not have permission to invoke the Proxy constructor. /// Problem with building the Proxy type, or problem with loading some dependent class. public static TInterface Eager(this Task task, bool allowNoPipeliningFallback = false) where TInterface : class, IDisposable { var answer = TryGetAnswer(task); if (answer == null) { if (!allowNoPipeliningFallback) { throw new ArgumentException("The task was not returned from a remote method invocation. See documentation for details."); } var proxyTask = task.AsProxyTask(); if (proxyTask.ReplacementTaskIsCompletedSuccessfully()) { return proxyTask.Result.Cast(true); } else { var lazyCap = new LazyCapability(proxyTask); return (CapabilityReflection.CreateProxy(lazyCap) as TInterface)!; } } else { async Task AsDisposableTask() { return await task; } return (CapabilityReflection.CreateProxy(answer.Access(Path_OneAndOnly, AsDisposableTask())) as TInterface)!; } } /// /// Unwraps given capability. Unwrapping walks the chain of promised capabilities and awaits their resolutions, /// until we get the finally resolved capability. If it is the capability, the method returns a null reference. /// If the capability is broken (resolved to exception, dependent answer faulted or cancelled, RPC endpoint closed), /// it throws an exception. /// /// Capability interface /// capability to unwrap /// Task returning the eventually resolved capability /// Capability is broken public static async Task Unwrap(this TInterface cap) where TInterface: class, IDisposable { using var proxy = cap as Proxy; if (proxy == null) return cap; var unwrapped = await proxy.ConsumedCap.Unwrap(); if (unwrapped == null || unwrapped == NullCapability.Instance) return null; return ((CapabilityReflection.CreateProxy(unwrapped)) as TInterface)!; } internal static IRpcEndpoint? AskingEndpoint { get => _askingEndpoint.Value!.Count > 0 ? _askingEndpoint.Value.Peek() : null; } internal static void PushAskingEndpoint(IRpcEndpoint endpoint) { _askingEndpoint.Value!.Push(endpoint); } internal static void PopAskingEndpoint() { _askingEndpoint.Value!.Pop(); } /// /// Checks whether a given task belongs to a pending RPC and requests a tail call if applicable. /// /// Task result type /// Task to request /// Converts the task's result to a SerializerState /// Tail-call aware task public static async Task MaybeTailCall(Task task, Func func) { if (TryGetAnswer(task) is PendingQuestion pendingQuestion && pendingQuestion.RpcEndpoint == AskingEndpoint) { pendingQuestion.IsTailCall = true; return pendingQuestion; } else { return func(await task); } } /// /// Overload for tuple-typed tasks /// public static Task MaybeTailCall(Task<(T1, T2)> task, Func func) { return MaybeTailCall(task, (ValueTuple t) => func(t.Item1, t.Item2)); } /// /// Overload for tuple-typed tasks /// public static Task MaybeTailCall(Task<(T1, T2, T3)> task, Func func) { return MaybeTailCall(task, (ValueTuple t) => func(t.Item1, t.Item2, t.Item3)); } /// /// Overload for tuple-typed tasks /// public static Task MaybeTailCall(Task<(T1, T2, T3, T4)> task, Func func) { return MaybeTailCall(task, (ValueTuple t) => func(t.Item1, t.Item2, t.Item3, t.Item4)); } /// /// Overload for tuple-typed tasks /// public static Task MaybeTailCall(Task<(T1, T2, T3, T4, T5)> task, Func func) { return MaybeTailCall(task, (ValueTuple t) => func(t.Item1, t.Item2, t.Item3, t.Item4, t.Item5)); } /// /// Overload for tuple-typed tasks /// public static Task MaybeTailCall(Task<(T1, T2, T3, T4, T5, T6)> task, Func func) { return MaybeTailCall(task, (ValueTuple t) => func(t.Item1, t.Item2, t.Item3, t.Item4, t.Item5, t.Item6)); } /// /// Overload for tuple-typed tasks /// public static Task MaybeTailCall(Task<(T1, T2, T3, T4, T5, T6, T7)> task, Func func) { return MaybeTailCall(task, (ValueTuple t) => func(t.Item1, t.Item2, t.Item3, t.Item4, t.Item5, t.Item6, t.Item7)); } } }