Fixed a race condition: Cancelling a question (at client side) which is about to be answered (from server side) might prevent releasing returned capabilities.

This commit is contained in:
Christian Köllner 2019-08-25 15:59:07 +02:00
parent 83ee624308
commit e6c129b8da
5 changed files with 36 additions and 24 deletions

View File

@ -265,13 +265,13 @@ namespace Capnp.Net.Runtime.Tests
} }
} }
[TestMethod, Timeout(120000)] [TestMethod]
public void ReleaseOnCancelClient() public void ReleaseOnCancelClient()
{ {
// Since we have a threaded model, there is no way to deterministically provoke the situation // Since we have a threaded model, there is no way to deterministically provoke the situation
// where Cancel and Finish message cross paths. Instead, we'll do a lot of such requests and // where Cancel and Finish message cross paths. Instead, we'll do a lot of such requests and
// later on verify that the handle count is 0. // later on verify that the handle count is 0.
int iterationCount = 1000; int iterationCount = 5000;
LaunchCompatTestProcess("server:MoreStuff", stdout => LaunchCompatTestProcess("server:MoreStuff", stdout =>
{ {
@ -305,9 +305,6 @@ namespace Capnp.Net.Runtime.Tests
--handleCount; --handleCount;
break; break;
case "getCallSequence":
break;
default: default:
Assert.Fail("Unexpected output"); Assert.Fail("Unexpected output");
break; break;
@ -323,35 +320,22 @@ namespace Capnp.Net.Runtime.Tests
for (int i = 0; i < iterationCount; i++) for (int i = 0; i < iterationCount; i++)
{ {
var task = main.GetHandle(default); var task = main.GetHandle(default);
Impatient.GetAnswer(task).Dispose();
taskList.Add(task.ContinueWith(t => taskList.Add(task.ContinueWith(t =>
{ {
try try
{ {
t.Result.Dispose(); t.Result.Dispose();
// Scenario 1: Cancellation happened after computing the answer, but before client-side completion.
} }
catch (TaskCanceledException) catch (AggregateException ex) when (ex.InnerException is TaskCanceledException)
{ {
// Scenario 2: Cancellation happened before or while computing the answer.
}
catch (AggregateException ex) when (ex.InnerException is ObjectDisposedException)
{
// Scenario 3: Cancellation happened after computing the answer, and after client-side completion.
} }
})); }));
Impatient.GetAnswer(task).Dispose();
} }
// Ensure that all answers return (probably in canceled state) // Ensure that all answers return (probably in canceled state)
Assert.IsTrue(Task.WhenAll(taskList).Wait(LargeNonDbgTimeout)); Assert.IsTrue(Task.WhenAll(taskList).Wait(LargeNonDbgTimeout));
// Not part of original test. "Terminate" sequence with
// call to some different operation: getCallSequence
// Motivation: Cancelling the GetHandle request immediately after sending it might seduce the
// remote RPC implementation to immediately discard the request, before even calling the server
// method. => Cannot rely on receiving *exactly* 2*iterationCount outputs.
var term = main.GetCallSequence(0, default);
Assert.IsTrue(verifyOutputTask.Wait(LargeNonDbgTimeout)); Assert.IsTrue(verifyOutputTask.Wait(LargeNonDbgTimeout));
// Not part of original test. Ensure that there is no unwanted extra output // Not part of original test. Ensure that there is no unwanted extra output

View File

@ -139,8 +139,14 @@ namespace Capnp.Net.Runtime.Tests
var task = main.GetHandle(cts.Token); var task = main.GetHandle(cts.Token);
cts.Cancel(); cts.Cancel();
task.ContinueWith(t => task.ContinueWith(t =>
{
try
{ {
t.Result.Dispose(); t.Result.Dispose();
}
catch (AggregateException ex) when (ex.InnerException is TaskCanceledException)
{
}
cts.Dispose(); cts.Dispose();
}); });
} }

View File

@ -136,7 +136,10 @@ namespace Capnp.Rpc
} }
else else
{ {
_tcs.TrySetResult(results); if (!_tcs.TrySetResult(results))
{
ReleaseOutCaps(results);
}
} }
} }
@ -272,6 +275,14 @@ namespace Capnp.Rpc
} }
} }
static void ReleaseOutCaps(DeserializerState outParams)
{
foreach (var cap in outParams.Caps)
{
cap?.Release();
}
}
internal void Send() internal void Send()
{ {
SerializerState inParams; SerializerState inParams;
@ -314,6 +325,7 @@ namespace Capnp.Rpc
{ {
SerializerState inParams; SerializerState inParams;
ConsumedCapability target; ConsumedCapability target;
bool justDisposed = false;
lock (ReentrancyBlocker) lock (ReentrancyBlocker)
{ {
@ -327,6 +339,7 @@ namespace Capnp.Rpc
if (!StateFlags.HasFlag(State.Disposed)) if (!StateFlags.HasFlag(State.Disposed))
{ {
StateFlags |= State.Disposed; StateFlags |= State.Disposed;
justDisposed = true;
AutoFinish(); AutoFinish();
} }
@ -338,6 +351,11 @@ namespace Capnp.Rpc
} }
ReleaseCaps(target, inParams); ReleaseCaps(target, inParams);
if (justDisposed)
{
_tcs.TrySetCanceled();
}
} }
/// <summary> /// <summary>

View File

@ -74,7 +74,7 @@ namespace Capnp.Rpc
internal sealed override void Release() internal sealed override void Release()
{ {
if (1 == Interlocked.Decrement(ref _refCount)) if (1 >= Interlocked.Decrement(ref _refCount))
{ {
Dispose(true); Dispose(true);
GC.SuppressFinalize(this); GC.SuppressFinalize(this);

View File

@ -52,8 +52,12 @@ namespace Capnp.Rpc
protected override void Dispose(bool disposing) protected override void Dispose(bool disposing)
{ {
base.Dispose(disposing); base.Dispose(disposing);
lock (_question.ReentrancyBlocker)
{
_resolvedCap?.Dispose(); _resolvedCap?.Dispose();
} }
}
protected override Proxy ResolvedCap protected override Proxy ResolvedCap
{ {