diff --git a/Capnp.Net.Runtime.Tests/TcpRpcInterop.cs b/Capnp.Net.Runtime.Tests/TcpRpcInterop.cs index 0726013..20cd863 100644 --- a/Capnp.Net.Runtime.Tests/TcpRpcInterop.cs +++ b/Capnp.Net.Runtime.Tests/TcpRpcInterop.cs @@ -265,13 +265,13 @@ namespace Capnp.Net.Runtime.Tests } } - [TestMethod, Timeout(120000)] + [TestMethod] public void ReleaseOnCancelClient() { // Since we have a threaded model, there is no way to deterministically provoke the situation // where Cancel and Finish message cross paths. Instead, we'll do a lot of such requests and // later on verify that the handle count is 0. - int iterationCount = 1000; + int iterationCount = 5000; LaunchCompatTestProcess("server:MoreStuff", stdout => { @@ -305,9 +305,6 @@ namespace Capnp.Net.Runtime.Tests --handleCount; break; - case "getCallSequence": - break; - default: Assert.Fail("Unexpected output"); break; @@ -323,35 +320,22 @@ namespace Capnp.Net.Runtime.Tests for (int i = 0; i < iterationCount; i++) { var task = main.GetHandle(default); - Impatient.GetAnswer(task).Dispose(); taskList.Add(task.ContinueWith(t => { try { t.Result.Dispose(); - // Scenario 1: Cancellation happened after computing the answer, but before client-side completion. } - catch (TaskCanceledException) + catch (AggregateException ex) when (ex.InnerException is TaskCanceledException) { - // Scenario 2: Cancellation happened before or while computing the answer. - } - catch (AggregateException ex) when (ex.InnerException is ObjectDisposedException) - { - // Scenario 3: Cancellation happened after computing the answer, and after client-side completion. } })); + Impatient.GetAnswer(task).Dispose(); } // Ensure that all answers return (probably in canceled state) Assert.IsTrue(Task.WhenAll(taskList).Wait(LargeNonDbgTimeout)); - // Not part of original test. "Terminate" sequence with - // call to some different operation: getCallSequence - // Motivation: Cancelling the GetHandle request immediately after sending it might seduce the - // remote RPC implementation to immediately discard the request, before even calling the server - // method. => Cannot rely on receiving *exactly* 2*iterationCount outputs. - var term = main.GetCallSequence(0, default); - Assert.IsTrue(verifyOutputTask.Wait(LargeNonDbgTimeout)); // Not part of original test. Ensure that there is no unwanted extra output diff --git a/Capnp.Net.Runtime.Tests/TcpRpcPorted.cs b/Capnp.Net.Runtime.Tests/TcpRpcPorted.cs index 9efddc0..dec51ca 100644 --- a/Capnp.Net.Runtime.Tests/TcpRpcPorted.cs +++ b/Capnp.Net.Runtime.Tests/TcpRpcPorted.cs @@ -140,7 +140,13 @@ namespace Capnp.Net.Runtime.Tests cts.Cancel(); task.ContinueWith(t => { - t.Result.Dispose(); + try + { + t.Result.Dispose(); + } + catch (AggregateException ex) when (ex.InnerException is TaskCanceledException) + { + } cts.Dispose(); }); } diff --git a/Capnp.Net.Runtime/Rpc/PendingQuestion.cs b/Capnp.Net.Runtime/Rpc/PendingQuestion.cs index af32001..2d7addc 100644 --- a/Capnp.Net.Runtime/Rpc/PendingQuestion.cs +++ b/Capnp.Net.Runtime/Rpc/PendingQuestion.cs @@ -136,7 +136,10 @@ namespace Capnp.Rpc } else { - _tcs.TrySetResult(results); + if (!_tcs.TrySetResult(results)) + { + ReleaseOutCaps(results); + } } } @@ -272,6 +275,14 @@ namespace Capnp.Rpc } } + static void ReleaseOutCaps(DeserializerState outParams) + { + foreach (var cap in outParams.Caps) + { + cap?.Release(); + } + } + internal void Send() { SerializerState inParams; @@ -314,6 +325,7 @@ namespace Capnp.Rpc { SerializerState inParams; ConsumedCapability target; + bool justDisposed = false; lock (ReentrancyBlocker) { @@ -327,6 +339,7 @@ namespace Capnp.Rpc if (!StateFlags.HasFlag(State.Disposed)) { StateFlags |= State.Disposed; + justDisposed = true; AutoFinish(); } @@ -338,6 +351,11 @@ namespace Capnp.Rpc } ReleaseCaps(target, inParams); + + if (justDisposed) + { + _tcs.TrySetCanceled(); + } } /// diff --git a/Capnp.Net.Runtime/Rpc/RefCountingCapability.cs b/Capnp.Net.Runtime/Rpc/RefCountingCapability.cs index 7d61089..b620a8f 100644 --- a/Capnp.Net.Runtime/Rpc/RefCountingCapability.cs +++ b/Capnp.Net.Runtime/Rpc/RefCountingCapability.cs @@ -74,7 +74,7 @@ namespace Capnp.Rpc internal sealed override void Release() { - if (1 == Interlocked.Decrement(ref _refCount)) + if (1 >= Interlocked.Decrement(ref _refCount)) { Dispose(true); GC.SuppressFinalize(this); diff --git a/Capnp.Net.Runtime/Rpc/RemoteAnswerCapability.cs b/Capnp.Net.Runtime/Rpc/RemoteAnswerCapability.cs index 816a8a3..e192a15 100644 --- a/Capnp.Net.Runtime/Rpc/RemoteAnswerCapability.cs +++ b/Capnp.Net.Runtime/Rpc/RemoteAnswerCapability.cs @@ -52,7 +52,11 @@ namespace Capnp.Rpc protected override void Dispose(bool disposing) { base.Dispose(disposing); - _resolvedCap?.Dispose(); + + lock (_question.ReentrancyBlocker) + { + _resolvedCap?.Dispose(); + } } protected override Proxy ResolvedCap