From 8b88b5c5ddc13b18eddc36d4108135c75d93c93d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Wed, 15 Apr 2020 21:52:56 +0200 Subject: [PATCH] fixed deadlock + blocked thread detection --- Capnp.Net.Runtime.Tests/General.cs | 113 +++++++++++++++++- Capnp.Net.Runtime.Tests/LocalRpc.cs | 2 +- Capnp.Net.Runtime/Assembly.cs | 3 + Capnp.Net.Runtime/Rpc/TcpRpcClient.cs | 10 +- Capnp.Net.Runtime/Rpc/TcpRpcServer.cs | 13 +- .../Util/StrictlyOrderedAwaitTask.cs | 43 +++++-- Capnp.Net.Runtime/Util/ThreadExtensions.cs | 49 ++++++++ 7 files changed, 213 insertions(+), 20 deletions(-) create mode 100644 Capnp.Net.Runtime/Assembly.cs create mode 100644 Capnp.Net.Runtime/Util/ThreadExtensions.cs diff --git a/Capnp.Net.Runtime.Tests/General.cs b/Capnp.Net.Runtime.Tests/General.cs index 0b564f6..686033b 100644 --- a/Capnp.Net.Runtime.Tests/General.cs +++ b/Capnp.Net.Runtime.Tests/General.cs @@ -1,4 +1,5 @@ using Capnp.Rpc; +using Capnp.Util; using Microsoft.VisualStudio.TestTools.UnitTesting; using System; using System.Collections.Generic; @@ -11,7 +12,7 @@ using System.Threading.Tasks.Dataflow; namespace Capnp.Net.Runtime.Tests { [TestClass] - public class General + public class General: TestBase { [TestMethod] public void AwaitOrderTest() @@ -92,5 +93,115 @@ namespace Capnp.Net.Runtime.Tests Assert.IsTrue(t.IsCompleted); }; } + + [TestMethod] + public void SafeJoinCompletedThread() + { + var thread = new Thread(() => + { + }); + thread.Start(); + thread.SafeJoin(null, 1000); + } + + [TestMethod] + public void SafeJoinBusyThread() + { + var thread = new Thread(() => + { + try + { + while (true) ; + } + catch (ThreadInterruptedException) + { + Console.WriteLine("Interrupted"); + } + catch (ThreadAbortException) + { + Console.WriteLine("Aborted"); + } + }); + thread.Start(); + thread.SafeJoin(null, 1000); + } + + [TestMethod] + public void SafeJoinSleepingThread() + { + var thread = new Thread(() => + { + try + { + Thread.Sleep(Timeout.Infinite); + } + catch (ThreadInterruptedException) + { + Console.WriteLine("Interrupted"); + } + catch (ThreadAbortException) + { + Console.WriteLine("Aborted"); + } + }); + + thread.Start(); + thread.SafeJoin(null, 1000); + } + + [TestMethod] + public void SafeJoinDeadlockedThread() + { + var lk = new object(); + + lock (lk) + { + var thread = new Thread(() => + { + try + { + lock (lk) + { + } + } + catch (ThreadInterruptedException) + { + Console.WriteLine("Interrupted"); + } + catch (ThreadAbortException) + { + Console.WriteLine("Aborted"); + } + }); + + thread.Start(); + thread.SafeJoin(null, 1000); + } + } + + [TestMethod] + public void SafeJoinDefensiveThread() + { + var thread = new Thread(() => + { + for (; ; ) + { + try + { + Thread.Sleep(Timeout.Infinite); + } + catch (ThreadInterruptedException) + { + Console.WriteLine("Interrupted"); + } + catch (ThreadAbortException) + { + Console.WriteLine("Aborted"); + } + } + }); + thread.Start(); + thread.SafeJoin(null, 1000); + } } } diff --git a/Capnp.Net.Runtime.Tests/LocalRpc.cs b/Capnp.Net.Runtime.Tests/LocalRpc.cs index 7517cd2..f77746a 100644 --- a/Capnp.Net.Runtime.Tests/LocalRpc.cs +++ b/Capnp.Net.Runtime.Tests/LocalRpc.cs @@ -195,7 +195,7 @@ namespace Capnp.Net.Runtime.Tests } [TestMethod] - public void EagerRace2() + public void AwaitNoDeadlock() { for (int i = 0; i < 100; i++) { diff --git a/Capnp.Net.Runtime/Assembly.cs b/Capnp.Net.Runtime/Assembly.cs new file mode 100644 index 0000000..114f6be --- /dev/null +++ b/Capnp.Net.Runtime/Assembly.cs @@ -0,0 +1,3 @@ +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("Capnp.Net.Runtime.Tests")] diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs b/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs index 55771ad..f52d702 100644 --- a/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs +++ b/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs @@ -1,4 +1,5 @@ using Capnp.FrameTracing; +using Capnp.Util; using Microsoft.Extensions.Logging; using System; using System.IO; @@ -92,6 +93,10 @@ namespace Capnp.Rpc _pump.Run(); } + catch (ThreadInterruptedException) + { + Logger.LogError($"{Thread.CurrentThread.Name} interrupted at {Environment.StackTrace}"); + } finally { State = ConnectionState.Down; @@ -186,10 +191,7 @@ namespace Capnp.Rpc Logger.LogError(e, "Failure disposing client"); } - if (_pumpThread != null && !_pumpThread.Join(500)) - { - Logger.LogError("Unable to join pump thread within timeout"); - } + _pumpThread?.SafeJoin(Logger); GC.SuppressFinalize(this); } diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs index 14fb148..150ddd5 100644 --- a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs +++ b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs @@ -1,4 +1,5 @@ using Capnp.FrameTracing; +using Capnp.Util; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; @@ -60,6 +61,8 @@ namespace Capnp.Rpc class Connection: IConnection { + ILogger Logger { get; } = Logging.CreateLogger(); + readonly List _tracers = new List(); readonly TcpRpcServer _server; Stream _stream; @@ -95,6 +98,10 @@ namespace Capnp.Rpc Pump.Run(); } + catch (ThreadInterruptedException) + { + Logger.LogError($"{Thread.CurrentThread.Name} interrupted at {Environment.StackTrace}"); + } finally { OutboundEp.Dismiss(); @@ -199,6 +206,10 @@ namespace Capnp.Rpc // Listener was stopped. Maybe a little bit rude, but this is // our way of shutting down the acceptor thread. } + catch (ThreadInterruptedException) + { + Logger.LogError($"{Thread.CurrentThread.Name} interrupted at {Environment.StackTrace}"); + } catch (System.Exception exception) { // Any other exception might be due to some other problem. @@ -227,7 +238,7 @@ namespace Capnp.Rpc { connection.Client.Dispose(); connection.Pump?.Dispose(); - connection.PumpRunner?.Join(5000); + connection.PumpRunner?.SafeJoin(Logger); } _rpcEngine.BootstrapCap = null; diff --git a/Capnp.Net.Runtime/Util/StrictlyOrderedAwaitTask.cs b/Capnp.Net.Runtime/Util/StrictlyOrderedAwaitTask.cs index 5cf1335..1565aa4 100644 --- a/Capnp.Net.Runtime/Util/StrictlyOrderedAwaitTask.cs +++ b/Capnp.Net.Runtime/Util/StrictlyOrderedAwaitTask.cs @@ -7,17 +7,21 @@ using System.Threading.Tasks; namespace Capnp.Util { - public class StrictlyOrderedAwaitTask: INotifyCompletion + internal class StrictlyOrderedAwaitTask: INotifyCompletion { - static readonly Action Capsule = () => throw new InvalidProgramException("Not invocable"); + class Cover { } + class Seal { } + + static readonly Cover s_cover = new Cover(); + static readonly Seal s_seal = new Seal(); readonly Task _awaitedTask; - Action? _state; + object? _state; public StrictlyOrderedAwaitTask(Task awaitedTask) { _awaitedTask = awaitedTask; - AwaitInternal(); + _state = s_cover; } public StrictlyOrderedAwaitTask GetAwaiter() @@ -41,41 +45,54 @@ namespace Capnp.Util Action? continuations; do { - continuations = Interlocked.Exchange(ref _state, null); + continuations = (Action?)Interlocked.Exchange(ref _state, null); continuations?.Invoke(); } while (continuations != null); - return Interlocked.CompareExchange(ref _state, Capsule, null) == null; + return Interlocked.CompareExchange(ref _state, s_seal, null) == null; }); } } public void OnCompleted(Action continuation) { + bool first = false; + SpinWait.SpinUntil(() => { - Action? cur, next; + object? cur, next; cur = Volatile.Read(ref _state); + first = false; switch (cur) { + case Cover cover: + next = continuation; + first = true; + break; + case null: next = continuation; break; - case Action capsule when capsule == Capsule: - continuation(); - return true; - case Action action: next = action + continuation; break; + + default: + continuation(); + return true; } return Interlocked.CompareExchange(ref _state, next, cur) == cur; }); + + if (first) + { + AwaitInternal(); + } } - public bool IsCompleted => _awaitedTask.IsCompleted && _state == Capsule; + public bool IsCompleted => _awaitedTask.IsCompleted && _state == s_seal; public T GetResult() => _awaitedTask.GetAwaiter().GetResult(); @@ -84,7 +101,7 @@ namespace Capnp.Util public Task WrappedTask => _awaitedTask; } - public static class StrictlyOrderedTaskExtensions + internal static class StrictlyOrderedTaskExtensions { public static StrictlyOrderedAwaitTask EnforceAwaitOrder(this Task task) { diff --git a/Capnp.Net.Runtime/Util/ThreadExtensions.cs b/Capnp.Net.Runtime/Util/ThreadExtensions.cs new file mode 100644 index 0000000..a53da32 --- /dev/null +++ b/Capnp.Net.Runtime/Util/ThreadExtensions.cs @@ -0,0 +1,49 @@ +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Text; +using System.Threading; + +namespace Capnp.Util +{ + internal static class ThreadExtensions + { + class ThreadExtensionsLoggingContext + { + public ILogger Logger { get; } = Logging.CreateLogger(); + } + + static Lazy LoggingContext = new Lazy( + () => new ThreadExtensionsLoggingContext(), + LazyThreadSafetyMode.PublicationOnly); + + public static void SafeJoin(this Thread thread, ILogger? logger = null, int timeout = 5000) + { + if (!thread.Join(timeout)) + { + logger ??= LoggingContext.Value.Logger; + + string name = thread.Name ?? thread.ManagedThreadId.ToString(); + + try + { + logger.LogError($"Unable to join thread {name}. Thread is in state {thread.ThreadState}."); + thread.Interrupt(); + if (!thread.Join(timeout / 10)) + { + logger.LogError($"Still unable to join thread {name} after Interrupt(). Thread is in state {thread.ThreadState}."); + thread.Abort(); + if (thread.Join(timeout / 10)) + { + logger.LogError($"Still unable to join thread {name} after Abort(). Thread is in state {thread.ThreadState}."); + } + } + } + catch + { + } + } + } + } +}