using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace Capnp.Rpc
{
///
/// Provides support for promise pipelining.
///
public static class Impatient
{
static readonly ConditionalWeakTable _taskTable = new ConditionalWeakTable();
static readonly ThreadLocal _askingEndpoint = new ThreadLocal();
///
/// Attaches a continuation to the given promise and registers the resulting task for pipelining.
///
/// Task result type
/// The promise
/// The continuation
/// Task representing the future answer
/// or is null.
/// The pomise was already registered.
public static Task MakePipelineAware(IPromisedAnswer promise, Func then)
{
async Task AwaitAnswer()
{
return then(await promise.WhenReturned);
}
var rtask = AwaitAnswer();
_taskTable.Add(rtask, promise);
return rtask;
}
///
/// Looks up the underlying promise which was previously registered for the given Task using MakePipelineAware.
///
///
/// The underlying promise
/// is null.
/// The task was not registered using MakePipelineAware.
public static IPromisedAnswer GetAnswer(Task task)
{
if (!_taskTable.TryGetValue(task, out var answer))
{
throw new ArgumentException("Unknown task");
}
return answer;
}
internal static IPromisedAnswer TryGetAnswer(Task task)
{
_taskTable.TryGetValue(task, out var answer);
return answer;
}
static async Task AwaitProxy(Task task) where T: class
{
var item = await task;
switch (item)
{
case Proxy proxy:
return proxy;
case null:
return null;
}
var skel = Skeleton.GetOrCreateSkeleton(item, false);
var localCap = LocalCapability.Create(skel);
return CapabilityReflection.CreateProxy(localCap);
}
///
/// Returns a local "lazy" proxy for a given Task.
/// This is not real promise pipelining and will probably be removed.
///
/// Capability interface type
/// The task
/// A proxy for the given task.
/// is null.
/// did not
/// quality as capability interface.
public static TInterface PseudoEager(this Task task,
[System.Runtime.CompilerServices.CallerMemberName] string memberName = "",
[System.Runtime.CompilerServices.CallerFilePath] string sourceFilePath = "",
[System.Runtime.CompilerServices.CallerLineNumber] int sourceLineNumber = 0)
where TInterface : class
{
var lazyCap = new LazyCapability(AwaitProxy(task));
return CapabilityReflection.CreateProxy(lazyCap, memberName, sourceFilePath, sourceLineNumber) as TInterface;
}
internal static IRpcEndpoint AskingEndpoint
{
get => _askingEndpoint.Value;
set { _askingEndpoint.Value = value; }
}
public static async Task MaybeTailCall(Task task, Func func)
{
if (TryGetAnswer(task) is PendingQuestion pendingQuestion &&
pendingQuestion.RpcEndpoint == AskingEndpoint)
{
pendingQuestion.IsTailCall = true;
return pendingQuestion;
}
else
{
return func(await task);
}
}
public static Task MaybeTailCall(Task<(T1, T2)> task, Func func)
{
return MaybeTailCall(task, (ValueTuple t) => func(t.Item1, t.Item2));
}
public static Task MaybeTailCall(Task<(T1, T2, T3)> task, Func func)
{
return MaybeTailCall(task, (ValueTuple t) => func(t.Item1, t.Item2, t.Item3));
}
public static Task MaybeTailCall(Task<(T1, T2, T3, T4)> task, Func func)
{
return MaybeTailCall(task, (ValueTuple t) => func(t.Item1, t.Item2, t.Item3, t.Item4));
}
public static Task MaybeTailCall(Task<(T1, T2, T3, T4, T5)> task, Func func)
{
return MaybeTailCall(task, (ValueTuple t) => func(t.Item1, t.Item2, t.Item3, t.Item4, t.Item5));
}
public static Task MaybeTailCall(Task<(T1, T2, T3, T4, T5, T6)> task, Func func)
{
return MaybeTailCall(task, (ValueTuple t) => func(t.Item1, t.Item2, t.Item3, t.Item4, t.Item5, t.Item6));
}
public static Task MaybeTailCall(Task<(T1, T2, T3, T4, T5, T6, T7)> task, Func func)
{
return MaybeTailCall(task, (ValueTuple t) => func(t.Item1, t.Item2, t.Item3, t.Item4, t.Item5, t.Item6, t.Item7));
}
}
}