fixed race condition in RemoteResolvingCapability

fixed test framework error
This commit is contained in:
Christian Köllner 2020-04-19 18:53:09 +02:00
parent c3cbb123c8
commit d833dbe591
8 changed files with 124 additions and 32 deletions

View File

@ -400,7 +400,7 @@ namespace Capnp.Net.Runtime.Tests
{ {
var policy = new MyPolicy("a"); var policy = new MyPolicy("a");
(var server, var client) = SetupClientServerPair(true); (var server, var client) = SetupClientServerPair(TcpRpcTestOptions.ClientTracer);
using (server) using (server)
using (client) using (client)

View File

@ -26,7 +26,7 @@ namespace Capnp.Net.Runtime.Tests
[TestMethod] [TestMethod]
public void Pipeline() public void Pipeline()
{ {
NewLocalhostTcpTestbed().RunTest(Testsuite.Pipeline); NewLocalhostTcpTestbed(TcpRpcTestOptions.ClientTracer).RunTest(Testsuite.Pipeline);
} }
[TestMethod] [TestMethod]

View File

@ -53,6 +53,7 @@ namespace Capnp.Net.Runtime.Tests
public void Cancel() public void Cancel()
{ {
var t = new TcpRpcPorted(); var t = new TcpRpcPorted();
t.InitConsoleLogging();
Repeat(1000, t.Cancel); Repeat(1000, t.Cancel);
} }
@ -60,13 +61,18 @@ namespace Capnp.Net.Runtime.Tests
public void Embargo() public void Embargo()
{ {
var t = new TcpRpcPorted(); var t = new TcpRpcPorted();
Repeat(100, t.Embargo); t.InitConsoleLogging();
Repeat(100,
() =>
NewLocalhostTcpTestbed(TcpRpcTestOptions.ClientTracer | TcpRpcTestOptions.ClientFluctStream)
.RunTest(Testsuite.EmbargoOnPromisedAnswer));
} }
[TestMethod] [TestMethod]
public void EmbargoServer() public void EmbargoServer()
{ {
var t2 = new TcpRpcInterop(); var t2 = new TcpRpcInterop();
t2.InitConsoleLogging();
Repeat(20, t2.EmbargoServer); Repeat(20, t2.EmbargoServer);
} }
@ -76,9 +82,11 @@ namespace Capnp.Net.Runtime.Tests
// Some code paths are really rare during this test, therefore increased repetition count. // Some code paths are really rare during this test, therefore increased repetition count.
var t = new TcpRpcPorted(); var t = new TcpRpcPorted();
t.InitConsoleLogging();
Repeat(1000, t.EmbargoNull); Repeat(1000, t.EmbargoNull);
var t2 = new TcpRpcInterop(); var t2 = new TcpRpcInterop();
t2.InitConsoleLogging();
Repeat(100, t2.EmbargoNullServer); Repeat(100, t2.EmbargoNullServer);
} }
@ -86,6 +94,7 @@ namespace Capnp.Net.Runtime.Tests
public void RetainAndRelease() public void RetainAndRelease()
{ {
var t = new TcpRpcPorted(); var t = new TcpRpcPorted();
t.InitConsoleLogging();
Repeat(100, t.RetainAndRelease); Repeat(100, t.RetainAndRelease);
} }
@ -93,6 +102,7 @@ namespace Capnp.Net.Runtime.Tests
public void PipelineAfterReturn() public void PipelineAfterReturn()
{ {
var t = new TcpRpc(); var t = new TcpRpc();
t.InitConsoleLogging();
Repeat(100, t.PipelineAfterReturn); Repeat(100, t.PipelineAfterReturn);
} }

View File

@ -0,0 +1,50 @@
using System;
using System.IO;
using System.Threading;
namespace Capnp.Net.Runtime.Tests
{
class FluctStream : Stream
{
readonly Stream _baseStream;
readonly Random _rng = new Random();
public FluctStream(Stream baseStream)
{
_baseStream = baseStream;
}
public override bool CanRead => _baseStream.CanRead;
public override bool CanSeek => false;
public override bool CanWrite => _baseStream.CanWrite;
public override long Length => _baseStream.Length;
public override long Position
{
get => _baseStream.Position;
set => throw new NotImplementedException();
}
public override void Flush() => _baseStream.Flush();
public override int Read(byte[] buffer, int offset, int count)
{
int n = _rng.Next(0, 8);
if (n >= 7)
Thread.Sleep(n - 7);
return _baseStream.Read(buffer, offset, count);
}
public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException();
public override void SetLength(long value) => throw new NotImplementedException();
public override void Write(byte[] buffer, int offset, int count)
{
_baseStream.Write(buffer, offset, count);
}
}
}

View File

@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Capnp.Net.Runtime.Tests namespace Capnp.Net.Runtime.Tests

View File

@ -263,13 +263,19 @@ namespace Capnp.Net.Runtime.Tests
protected class LocalhostTcpTestbed : ITestbed, ITestController protected class LocalhostTcpTestbed : ITestbed, ITestController
{ {
readonly TcpRpcTestOptions _options;
TcpRpcServer _server; TcpRpcServer _server;
TcpRpcClient _client; TcpRpcClient _client;
bool _prematurelyClosed; bool _prematurelyClosed;
public LocalhostTcpTestbed(TcpRpcTestOptions options)
{
_options = options;
}
public void RunTest(Action<ITestbed> action) public void RunTest(Action<ITestbed> action)
{ {
(_server, _client) = SetupClientServerPair(); (_server, _client) = SetupClientServerPair(_options);
Assert.IsTrue(SpinWait.SpinUntil(() => _server.ConnectionCount > 0, MediumNonDbgTimeout)); Assert.IsTrue(SpinWait.SpinUntil(() => _server.ConnectionCount > 0, MediumNonDbgTimeout));
var conn = _server.Connections[0]; var conn = _server.Connections[0];
@ -345,16 +351,26 @@ namespace Capnp.Net.Runtime.Tests
protected ILogger Logger { get; set; } protected ILogger Logger { get; set; }
protected static TcpRpcClient SetupClient(bool withTracer = false) protected static TcpRpcClient SetupClient(TcpRpcTestOptions options = TcpRpcTestOptions.None)
{ {
var client = new TcpRpcClient(); var client = new TcpRpcClient();
client.AddBuffering(); client.AddBuffering();
if (withTracer) if (options.HasFlag(TcpRpcTestOptions.ClientTracer))
client.AttachTracer(new FrameTracing.RpcFrameTracer(Console.Out, false)); client.AttachTracer(new FrameTracing.RpcFrameTracer(Console.Out, false));
if (options.HasFlag(TcpRpcTestOptions.ClientFluctStream))
client.InjectMidlayer(s => new FluctStream(s));
client.Connect("localhost", TcpPort); client.Connect("localhost", TcpPort);
return client; return client;
} }
[Flags]
public enum TcpRpcTestOptions
{
None = 0,
ClientTracer = 1,
ClientFluctStream = 2
}
protected static TcpRpcServer SetupServer() protected static TcpRpcServer SetupServer()
{ {
int attempt = 0; int attempt = 0;
@ -381,10 +397,10 @@ namespace Capnp.Net.Runtime.Tests
} }
} }
protected static (TcpRpcServer, TcpRpcClient) SetupClientServerPair(bool withClientTracer = false) protected static (TcpRpcServer, TcpRpcClient) SetupClientServerPair(TcpRpcTestOptions options = TcpRpcTestOptions.None)
{ {
var server = SetupServer(); var server = SetupServer();
var client = SetupClient(withClientTracer); var client = SetupClient(options);
return (server, client); return (server, client);
} }
@ -404,7 +420,8 @@ namespace Capnp.Net.Runtime.Tests
} }
protected static DtbdctTestbed NewDtbdctTestbed() => new DtbdctTestbed(); protected static DtbdctTestbed NewDtbdctTestbed() => new DtbdctTestbed();
protected static LocalhostTcpTestbed NewLocalhostTcpTestbed() => new LocalhostTcpTestbed(); protected static LocalhostTcpTestbed NewLocalhostTcpTestbed(TcpRpcTestOptions options = TcpRpcTestOptions.None) =>
new LocalhostTcpTestbed(options);
protected static LocalTestbed NewLocalTestbed() => new LocalTestbed(); protected static LocalTestbed NewLocalTestbed() => new LocalTestbed();
@ -412,9 +429,9 @@ namespace Capnp.Net.Runtime.Tests
public void InitConsoleLogging() public void InitConsoleLogging()
{ {
Logging.LoggerFactory?.Dispose(); Logging.LoggerFactory?.Dispose();
#pragma warning disable CS0618 // Typ oder Element ist veraltet #pragma warning disable CS0618
Logging.LoggerFactory = new LoggerFactory().AddConsole((msg, level) => true); Logging.LoggerFactory = new LoggerFactory().AddConsole((msg, level) => true);
#pragma warning restore CS0618 // Typ oder Element ist veraltet #pragma warning restore CS0618
Logger = Logging.CreateLogger<TcpRpcStress>(); Logger = Logging.CreateLogger<TcpRpcStress>();
if (Thread.CurrentThread.Name == null) if (Thread.CurrentThread.Name == null)
Thread.CurrentThread.Name = $"Test Thread {Thread.CurrentThread.ManagedThreadId}"; Thread.CurrentThread.Name = $"Test Thread {Thread.CurrentThread.ManagedThreadId}";

View File

@ -1,4 +1,5 @@
using Microsoft.Extensions.Logging; using Capnp.Util;
using Microsoft.Extensions.Logging;
using System; using System;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -24,7 +25,7 @@ namespace Capnp.Rpc
} }
protected int _pendingCallsOnPromise; protected int _pendingCallsOnPromise;
Task? _disembargo; StrictlyOrderedAwaitTask? _disembargo;
protected abstract ConsumedCapability? ResolvedCap { get; } protected abstract ConsumedCapability? ResolvedCap { get; }
@ -64,7 +65,7 @@ namespace Capnp.Rpc
#if DebugEmbargos #if DebugEmbargos
Logger.LogDebug("Requesting disembargo"); Logger.LogDebug("Requesting disembargo");
#endif #endif
_disembargo = _ep.RequestSenderLoopback(GetMessageTarget); _disembargo = _ep.RequestSenderLoopback(GetMessageTarget).EnforceAwaitOrder();
} }
else else
{ {
@ -75,8 +76,10 @@ namespace Capnp.Rpc
var cancellationTokenSource = new CancellationTokenSource(); var cancellationTokenSource = new CancellationTokenSource();
var callAfterDisembargo = _disembargo.ContinueWith(_ => async Task<DeserializerState> AwaitAnswer()
{ {
await _disembargo!;
// Two reasons for ignoring exceptions on the previous task (i.e. not _.Wait()ing): // Two reasons for ignoring exceptions on the previous task (i.e. not _.Wait()ing):
// 1. A faulting predecessor, especially due to cancellation, must not have any impact on this one. // 1. A faulting predecessor, especially due to cancellation, must not have any impact on this one.
// 2. A faulting disembargo request would imply that the other side cannot send pending requests anyway. // 2. A faulting disembargo request would imply that the other side cannot send pending requests anyway.
@ -88,15 +91,7 @@ namespace Capnp.Rpc
} }
using var proxy = new Proxy(resolvedCap); using var proxy = new Proxy(resolvedCap);
return proxy.Call(interfaceId, methodId, args, default); var promisedAnswer = proxy.Call(interfaceId, methodId, args, default);
}, TaskContinuationOptions.ExecuteSynchronously);
_disembargo = callAfterDisembargo;
async Task<DeserializerState> AwaitAnswer()
{
var promisedAnswer = await callAfterDisembargo;
using (cancellationTokenSource.Token.Register(promisedAnswer.Dispose)) using (cancellationTokenSource.Token.Register(promisedAnswer.Dispose))
{ {

View File

@ -7,7 +7,7 @@ using System.Threading.Tasks;
namespace Capnp.Util namespace Capnp.Util
{ {
internal class StrictlyOrderedAwaitTask<T>: INotifyCompletion internal class StrictlyOrderedAwaitTask: INotifyCompletion
{ {
class Cover { } class Cover { }
class Seal { } class Seal { }
@ -15,16 +15,16 @@ namespace Capnp.Util
static readonly Cover s_cover = new Cover(); static readonly Cover s_cover = new Cover();
static readonly Seal s_seal = new Seal(); static readonly Seal s_seal = new Seal();
readonly Task<T> _awaitedTask; readonly Task _awaitedTask;
object? _state; object? _state;
public StrictlyOrderedAwaitTask(Task<T> awaitedTask) public StrictlyOrderedAwaitTask(Task awaitedTask)
{ {
_awaitedTask = awaitedTask; _awaitedTask = awaitedTask;
_state = s_cover; _state = s_cover;
} }
public StrictlyOrderedAwaitTask<T> GetAwaiter() public StrictlyOrderedAwaitTask GetAwaiter()
{ {
return this; return this;
} }
@ -94,18 +94,37 @@ namespace Capnp.Util
public bool IsCompleted => _awaitedTask.IsCompleted && _state == s_seal; public bool IsCompleted => _awaitedTask.IsCompleted && _state == s_seal;
public T GetResult() => _awaitedTask.GetAwaiter().GetResult(); public void GetResult() => _awaitedTask.GetAwaiter().GetResult();
public T Result => _awaitedTask.Result; public Task WrappedTask => _awaitedTask;
public Task<T> WrappedTask => _awaitedTask;
} }
internal class StrictlyOrderedAwaitTask<T> : StrictlyOrderedAwaitTask
{
public StrictlyOrderedAwaitTask(Task<T> awaitedTask): base(awaitedTask)
{
}
public new Task<T> WrappedTask => (Task<T>)base.WrappedTask;
public new StrictlyOrderedAwaitTask<T> GetAwaiter() => this;
public new T GetResult() => WrappedTask.GetAwaiter().GetResult();
public T Result => WrappedTask.Result;
}
internal static class StrictlyOrderedTaskExtensions internal static class StrictlyOrderedTaskExtensions
{ {
public static StrictlyOrderedAwaitTask<T> EnforceAwaitOrder<T>(this Task<T> task) public static StrictlyOrderedAwaitTask<T> EnforceAwaitOrder<T>(this Task<T> task)
{ {
return new StrictlyOrderedAwaitTask<T>(task); return new StrictlyOrderedAwaitTask<T>(task);
} }
public static StrictlyOrderedAwaitTask EnforceAwaitOrder(this Task task)
{
return new StrictlyOrderedAwaitTask(task);
}
} }
} }