using System;
using System.Diagnostics;
using System.Threading.Tasks;

namespace Capnp.Rpc
{
    /// <summary>
    /// A promised answer due to RPC.
    /// </summary>
    /// <remarks>
    /// Disposing the instance before the answer is available results in a best effort attempt to cancel
    /// the ongoing call.
    /// </remarks>
    public sealed class PendingQuestion: IPromisedAnswer
    {
        /// <summary>
        /// Question lifetime management state
        /// </summary>
        [Flags]
        public enum State
        {
            /// <summary>
            /// The question has not yet been sent.
            /// </summary>
            None = 0,

            /// <summary>
            /// Tail call flag
            /// </summary>
            TailCall = 1,

            /// <summary>
            /// The question has been sent.
            /// </summary>
            Sent = 2,

            /// <summary>
            /// The question has been answered.
            /// </summary>
            Returned = 4,

            /// <summary>
            /// A 'finish' request was sent to the peer, indicating that no further requests will refer
            /// to this question.
            /// </summary>
            FinishRequested = 8,

            /// <summary>
            /// Question object was disposed.
            /// </summary>
            Disposed = 16,

            /// <summary>
            /// Question object was finalized by GC.
            /// This flag should only be observable when debugging the finalizer itself.
            /// </summary>
            Finalized = 32
        }

        // Completed by the On* callbacks (result, exception or cancellation) and by Dispose (cancellation).
        readonly TaskCompletionSource<DeserializerState> _tcs = new TaskCompletionSource<DeserializerState>();
        readonly uint _questionId;

        // Call target and parameters are held (with an extra reference each) only until the question
        // is sent or disposed; both fields are nulled once ownership has been handed over.
        ConsumedCapability? _target;
        SerializerState? _inParams;

        // While non-zero, AutoFinish does not send 'finish' (unless the question is disposed).
        int _inhibitFinishCounter;

        /// <summary>
        /// Creates the question and takes an additional reference on every non-null capability in
        /// <paramref name="inParams"/> and on <paramref name="target"/>.
        /// </summary>
        /// <param name="ep">Endpoint used to send, finish and delete this question</param>
        /// <param name="id">Question ID</param>
        /// <param name="target">Call target, may be null</param>
        /// <param name="inParams">Call parameters; when null, the question starts out in state
        /// <see cref="State.Sent"/></param>
        /// <exception cref="ArgumentNullException"><paramref name="ep"/> is null</exception>
        internal PendingQuestion(IRpcEndpoint ep, uint id, ConsumedCapability? target, SerializerState? inParams)
        {
            RpcEndpoint = ep ?? throw new ArgumentNullException(nameof(ep));
            _questionId = id;
            _target = target;
            _inParams = inParams;
            StateFlags = inParams == null ? State.Sent : State.None;

            if (inParams != null)
            {
                foreach (var cap in inParams.Caps!)
                {
                    cap?.AddRef();
                }
            }

            if (target != null)
            {
                target.AddRef();
            }
        }

        /// <summary>
        /// Endpoint this question belongs to
        /// </summary>
        internal IRpcEndpoint RpcEndpoint { get; }

        /// <summary>
        /// Lock object guarding state transitions of this question
        /// </summary>
        internal object ReentrancyBlocker { get; } = new object();

        /// <summary>
        /// Question ID
        /// </summary>
        internal uint QuestionId => _questionId;

        /// <summary>
        /// Current lifetime state flags
        /// </summary>
        internal State StateFlags { get; private set; }

        /// <summary>
        /// Eventually returns the server answer
        /// </summary>
        public Task<DeserializerState> WhenReturned => _tcs.Task;

        /// <summary>
        /// Gets or sets the <see cref="State.TailCall"/> flag.
        /// </summary>
        internal bool IsTailCall
        {
            get => StateFlags.HasFlag(State.TailCall);
            set
            {
                if (value)
                    StateFlags |= State.TailCall;
                else
                    StateFlags &= ~State.TailCall;
            }
        }

        /// <summary>
        /// Whether the <see cref="State.Returned"/> flag is set
        /// </summary>
        internal bool IsReturned => StateFlags.HasFlag(State.Returned);

        /// <summary>
        /// Increments the counter which inhibits sending 'finish'. Pair with <see cref="AllowFinish"/>.
        /// </summary>
        internal void DisallowFinish()
        {
            ++_inhibitFinishCounter;
        }

        /// <summary>
        /// Decrements the inhibit counter and sends 'finish' if the question is now eligible for it.
        /// </summary>
        internal void AllowFinish()
        {
            --_inhibitFinishCounter;
            AutoFinish();
        }

        const string ReturnDespiteTailCallMessage = "Peer sent actual results despite the question was sent as tail call. This was not expected and is a protocol error.";

        /// <summary>
        /// Marks the question as returned and completes <see cref="WhenReturned"/> with the given results.
        /// If the question was sent as tail call, the task is faulted with an <c>RpcException</c> instead.
        /// </summary>
        /// <param name="results">Results received from the peer</param>
        /// <exception cref="InvalidOperationException">The question was already marked as returned</exception>
        internal void OnReturn(DeserializerState results)
        {
            lock (ReentrancyBlocker)
            {
                SetReturned();
            }

            // The task is completed outside the lock.
            if (StateFlags.HasFlag(State.TailCall))
            {
                // NOTE(review): capabilities in 'results' are not released on this path, unlike the
                // failed-TrySetResult path below - confirm whether that is intended.
                _tcs.TrySetException(new RpcException(ReturnDespiteTailCallMessage));
            }
            else
            {
                if (!_tcs.TrySetResult(results))
                {
                    // Task was already completed (e.g. canceled by Dispose): nobody will consume
                    // the results, so drop their capabilities.
                    ReleaseOutCaps(results);
                }
            }
        }

        /// <summary>
        /// Marks the question as returned after the peer reported that results were sent elsewhere.
        /// Completes <see cref="WhenReturned"/> with a default result if the question is a tail call,
        /// otherwise faults it with an <c>RpcException</c>.
        /// </summary>
        /// <exception cref="InvalidOperationException">The question was already marked as returned</exception>
        internal void OnTailCallReturn()
        {
            lock (ReentrancyBlocker)
            {
                SetReturned();
            }

            if (!StateFlags.HasFlag(State.TailCall))
            {
                _tcs.TrySetException(new RpcException("Peer sent the results of this questions somewhere else. This was not expected and is a protocol error."));
            }
            else
            {
                _tcs.TrySetResult(default);
            }
        }

        /// <summary>
        /// Marks the question as returned and faults <see cref="WhenReturned"/> with an
        /// <c>RpcException</c> carrying the reason of the given exception message.
        /// </summary>
        /// <param name="exception">Exception reader whose <c>Reason</c> becomes the exception message</param>
        /// <exception cref="InvalidOperationException">The question was already marked as returned</exception>
        internal void OnException(Exception.READER exception)
        {
            lock (ReentrancyBlocker)
            {
                SetReturned();
            }

            _tcs.TrySetException(new RpcException(exception.Reason));
        }

        /// <summary>
        /// Marks the question as returned and faults <see cref="WhenReturned"/> with the given exception.
        /// </summary>
        /// <param name="exception">Exception to report</param>
        /// <exception cref="InvalidOperationException">The question was already marked as returned</exception>
        internal void OnException(System.Exception exception)
        {
            lock (ReentrancyBlocker)
            {
                SetReturned();
            }

            _tcs.TrySetException(exception);
        }

        /// <summary>
        /// Marks the question as returned and cancels <see cref="WhenReturned"/>.
        /// </summary>
        /// <exception cref="InvalidOperationException">The question was already marked as returned</exception>
        internal void OnCanceled()
        {
            lock (ReentrancyBlocker)
            {
                SetReturned();
            }

            _tcs.TrySetCanceled();
        }

        // Asks the endpoint to delete this question.
        void DeleteMyQuestion()
        {
            RpcEndpoint.DeleteQuestion(this);
        }

        /// <summary>
        /// Asks the endpoint to finish this question's ID.
        /// </summary>
        internal void RequestFinish()
        {
            RpcEndpoint.Finish(_questionId);
        }

        // Requests 'finish' at most once. Eligible when the question was disposed, or when it has
        // returned, is not a tail call and finishing is not currently inhibited.
        void AutoFinish()
        {
            if (StateFlags.HasFlag(State.FinishRequested))
            {
                return;
            }

            if ((_inhibitFinishCounter == 0 && StateFlags.HasFlag(State.Returned) && !StateFlags.HasFlag(State.TailCall))
                || StateFlags.HasFlag(State.Disposed))
            {
                StateFlags |= State.FinishRequested;

                RequestFinish();
            }
        }

        // Sets the Returned flag (exactly once), then triggers 'finish' handling and deletion of
        // the question at the endpoint. Callers in this class invoke it under ReentrancyBlocker.
        void SetReturned()
        {
            if (StateFlags.HasFlag(State.Returned))
            {
                throw new InvalidOperationException("Return state already set");
            }

            StateFlags |= State.Returned;

            AutoFinish();
            DeleteMyQuestion();
        }

        /// <summary>
        /// Refer to a (possibly nested) member of this question's (possibly future) result and return
        /// it as a capability.
        /// </summary>
        /// <remarks>
        /// If the question has already returned (and is not a tail call), the access path is evaluated
        /// directly on the result; an exception stored in the answer is rethrown unwrapped. Evaluation
        /// fails if the referenced member does not exist or does not resolve to a capability pointer.
        /// Otherwise a capability referring to the pending answer is returned.
        /// </remarks>
        /// <param name="access">Access path</param>
        /// <returns>Low-level capability</returns>
        public ConsumedCapability? Access(MemberAccessPath access)
        {
            lock (ReentrancyBlocker)
            {
                if (StateFlags.HasFlag(State.Returned) &&
                    !StateFlags.HasFlag(State.TailCall))
                {
                    try
                    {
                        return access.Eval(WhenReturned.Result);
                    }
                    catch (AggregateException exception) when (exception.InnerException != null)
                    {
                        // Rethrow the underlying exception without the AggregateException wrapper
                        // while keeping its original stack trace.
                        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(exception.InnerException).Throw();
                        throw; // Not reached; satisfies the compiler's return analysis.
                    }
                }
                else
                {
                    return new RemoteAnswerCapability(this, access);
                }
            }
        }

        /// <summary>
        /// Refer to a (possibly nested) member of this question's (possibly future) result and return
        /// it as a capability.
        /// </summary>
        /// <remarks>
        /// Evaluation fails if the referenced member does not exist or does not resolve to a
        /// capability pointer.
        /// </remarks>
        /// <param name="access">Access path</param>
        /// <param name="task">promises the cap whose ownership is transferred to this object</param>
        /// <returns>Low-level capability</returns>
        public ConsumedCapability? Access(MemberAccessPath access, Task task)
        {
            // NOTE(review): AsProxyTask is declared outside this file; confirm the task type it expects.
            var proxyTask = task.AsProxyTask();
            return new RemoteAnswerCapability(this, access, proxyTask);
        }

        // Releases the references taken in the constructor on the parameter capabilities and the target.
        static void ReleaseCaps(ConsumedCapability? target, SerializerState? inParams)
        {
            if (inParams != null)
            {
                foreach (var cap in inParams.Caps!)
                {
                    cap?.Release(false);
                }
            }

            if (target != null)
            {
                target.Release(false);
            }
        }

        // Releases every non-null capability contained in results that will not be consumed.
        static void ReleaseOutCaps(DeserializerState outParams)
        {
            foreach (var cap in outParams.Caps!)
            {
                cap?.Release(false);
            }
        }

        /// <summary>
        /// Sends the question to the peer and releases the references held on its target and
        /// parameter capabilities. A failure to send is reported through <see cref="WhenReturned"/>.
        /// </summary>
        /// <exception cref="InvalidOperationException">The question was already sent</exception>
        /// <exception cref="ObjectDisposedException">The question was disposed before it was sent</exception>
        internal void Send()
        {
            SerializerState inParams;
            ConsumedCapability? target;

            lock (ReentrancyBlocker)
            {
                if (StateFlags.HasFlag(State.Sent))
                    throw new InvalidOperationException("Already sent");

                // An unsent question only loses its parameters through Dispose. Fail with a
                // meaningful exception, and before touching any state.
                inParams = _inParams ?? throw new ObjectDisposedException(
                    nameof(PendingQuestion), "Question was disposed before it was sent");
                _inParams = null;
                target = _target;
                _target = null;
                StateFlags |= State.Sent;
            }

            var msg = (Message.WRITER)inParams.MsgBuilder!.Root!;
            Debug.Assert(msg.Call.Target.which != MessageTarget.WHICH.undefined);
            var call = msg.Call;
            call.QuestionId = QuestionId;
            call.SendResultsTo.which = IsTailCall ?
                Call.sendResultsTo.WHICH.Yourself :
                Call.sendResultsTo.WHICH.Caller;

            try
            {
                RpcEndpoint.SendQuestion(inParams, call.Params);
            }
            catch (System.Exception exception)
            {
                OnException(exception);
            }

            ReleaseCaps(target, inParams);
        }

        #region IDisposable Support

        // Shared by Dispose() and the finalizer. Takes over any still-held target/parameters and
        // releases them outside the lock. On the first explicit dispose it additionally flags the
        // question as disposed, triggers 'finish' handling and cancels the answer task.
        void Dispose(bool disposing)
        {
            SerializerState? inParams;
            ConsumedCapability? target;
            bool justDisposed = false;

            lock (ReentrancyBlocker)
            {
                inParams = _inParams;
                _inParams = null;
                target = _target;
                _target = null;

                if (disposing)
                {
                    if (!StateFlags.HasFlag(State.Disposed))
                    {
                        StateFlags |= State.Disposed;
                        justDisposed = true;

                        AutoFinish();
                    }
                }
                else
                {
                    StateFlags |= State.Finalized;
                }
            }

            ReleaseCaps(target, inParams);

            if (justDisposed)
            {
                _tcs.TrySetCanceled();
            }
        }

        /// <summary>
        /// Finalizer
        /// </summary>
        ~PendingQuestion()
        {
            Dispose(false);
        }

        /// <summary>
        /// Disposes the question: releases capabilities still held for an unsent call, requests
        /// 'finish' for the question and cancels <see cref="WhenReturned"/> if it is still pending.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        #endregion
    }
}