fine-tuning cap lifecycle mgmt.

added missing xml doc.
This commit is contained in:
Christian Köllner 2020-03-22 16:08:28 +01:00
parent eb321e5a8e
commit 9dbf474334
16 changed files with 114 additions and 71 deletions

View File

@ -29,7 +29,7 @@
</PropertyGroup>
<PropertyGroup>
<DefineConstants>DebugFinalizers</DefineConstants>
<DefineConstants></DefineConstants>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(TargetFramework)|$(Platform)'=='Release|netstandard2.0|AnyCPU'">

View File

@ -636,9 +636,6 @@ namespace Capnp
/// </summary>
/// <typeparam name="T">Capability interface</typeparam>
/// <param name="index">index within this struct's pointer table</param>
/// <param name="memberName">debugging aid</param>
/// <param name="sourceFilePath">debugging aid</param>
/// <param name="sourceLineNumber">debugging aid</param>
/// <returns>capability instance or null if pointer was null</returns>
/// <exception cref="IndexOutOfRangeException">negative index</exception>
/// <exception cref="DeserializationException">state does not represent a struct, invalid pointer,
@ -685,15 +682,19 @@ namespace Capnp
return (Rpc.CapabilityReflection.CreateProxy<T>(Caps[(int)CapabilityIndex]) as T)!;
}
/// <summary>
/// Releases the capability table
/// </summary>
public void Dispose()
{
if (Caps != null && !_disposed)
{
foreach (var cap in Caps)
{
cap?.Release(false);
cap?.Release();
}
Caps = null;
_disposed = true;
}
}

View File

@ -251,9 +251,6 @@ namespace Capnp.Rpc
/// </summary>
/// <typeparam name="TInterface">Capability interface. Must be annotated with <see cref="ProxyAttribute"/>.</typeparam>
/// <param name="cap">low-level capability</param>
/// <param name="memberName">debugging aid</param>
/// <param name="sourceFilePath">debugging aid</param>
/// <param name="sourceLineNumber">debugging aid</param>
/// <returns>The Proxy instance which implements <typeparamref name="TInterface"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="cap"/> is null.</exception>
/// <exception cref="InvalidCapabilityInterfaceException"><typeparamref name="TInterface"/> did not qualify as capability interface.</exception>

View File

@ -20,11 +20,7 @@ namespace Capnp.Rpc
internal abstract void Unfreeze();
internal abstract void AddRef();
internal abstract void Release(
bool keepAlive,
[System.Runtime.CompilerServices.CallerMemberName] string methodName = "",
[System.Runtime.CompilerServices.CallerFilePath] string filePath = "",
[System.Runtime.CompilerServices.CallerLineNumber] int lineNumber = 0);
internal abstract void Release();
#if DebugFinalizers
internal Proxy? OwningProxy { get; set; }

View File

@ -24,6 +24,12 @@ namespace Capnp.Rpc
/// <returns>Pipelined low-level capability</returns>
ConsumedCapability? Access(MemberAccessPath access);
/// <summary>
///
/// </summary>
/// <param name="access">Creates a low-level capability for promise pipelining.</param>
/// <param name="proxyTask">Task returning the proxy whose ownership will be taken over</param>
/// <returns></returns>
ConsumedCapability? Access(MemberAccessPath access, Task<IDisposable?> proxyTask);
}
}

View File

@ -93,6 +93,13 @@ namespace Capnp.Rpc
return answer;
}
/// <summary>
/// Returns a promise-pipelined capability for a remote method invocation Task.
/// </summary>
/// <param name="task">remote method invocation task</param>
/// <param name="access">path to the desired capability</param>
/// <param name="proxyTask">task returning a proxy to the desired capability</param>
/// <returns>Pipelined low-level capability</returns>
public static ConsumedCapability? Access(Task task, MemberAccessPath access, Task<IDisposable?> proxyTask)
{
var answer = TryGetAnswer(task);
@ -106,9 +113,6 @@ namespace Capnp.Rpc
/// </summary>
/// <typeparam name="TInterface">Capability interface type</typeparam>
/// <param name="task">The task</param>
/// <param name="memberName">debugging aid</param>
/// <param name="sourceFilePath">debugging aid</param>
/// <param name="sourceLineNumber">debugging aid</param>
/// <returns>A proxy for the given task.</returns>
/// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
/// <exception cref="InvalidCapabilityInterfaceException"><typeparamref name="TInterface"/> did not

View File

@ -18,7 +18,7 @@ namespace Capnp.Rpc.Interception
protected override void ReleaseRemotely()
{
InterceptedCapability.Release(false);
InterceptedCapability.Release();
}
internal override IPromisedAnswer DoCall(ulong interfaceId, ushort methodId, DynamicSerializerState args)

View File

@ -12,7 +12,7 @@ namespace Capnp.Rpc
var result = await answer;
var cap = access.Eval(result);
var proxy = new Proxy(cap);
cap?.Release(false);
cap?.Release();
return proxy;
}

View File

@ -22,7 +22,6 @@ namespace Capnp.Rpc
}
public Skeleton ProvidedCap { get; }
int _releaseFlag;
LocalCapability(Skeleton providedCap)
{
@ -31,20 +30,12 @@ namespace Capnp.Rpc
internal override void AddRef()
{
if (0 == Interlocked.CompareExchange(ref _releaseFlag, 0, 1))
ProvidedCap.Claim();
ProvidedCap.Claim();
}
internal override void Release(
bool keepAlive,
[System.Runtime.CompilerServices.CallerMemberName] string methodName = "",
[System.Runtime.CompilerServices.CallerFilePath] string filePath = "",
[System.Runtime.CompilerServices.CallerLineNumber] int lineNumber = 0)
internal override void Release()
{
if (keepAlive)
Interlocked.Exchange(ref _releaseFlag, 1);
else
ProvidedCap.Relinquish();
ProvidedCap.Relinquish();
}
internal override IPromisedAnswer DoCall(ulong interfaceId, ushort methodId, DynamicSerializerState args)

View File

@ -155,7 +155,7 @@ namespace Capnp.Rpc
{
foreach (var cap in aorcq.Answer.Caps)
{
cap?.Release(false);
cap?.Release();
}
}
}

View File

@ -277,13 +277,13 @@ namespace Capnp.Rpc
{
foreach (var cap in inParams.Caps!)
{
cap?.Release(false);
cap?.Release();
}
}
if (target != null)
{
target.Release(false);
target.Release();
}
}
@ -291,7 +291,7 @@ namespace Capnp.Rpc
{
foreach (var cap in outParams.Caps!)
{
cap?.Release(false);
cap?.Release();
}
}

View File

@ -11,6 +11,12 @@ namespace Capnp.Rpc
/// </summary>
public class Proxy : IDisposable, IResolvingCapability
{
/// <summary>
/// Creates a new proxy object for an existing implementation or proxy, sharing its ownership.
/// </summary>
/// <typeparam name="T">Capability interface</typeparam>
/// <param name="obj">instance to share</param>
/// <returns></returns>
public static T Share<T>(T obj) where T: class
{
if (obj is Proxy proxy)
@ -149,14 +155,14 @@ namespace Capnp.Rpc
{
if (disposing)
{
ConsumedCap?.Release(false);
ConsumedCap?.Release();
}
else
{
// When called from the Finalizer, we must not throw.
// But when reference counting goes wrong, ConsumedCapability.Release() will throw an InvalidOperationException.
// The only option here is to suppress that exception.
try { ConsumedCap?.Release(false); }
try { ConsumedCap?.Release(); }
catch { }
}

View File

@ -26,17 +26,8 @@ namespace Capnp.Rpc
// Value 0 has the special meaning of being in state C.
long _refCount = 1;
#if DebugCapabilityLifecycle || DebugFinalizers
ILogger Logger { get; } = Logging.CreateLogger<RefCountingCapability>();
#endif
#if DebugCapabilityLifecycle
string? _releasingMethodName;
string? _releasingFilePath;
int _releasingLineNumber;
#endif
#if DebugFinalizers
ILogger Logger { get; } = Logging.CreateLogger<RefCountingCapability>();
string CreatorStackTrace { get; set; }
#endif
@ -83,11 +74,7 @@ namespace Capnp.Rpc
{
lock (_reentrancyBlocker)
{
if (_refCount == int.MinValue)
{
_refCount = 2;
}
else if (++_refCount <= 1)
if (++_refCount <= 1)
{
--_refCount;
@ -101,30 +88,16 @@ namespace Capnp.Rpc
}
}
internal sealed override void Release(
bool keepAlive,
[System.Runtime.CompilerServices.CallerMemberName] string methodName = "",
[System.Runtime.CompilerServices.CallerFilePath] string filePath = "",
[System.Runtime.CompilerServices.CallerLineNumber] int lineNumber = 0)
internal sealed override void Release()
{
lock (_reentrancyBlocker)
{
switch (_refCount)
{
case 2 when keepAlive:
_refCount = int.MinValue;
break;
case 1: // initial state, actually ref. count 0
case 2: // actually ref. count 1
_refCount = 0;
#if DebugCapabilityLifecycle
_releasingMethodName = methodName;
_releasingFilePath = filePath;
_releasingLineNumber = lineNumber;
#endif
Dispose(true);
GC.SuppressFinalize(this);
break;

View File

@ -42,7 +42,7 @@ namespace Capnp.Rpc
var result = await question.WhenReturned;
var cap = access.Eval(result);
var proxy = new Proxy(cap);
cap?.Release(false);
cap?.Release();
return proxy;
}

View File

@ -60,11 +60,25 @@ namespace Capnp.Rpc
}
}
/// <summary>
/// Stateful implementation for hosting a two-party RPC session. <see cref="RpcEngine"/> may own multiple mutually
/// independent endpoints.
/// </summary>
public class RpcEndpoint : IEndpoint, IRpcEndpoint
{
/// <summary>
/// Endpoint state
/// </summary>
public enum EndpointState
{
/// <summary>
/// Active means ready for exchanging RPC messages.
/// </summary>
Active,
/// <summary>
/// The session is closed, either deliberately or due to an error condition.
/// </summary>
Dismissed
}
@ -95,8 +109,14 @@ namespace Capnp.Rpc
State = EndpointState.Active;
}
/// <summary>
/// Session state
/// </summary>
public EndpointState State { get; private set; }
/// <summary>
/// Closes the session, clears export table, terminates all pending questions and enters 'Dismissed' state.
/// </summary>
public void Dismiss()
{
lock (_reentrancyBlocker)
@ -120,6 +140,10 @@ namespace Capnp.Rpc
_tx.Dismiss();
}
/// <summary>
/// Feeds a frame for processing
/// </summary>
/// <param name="frame">frame to process</param>
public void Forward(WireFrame frame)
{
if (State == EndpointState.Dismissed)
@ -129,11 +153,43 @@ namespace Capnp.Rpc
ProcessFrame(frame);
}
/// <summary>
/// Number of frames sent so far
/// </summary>
public long SendCount => Interlocked.Read(ref _sendCount);
/// <summary>
/// Number of frames received so far
/// </summary>
public long RecvCount => Interlocked.Read(ref _recvCount);
public int ImportedCapabilityCount => _importTable.Count;
public int ExportedCapabilityCount => _exportTable.Count;
/// <summary>
/// Current number of entries in import table
/// </summary>
public int ImportedCapabilityCount
{
get
{
lock (_reentrancyBlocker)
{
return _importTable.Count;
}
}
}
/// <summary>
/// Current number of entries in export table
/// </summary>
public int ExportedCapabilityCount
{
get
{
lock (_reentrancyBlocker)
{
return _exportTable.Count;
}
}
}
void Tx(WireFrame frame)
{
@ -1095,6 +1151,10 @@ namespace Capnp.Rpc
}
}
/// <summary>
/// Queries the peer for its bootstrap capability
/// </summary>
/// <returns>low-level capability</returns>
public ConsumedCapability QueryMain()
{
var mb = MessageBuilder.Create();
@ -1370,7 +1430,7 @@ namespace Capnp.Rpc
else
{
postAction += cap.Export(this, capDesc);
cap.Release(false);
cap.Release();
}
}
@ -1498,6 +1558,11 @@ namespace Capnp.Rpc
readonly ConcurrentBag<RpcEndpoint> _inboundEndpoints = new ConcurrentBag<RpcEndpoint>();
/// <summary>
/// Adds an endpoint
/// </summary>
/// <param name="outboundEndpoint">endpoint for handling outgoing messages</param>
/// <returns>endpoint for handling incoming messages</returns>
public RpcEndpoint AddEndpoint(IEndpoint outboundEndpoint)
{
var inboundEndpoint = new RpcEndpoint(this, outboundEndpoint);

View File

@ -1382,15 +1382,19 @@ namespace Capnp
return new Rpc.BareProxy(cap);
}
/// <summary>
/// Releases the capability table
/// </summary>
public void Dispose()
{
if (Caps != null && !_disposed)
{
foreach (var cap in Caps)
{
cap?.Release(false);
cap?.Release();
}
Caps.Clear();
_disposed = true;
}
}