From d833dbe5919ec4c039fdf9fe039e9716c9bbd092 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Sun, 19 Apr 2020 18:53:09 +0200 Subject: [PATCH] fixed race condition in RemoteResolvingCapability fixed test framework error --- Capnp.Net.Runtime.Tests/Interception.cs | 2 +- Capnp.Net.Runtime.Tests/TcpRpcPorted.cs | 2 +- Capnp.Net.Runtime.Tests/TcpRpcStress.cs | 12 ++++- Capnp.Net.Runtime.Tests/Util/FluctStream.cs | 50 +++++++++++++++++++ .../Util/ScatteringStream.cs | 1 + Capnp.Net.Runtime.Tests/Util/TestBase.cs | 33 +++++++++--- .../Rpc/RemoteResolvingCapability.cs | 21 +++----- .../Util/StrictlyOrderedAwaitTask.cs | 35 ++++++++++--- 8 files changed, 124 insertions(+), 32 deletions(-) create mode 100644 Capnp.Net.Runtime.Tests/Util/FluctStream.cs diff --git a/Capnp.Net.Runtime.Tests/Interception.cs b/Capnp.Net.Runtime.Tests/Interception.cs index 76421da..ab6b9f4 100644 --- a/Capnp.Net.Runtime.Tests/Interception.cs +++ b/Capnp.Net.Runtime.Tests/Interception.cs @@ -400,7 +400,7 @@ namespace Capnp.Net.Runtime.Tests { var policy = new MyPolicy("a"); - (var server, var client) = SetupClientServerPair(true); + (var server, var client) = SetupClientServerPair(TcpRpcTestOptions.ClientTracer); using (server) using (client) diff --git a/Capnp.Net.Runtime.Tests/TcpRpcPorted.cs b/Capnp.Net.Runtime.Tests/TcpRpcPorted.cs index 20fc392..3e24333 100644 --- a/Capnp.Net.Runtime.Tests/TcpRpcPorted.cs +++ b/Capnp.Net.Runtime.Tests/TcpRpcPorted.cs @@ -26,7 +26,7 @@ namespace Capnp.Net.Runtime.Tests [TestMethod] public void Pipeline() { - NewLocalhostTcpTestbed().RunTest(Testsuite.Pipeline); + NewLocalhostTcpTestbed(TcpRpcTestOptions.ClientTracer).RunTest(Testsuite.Pipeline); } [TestMethod] diff --git a/Capnp.Net.Runtime.Tests/TcpRpcStress.cs b/Capnp.Net.Runtime.Tests/TcpRpcStress.cs index 43a3cbd..b5de959 100644 --- a/Capnp.Net.Runtime.Tests/TcpRpcStress.cs +++ b/Capnp.Net.Runtime.Tests/TcpRpcStress.cs @@ -53,6 +53,7 @@ namespace Capnp.Net.Runtime.Tests public void Cancel() { var t = new TcpRpcPorted(); + t.InitConsoleLogging(); Repeat(1000, t.Cancel); } @@ -60,13 +61,18 @@ namespace Capnp.Net.Runtime.Tests public void Embargo() { var t = new TcpRpcPorted(); - Repeat(100, t.Embargo); + t.InitConsoleLogging(); + Repeat(100, + () => + NewLocalhostTcpTestbed(TcpRpcTestOptions.ClientTracer | TcpRpcTestOptions.ClientFluctStream) + .RunTest(Testsuite.EmbargoOnPromisedAnswer)); } [TestMethod] public void EmbargoServer() { var t2 = new TcpRpcInterop(); + t2.InitConsoleLogging(); Repeat(20, t2.EmbargoServer); } @@ -76,9 +82,11 @@ namespace Capnp.Net.Runtime.Tests // Some code paths are really rare during this test, therefore increased repetition count. var t = new TcpRpcPorted(); + t.InitConsoleLogging(); Repeat(1000, t.EmbargoNull); var t2 = new TcpRpcInterop(); + t2.InitConsoleLogging(); Repeat(100, t2.EmbargoNullServer); } @@ -86,6 +94,7 @@ namespace Capnp.Net.Runtime.Tests public void RetainAndRelease() { var t = new TcpRpcPorted(); + t.InitConsoleLogging(); Repeat(100, t.RetainAndRelease); } @@ -93,6 +102,7 @@ namespace Capnp.Net.Runtime.Tests public void PipelineAfterReturn() { var t = new TcpRpc(); + t.InitConsoleLogging(); Repeat(100, t.PipelineAfterReturn); } diff --git a/Capnp.Net.Runtime.Tests/Util/FluctStream.cs b/Capnp.Net.Runtime.Tests/Util/FluctStream.cs new file mode 100644 index 0000000..e5a9406 --- /dev/null +++ b/Capnp.Net.Runtime.Tests/Util/FluctStream.cs @@ -0,0 +1,50 @@ +using System; +using System.IO; +using System.Threading; + +namespace Capnp.Net.Runtime.Tests +{ + class FluctStream : Stream + { + readonly Stream _baseStream; + readonly Random _rng = new Random(); + + public FluctStream(Stream baseStream) + { + _baseStream = baseStream; + } + + public override bool CanRead => _baseStream.CanRead; + + public override bool CanSeek => false; + + public override bool CanWrite => _baseStream.CanWrite; + + public override long Length => _baseStream.Length; + + public override long Position + { + get => _baseStream.Position; + set => throw new NotImplementedException(); + } + + public override void Flush() => _baseStream.Flush(); + + public override int Read(byte[] buffer, int offset, int count) + { + int n = _rng.Next(0, 8); + if (n >= 7) + Thread.Sleep(n - 7); + return _baseStream.Read(buffer, offset, count); + } + + public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException(); + + public override void SetLength(long value) => throw new NotImplementedException(); + + public override void Write(byte[] buffer, int offset, int count) + { + _baseStream.Write(buffer, offset, count); + } + } +} diff --git a/Capnp.Net.Runtime.Tests/Util/ScatteringStream.cs b/Capnp.Net.Runtime.Tests/Util/ScatteringStream.cs index e6b44df..6335167 100644 --- a/Capnp.Net.Runtime.Tests/Util/ScatteringStream.cs +++ b/Capnp.Net.Runtime.Tests/Util/ScatteringStream.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; +using System.Threading; using System.Threading.Tasks; namespace Capnp.Net.Runtime.Tests diff --git a/Capnp.Net.Runtime.Tests/Util/TestBase.cs b/Capnp.Net.Runtime.Tests/Util/TestBase.cs index 2b08763..41279b2 100644 --- a/Capnp.Net.Runtime.Tests/Util/TestBase.cs +++ b/Capnp.Net.Runtime.Tests/Util/TestBase.cs @@ -263,13 +263,19 @@ namespace Capnp.Net.Runtime.Tests protected class LocalhostTcpTestbed : ITestbed, ITestController { + readonly TcpRpcTestOptions _options; TcpRpcServer _server; TcpRpcClient _client; bool _prematurelyClosed; + public LocalhostTcpTestbed(TcpRpcTestOptions options) + { + _options = options; + } + public void RunTest(Action action) { - (_server, _client) = SetupClientServerPair(); + (_server, _client) = SetupClientServerPair(_options); Assert.IsTrue(SpinWait.SpinUntil(() => _server.ConnectionCount > 0, MediumNonDbgTimeout)); var conn = _server.Connections[0]; @@ -345,16 +351,26 @@ namespace Capnp.Net.Runtime.Tests protected ILogger Logger { get; set; } - protected static TcpRpcClient SetupClient(bool withTracer = false) + protected static TcpRpcClient SetupClient(TcpRpcTestOptions options = TcpRpcTestOptions.None) { var client = new TcpRpcClient(); client.AddBuffering(); - if (withTracer) + if (options.HasFlag(TcpRpcTestOptions.ClientTracer)) client.AttachTracer(new FrameTracing.RpcFrameTracer(Console.Out, false)); + if (options.HasFlag(TcpRpcTestOptions.ClientFluctStream)) + client.InjectMidlayer(s => new FluctStream(s)); client.Connect("localhost", TcpPort); return client; } + [Flags] + public enum TcpRpcTestOptions + { + None = 0, + ClientTracer = 1, + ClientFluctStream = 2 + } + protected static TcpRpcServer SetupServer() { int attempt = 0; @@ -381,10 +397,10 @@ namespace Capnp.Net.Runtime.Tests } } - protected static (TcpRpcServer, TcpRpcClient) SetupClientServerPair(bool withClientTracer = false) + protected static (TcpRpcServer, TcpRpcClient) SetupClientServerPair(TcpRpcTestOptions options = TcpRpcTestOptions.None) { var server = SetupServer(); - var client = SetupClient(withClientTracer); + var client = SetupClient(options); return (server, client); } @@ -404,7 +420,8 @@ namespace Capnp.Net.Runtime.Tests } protected static DtbdctTestbed NewDtbdctTestbed() => new DtbdctTestbed(); - protected static LocalhostTcpTestbed NewLocalhostTcpTestbed() => new LocalhostTcpTestbed(); + protected static LocalhostTcpTestbed NewLocalhostTcpTestbed(TcpRpcTestOptions options = TcpRpcTestOptions.None) => + new LocalhostTcpTestbed(options); protected static LocalTestbed NewLocalTestbed() => new LocalTestbed(); @@ -412,9 +429,9 @@ namespace Capnp.Net.Runtime.Tests public void InitConsoleLogging() { Logging.LoggerFactory?.Dispose(); -#pragma warning disable CS0618 // Typ oder Element ist veraltet +#pragma warning disable CS0618 Logging.LoggerFactory = new LoggerFactory().AddConsole((msg, level) => true); -#pragma warning restore CS0618 // Typ oder Element ist veraltet +#pragma warning restore CS0618 Logger = Logging.CreateLogger(); if (Thread.CurrentThread.Name == null) Thread.CurrentThread.Name = $"Test Thread {Thread.CurrentThread.ManagedThreadId}"; diff --git a/Capnp.Net.Runtime/Rpc/RemoteResolvingCapability.cs b/Capnp.Net.Runtime/Rpc/RemoteResolvingCapability.cs index 937b66b..0dc9ac7 100644 --- a/Capnp.Net.Runtime/Rpc/RemoteResolvingCapability.cs +++ b/Capnp.Net.Runtime/Rpc/RemoteResolvingCapability.cs @@ -1,4 +1,5 @@ -using Microsoft.Extensions.Logging; +using Capnp.Util; +using Microsoft.Extensions.Logging; using System; using System.Threading; using System.Threading.Tasks; @@ -24,7 +25,7 @@ namespace Capnp.Rpc } protected int _pendingCallsOnPromise; - Task? _disembargo; + StrictlyOrderedAwaitTask? _disembargo; protected abstract ConsumedCapability? ResolvedCap { get; } @@ -64,7 +65,7 @@ namespace Capnp.Rpc #if DebugEmbargos Logger.LogDebug("Requesting disembargo"); #endif - _disembargo = _ep.RequestSenderLoopback(GetMessageTarget); + _disembargo = _ep.RequestSenderLoopback(GetMessageTarget).EnforceAwaitOrder(); } else { @@ -75,8 +76,10 @@ namespace Capnp.Rpc var cancellationTokenSource = new CancellationTokenSource(); - var callAfterDisembargo = _disembargo.ContinueWith(_ => + async Task AwaitAnswer() { + await _disembargo!; + // Two reasons for ignoring exceptions on the previous task (i.e. not _.Wait()ing): // 1. A faulting predecessor, especially due to cancellation, must not have any impact on this one. // 2. A faulting disembargo request would imply that the other side cannot send pending requests anyway. @@ -88,15 +91,7 @@ namespace Capnp.Rpc } using var proxy = new Proxy(resolvedCap); - return proxy.Call(interfaceId, methodId, args, default); - - }, TaskContinuationOptions.ExecuteSynchronously); - - _disembargo = callAfterDisembargo; - - async Task AwaitAnswer() - { - var promisedAnswer = await callAfterDisembargo; + var promisedAnswer = proxy.Call(interfaceId, methodId, args, default); using (cancellationTokenSource.Token.Register(promisedAnswer.Dispose)) { diff --git a/Capnp.Net.Runtime/Util/StrictlyOrderedAwaitTask.cs b/Capnp.Net.Runtime/Util/StrictlyOrderedAwaitTask.cs index 1565aa4..a7c6602 100644 --- a/Capnp.Net.Runtime/Util/StrictlyOrderedAwaitTask.cs +++ b/Capnp.Net.Runtime/Util/StrictlyOrderedAwaitTask.cs @@ -7,7 +7,7 @@ using System.Threading.Tasks; namespace Capnp.Util { - internal class StrictlyOrderedAwaitTask: INotifyCompletion + internal class StrictlyOrderedAwaitTask: INotifyCompletion { class Cover { } class Seal { } @@ -15,16 +15,16 @@ namespace Capnp.Util static readonly Cover s_cover = new Cover(); static readonly Seal s_seal = new Seal(); - readonly Task _awaitedTask; + readonly Task _awaitedTask; object? _state; - public StrictlyOrderedAwaitTask(Task awaitedTask) + public StrictlyOrderedAwaitTask(Task awaitedTask) { _awaitedTask = awaitedTask; _state = s_cover; } - public StrictlyOrderedAwaitTask GetAwaiter() + public StrictlyOrderedAwaitTask GetAwaiter() { return this; } @@ -94,18 +94,37 @@ namespace Capnp.Util public bool IsCompleted => _awaitedTask.IsCompleted && _state == s_seal; - public T GetResult() => _awaitedTask.GetAwaiter().GetResult(); + public void GetResult() => _awaitedTask.GetAwaiter().GetResult(); - public T Result => _awaitedTask.Result; - - public Task WrappedTask => _awaitedTask; + public Task WrappedTask => _awaitedTask; } + internal class StrictlyOrderedAwaitTask : StrictlyOrderedAwaitTask + { + public StrictlyOrderedAwaitTask(Task awaitedTask): base(awaitedTask) + { + } + + public new Task WrappedTask => (Task)base.WrappedTask; + public new StrictlyOrderedAwaitTask GetAwaiter() => this; + + public new T GetResult() => WrappedTask.GetAwaiter().GetResult(); + + public T Result => WrappedTask.Result; + + } + + internal static class StrictlyOrderedTaskExtensions { public static StrictlyOrderedAwaitTask EnforceAwaitOrder(this Task task) { return new StrictlyOrderedAwaitTask(task); } + + public static StrictlyOrderedAwaitTask EnforceAwaitOrder(this Task task) + { + return new StrictlyOrderedAwaitTask(task); + } } }