using System; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; namespace Capnp.Rpc { /// /// Provides support for promise pipelining. /// public static class Impatient { static readonly ConditionalWeakTable _taskTable = new ConditionalWeakTable(); static readonly ThreadLocal _askingEndpoint = new ThreadLocal(); /// /// Attaches a continuation to the given promise and registers the resulting task for pipelining. /// /// Task result type /// The promise /// The continuation /// Task representing the future answer /// or is null. /// The pomise was already registered. public static Task MakePipelineAware(IPromisedAnswer promise, Func then) { async Task AwaitAnswer() { return then(await promise.WhenReturned); } var rtask = AwaitAnswer(); try { // Really weird: We'd expect AwaitAnswer() to initialize a new Task instance upon each invocation. // However, this does not seem to be always true (as indicated by CI test suite). An explanation might be // that the underlying implementation recycles Task instances (um, really? doesn't make sense. But the // observation doesn't make sense, either). _taskTable.Add(rtask, promise); } catch (ArgumentException) { if (rtask.IsCompleted) { // Force .NET to create a new Task instance if (rtask.IsCanceled) { rtask = Task.FromCanceled(new CancellationToken(true)); } else if (rtask.IsFaulted) { rtask = Task.FromException(rtask.Exception!.InnerException!); } else { rtask = Task.FromResult(rtask.Result); } _taskTable.Add(rtask, promise); } else { throw new InvalidOperationException("What the heck is wrong with Task?"); } } return rtask; } /// /// Looks up the underlying promise which was previously registered for the given Task using MakePipelineAware. /// /// /// The underlying promise /// is null. /// The task was not registered using MakePipelineAware. public static IPromisedAnswer GetAnswer(Task task) { if (!_taskTable.TryGetValue(task, out var answer)) { throw new ArgumentException("Unknown task"); } return answer; } internal static IPromisedAnswer? TryGetAnswer(Task task) { _taskTable.TryGetValue(task, out var answer); return answer; } static async Task AwaitProxy(Task task) where T: class { var item = await task; switch (item) { case Proxy proxy: return proxy; case null: return CapabilityReflection.CreateProxy(null); } var skel = Skeleton.GetOrCreateSkeleton(item!, false); var localCap = LocalCapability.Create(skel); return CapabilityReflection.CreateProxy(localCap); } /// /// Returns a local "lazy" proxy for a given Task. /// This is not real promise pipelining and will probably be removed. /// /// Capability interface type /// The task /// debugging aid /// debugging aid /// debugging aid /// A proxy for the given task. /// is null. /// did not /// quality as capability interface. [Obsolete("Call Eager(task, true) instead")] public static TInterface PseudoEager(this Task task, [System.Runtime.CompilerServices.CallerMemberName] string memberName = "", [System.Runtime.CompilerServices.CallerFilePath] string sourceFilePath = "", [System.Runtime.CompilerServices.CallerLineNumber] int sourceLineNumber = 0) where TInterface : class { var lazyCap = new LazyCapability(AwaitProxy(task)); return (CapabilityReflection.CreateProxy(lazyCap, memberName, sourceFilePath, sourceLineNumber) as TInterface)!; } static readonly MemberAccessPath Path_OneAndOnly = new MemberAccessPath(0U); /// /// Returns a promise-pipelined Proxy for a remote method invocation Task. /// /// Capability interface type /// Task returning an interface /// If this flag is 'false', the MUST have been returned from a remote /// method invocation on a generated Proxy interface. Since this is the prerequisite for promise pipelining to work, the method throws an /// exception if the requirement is not met (i.e. the passed some Task instance was constructed "somewhere else"). Setting this flag to 'true' /// prevents such an exception. The method falls back to a local "lazy" proxy for the given Task. It is fully usable, but does not perform /// any promise pipelining (as specified for Cap'n Proto). /// A proxy for the given future. /// is null. /// did not qualify as capability interface. /// The task was not returned from a remote method invocation. Promise pipelining won't work. /// Setting > to 'true' prevents this exception. /// OR: Mismatch between generic type arguments (if capability interface is generic). /// Mismatch between generic type arguments (if capability interface is generic). /// Problem with instatiating the Proxy (constructor threw exception). /// Caller does not have permission to invoke the Proxy constructor. /// Problem with building the Proxy type, or problem with loading some dependent class. public static TInterface Eager(this Task task, bool allowNoPipeliningFallback = false) where TInterface : class { var answer = TryGetAnswer(task); if (answer == null) { if (!allowNoPipeliningFallback) { throw new ArgumentException("The task was not returned from a remote method invocation. See documentation for details."); } var lazyCap = new LazyCapability(AwaitProxy(task)); return (CapabilityReflection.CreateProxy(lazyCap) as TInterface)!; } else { return (CapabilityReflection.CreateProxy(answer.Access(Path_OneAndOnly)) as TInterface)!; } } internal static IRpcEndpoint? AskingEndpoint { get => _askingEndpoint.Value; set { _askingEndpoint.Value = value; } } /// /// Checks whether a given task belongs to a pending RPC and requests a tail call if applicable. /// /// Task result type /// Task to request /// Converts the task's result to a SerializerState /// Tail-call aware task public static async Task MaybeTailCall(Task task, Func func) { if (TryGetAnswer(task) is PendingQuestion pendingQuestion && pendingQuestion.RpcEndpoint == AskingEndpoint) { pendingQuestion.IsTailCall = true; return pendingQuestion; } else { return func(await task); } } /// /// Overload for tuple-typed tasks /// public static Task MaybeTailCall(Task<(T1, T2)> task, Func func) { return MaybeTailCall(task, (ValueTuple t) => func(t.Item1, t.Item2)); } /// /// Overload for tuple-typed tasks /// public static Task MaybeTailCall(Task<(T1, T2, T3)> task, Func func) { return MaybeTailCall(task, (ValueTuple t) => func(t.Item1, t.Item2, t.Item3)); } /// /// Overload for tuple-typed tasks /// public static Task MaybeTailCall(Task<(T1, T2, T3, T4)> task, Func func) { return MaybeTailCall(task, (ValueTuple t) => func(t.Item1, t.Item2, t.Item3, t.Item4)); } /// /// Overload for tuple-typed tasks /// public static Task MaybeTailCall(Task<(T1, T2, T3, T4, T5)> task, Func func) { return MaybeTailCall(task, (ValueTuple t) => func(t.Item1, t.Item2, t.Item3, t.Item4, t.Item5)); } /// /// Overload for tuple-typed tasks /// public static Task MaybeTailCall(Task<(T1, T2, T3, T4, T5, T6)> task, Func func) { return MaybeTailCall(task, (ValueTuple t) => func(t.Item1, t.Item2, t.Item3, t.Item4, t.Item5, t.Item6)); } /// /// Overload for tuple-typed tasks /// public static Task MaybeTailCall(Task<(T1, T2, T3, T4, T5, T6, T7)> task, Func func) { return MaybeTailCall(task, (ValueTuple t) => func(t.Item1, t.Item2, t.Item3, t.Item4, t.Item5, t.Item6, t.Item7)); } } }