diff --git a/Capnp.Net.Runtime.Tests/TcpRpcInterop.cs b/Capnp.Net.Runtime.Tests/TcpRpcInterop.cs index 20cd863..5043133 100644 --- a/Capnp.Net.Runtime.Tests/TcpRpcInterop.cs +++ b/Capnp.Net.Runtime.Tests/TcpRpcInterop.cs @@ -1,4 +1,5 @@ -using Capnp.Net.Runtime.Tests.GenImpls; +using Capnp.FrameTracing; +using Capnp.Net.Runtime.Tests.GenImpls; using Capnp.Rpc; using Capnproto_test.Capnp.Test; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -364,13 +365,18 @@ namespace Capnp.Net.Runtime.Tests } } - [TestMethod, Timeout(10000)] + [TestMethod] public void TestTailCallClient() { LaunchCompatTestProcess("server:TailCaller", stdout => { - using (var client = new TcpRpcClient("localhost", TcpPort)) + using (var client = new TcpRpcClient()) { + var tracer = new RpcFrameTracer(Console.Out); + + client.AttachTracer(tracer); + client.Connect("localhost", TcpPort); + client.WhenConnected.Wait(); using (var main = client.GetMain()) @@ -399,11 +405,21 @@ namespace Capnp.Net.Runtime.Tests }); } - [TestMethod, Timeout(10000), Ignore] + [TestMethod, Ignore] + // For details on why this test is ignored, see https://github.com/capnproto/capnproto/issues/876 public void TestTailCallServer() { + using (var server = SetupServer()) { + var tracer = new RpcFrameTracer(Console.Out); + + server.OnConnectionChanged += (s, a) => + { + if (a.Connection.State == ConnectionState.Initializing) + a.Connection.AttachTracer(tracer); + }; + var counters = new Counters(); server.Main = new TestTailCallerImpl(counters); diff --git a/Capnp.Net.Runtime/FramePump.cs b/Capnp.Net.Runtime/FramePump.cs index dcfc154..c882fd7 100644 --- a/Capnp.Net.Runtime/FramePump.cs +++ b/Capnp.Net.Runtime/FramePump.cs @@ -1,12 +1,13 @@ -using Microsoft.Extensions.Logging; +using Capnp.FrameTracing; +using Microsoft.Extensions.Logging; using System; +using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Net.Sockets; using System.Runtime.InteropServices; using System.Text; using System.Threading; -using System.Threading.Tasks; namespace Capnp { @@ -23,6 +24,7 @@ namespace Capnp readonly Stream _stream; readonly BinaryWriter _writer; readonly object _writeLock = new object(); + readonly List _tracers = new List(); /// /// Constructs a new instance for given stream. @@ -47,6 +49,11 @@ namespace Capnp { if (0 == Interlocked.Exchange(ref _disposing, 1)) { + foreach (var tracer in _tracers) + { + tracer.Dispose(); + } + _writer?.Dispose(); _stream.Dispose(); } @@ -84,6 +91,11 @@ namespace Capnp lock (_writeLock) { + foreach (var tracer in _tracers) + { + tracer.TraceFrame(FrameDirection.Tx, frame); + } + _writer.Write(frame.Segments.Count - 1); foreach (var segment in frame.Segments) @@ -140,6 +152,10 @@ namespace Capnp IsWaitingForData = true; var frame = reader.ReadWireFrame(); IsWaitingForData = false; + foreach (var tracer in _tracers) + { + tracer.TraceFrame(FrameDirection.Rx, frame); + } FrameReceived?.Invoke(frame); } } @@ -169,5 +185,10 @@ namespace Capnp IsWaitingForData = false; } } + + public void AttachTracer(IFrameTracer tracer) + { + _tracers.Add(tracer); + } } } diff --git a/Capnp.Net.Runtime/FrameTracing/IFrameTracer.cs b/Capnp.Net.Runtime/FrameTracing/IFrameTracer.cs new file mode 100644 index 0000000..115a6f1 --- /dev/null +++ b/Capnp.Net.Runtime/FrameTracing/IFrameTracer.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Capnp.FrameTracing +{ + public enum FrameDirection + { + Rx, + Tx + } + + public interface IFrameTracer: IDisposable + { + void TraceFrame(FrameDirection direction, WireFrame frame); + } +} diff --git a/Capnp.Net.Runtime/FrameTracing/RpcFrameTracer.cs b/Capnp.Net.Runtime/FrameTracing/RpcFrameTracer.cs new file mode 100644 index 0000000..83e1b50 --- /dev/null +++ b/Capnp.Net.Runtime/FrameTracing/RpcFrameTracer.cs @@ -0,0 +1,232 @@ +using Capnp.Rpc; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Threading; + +namespace Capnp.FrameTracing +{ + public class RpcFrameTracer : IFrameTracer + { + const string Header = "Ticks | Thread | Dir | Message"; + static readonly string HeaderSpace = new string(Enumerable.Repeat(' ', 30).ToArray()) + "|"; + + readonly Stopwatch _timer = new Stopwatch(); + readonly TextWriter _traceWriter; + + public RpcFrameTracer(TextWriter traceWriter) + { + _traceWriter = traceWriter ?? throw new ArgumentNullException(nameof(traceWriter)); + _traceWriter.WriteLine(Header); + } + + public void Dispose() + { + _traceWriter.WriteLine(""); + _traceWriter.Dispose(); + } + + void RenderMessageTarget(MessageTarget.READER target, FrameDirection dir) + { + string tag; + + switch (target.which) + { + case MessageTarget.WHICH.ImportedCap: + tag = dir == FrameDirection.Tx ? "CR" : "CL"; + _traceWriter.WriteLine($"on imported cap {tag}{target.ImportedCap}"); + break; + + case MessageTarget.WHICH.PromisedAnswer: + tag = dir == FrameDirection.Tx ? "Q" : "A"; + _traceWriter.Write($"on promised answer {tag}{target.PromisedAnswer.QuestionId}"); + if (target.PromisedAnswer.Transform != null && target.PromisedAnswer.Transform.Count > 0) + { + _traceWriter.Write(": "); + _traceWriter.Write(string.Join(".", target.PromisedAnswer.Transform.Select(t => t.GetPointerField))); + } + _traceWriter.WriteLine(); + break; + } + } + + void RenderCapDescriptor(CapDescriptor.READER desc, FrameDirection dir) + { + string tag; + + _traceWriter.Write($" {desc.which, 14}"); + switch (desc.which) + { + case CapDescriptor.WHICH.ReceiverAnswer: + tag = dir == FrameDirection.Tx ? "Q" : "A"; + _traceWriter.Write($" {tag}{desc.ReceiverAnswer}"); + break; + + case CapDescriptor.WHICH.ReceiverHosted: + tag = dir == FrameDirection.Tx ? "CR" : "CL"; + _traceWriter.Write($" {tag}{desc.ReceiverHosted}"); + break; + + case CapDescriptor.WHICH.SenderPromise: + tag = dir == FrameDirection.Tx ? "CL" : "CR"; + _traceWriter.Write($" {tag}{desc.SenderPromise}"); + break; + + case CapDescriptor.WHICH.SenderHosted: + tag = dir == FrameDirection.Tx ? "CL" : "CR"; + _traceWriter.Write($" {tag}{desc.SenderHosted}"); + break; + } + } + + void RenderCapTable(IEnumerable caps, FrameDirection dir) + { + foreach (var cap in caps) + { + _traceWriter.Write(HeaderSpace); + RenderCapDescriptor(cap, dir); + _traceWriter.WriteLine(); + } + } + + public void TraceFrame(FrameDirection dir, WireFrame frame) + { + if (!_timer.IsRunning) + { + _timer.Start(); + } + + _traceWriter.Write($@"{_timer.ElapsedTicks, 10} | {Thread.CurrentThread.ManagedThreadId, 10} | "); + _traceWriter.Write(dir == FrameDirection.Tx ? "Tx |" : "Rx |"); + + var dec = DeserializerState.CreateRoot(frame); + var msg = Message.READER.create(dec); + string tag; + + switch (msg.which) + { + case Message.WHICH.Abort: + _traceWriter.WriteLine($"ABORT {msg.Abort.Reason}"); + break; + + case Message.WHICH.Bootstrap: + tag = dir == FrameDirection.Tx ? "Q" : "A"; + _traceWriter.WriteLine($"BOOTSTRAP {tag}{msg.Bootstrap.QuestionId}"); + break; + + case Message.WHICH.Call: + tag = dir == FrameDirection.Tx ? "Q" : "A"; + _traceWriter.Write($"CALL {tag}{msg.Call.QuestionId}, I: {msg.Call.InterfaceId:x} M: {msg.Call.MethodId} "); + RenderMessageTarget(msg.Call.Target, dir); + _traceWriter.Write(HeaderSpace); + _traceWriter.WriteLine($"Send results to {msg.Call.SendResultsTo.which}"); + RenderCapTable(msg.Call.Params.CapTable, dir); + break; + + case Message.WHICH.Disembargo: + _traceWriter.Write($"DISEMBARGO {msg.Disembargo.Context.which}"); + switch (msg.Disembargo.Context.which) + { + case Disembargo.context.WHICH.Provide: + _traceWriter.Write($" {msg.Disembargo.Context.Provide}"); + break; + + case Disembargo.context.WHICH.ReceiverLoopback: + _traceWriter.Write($" E{msg.Disembargo.Context.ReceiverLoopback}"); + break; + + case Disembargo.context.WHICH.SenderLoopback: + _traceWriter.Write($" E{msg.Disembargo.Context.SenderLoopback}"); + break; + } + _traceWriter.WriteLine("."); + _traceWriter.Write(HeaderSpace); + RenderMessageTarget(msg.Disembargo.Target, dir); + break; + + case Message.WHICH.Finish: + tag = dir == FrameDirection.Tx ? "Q" : "A"; + _traceWriter.WriteLine($"FINISH {tag}{msg.Finish.QuestionId}, release: {msg.Finish.ReleaseResultCaps}"); + break; + + case Message.WHICH.Release: + tag = dir == FrameDirection.Tx ? "CR" : "CL"; + _traceWriter.WriteLine($"RELEASE {tag}{msg.Release.Id}, count: {msg.Release.ReferenceCount}"); + break; + + case Message.WHICH.Resolve: + tag = dir == FrameDirection.Tx ? "CL" : "CR"; + _traceWriter.Write($"RESOLVE {tag}{msg.Resolve.PromiseId}: {msg.Resolve.which}"); + switch (msg.Resolve.which) + { + case Resolve.WHICH.Cap: + RenderCapDescriptor(msg.Resolve.Cap, dir); + _traceWriter.WriteLine("."); + break; + + case Resolve.WHICH.Exception: + _traceWriter.WriteLine($" {msg.Resolve.Exception.Reason}"); + break; + } + break; + + case Message.WHICH.Return: + tag = dir == FrameDirection.Tx ? "A" : "Q"; + _traceWriter.Write($"RETURN {tag}{msg.Return.AnswerId} {msg.Return.which}"); + switch (msg.Return.which) + { + case Return.WHICH.Exception: + _traceWriter.WriteLine($" {msg.Return.Exception.Reason}"); + break; + + case Return.WHICH.Results: + _traceWriter.WriteLine($", release: {msg.Return.ReleaseParamCaps}"); + RenderCapTable(msg.Return.Results.CapTable, dir); + break; + + case Return.WHICH.TakeFromOtherQuestion: + tag = dir == FrameDirection.Tx ? "Q" : "A"; + _traceWriter.WriteLine($" {tag}{msg.Return.TakeFromOtherQuestion}"); + break; + + default: + _traceWriter.WriteLine(); + break; + } + break; + + case Message.WHICH.Unimplemented: + _traceWriter.WriteLine($"UNIMPLEMENTED {msg.Unimplemented.which}"); + break; + + case Message.WHICH.Accept: + _traceWriter.WriteLine("ACCEPT"); + break; + + case Message.WHICH.Join: + _traceWriter.WriteLine("JOIN"); + break; + + case Message.WHICH.Provide: + _traceWriter.WriteLine($"PROVIDE {msg.Provide.QuestionId}"); + RenderMessageTarget(msg.Provide.Target, dir); + break; + + case Message.WHICH.ObsoleteDelete: + _traceWriter.WriteLine("OBSOLETEDELETE"); + break; + + case Message.WHICH.ObsoleteSave: + _traceWriter.WriteLine("OBSOLETESAVE"); + break; + + default: + _traceWriter.WriteLine($"Unknown message {msg.which}"); + break; + + } + } + } +} diff --git a/Capnp.Net.Runtime/Rpc/ConnectionState.cs b/Capnp.Net.Runtime/Rpc/ConnectionState.cs new file mode 100644 index 0000000..5084a54 --- /dev/null +++ b/Capnp.Net.Runtime/Rpc/ConnectionState.cs @@ -0,0 +1,24 @@ +namespace Capnp.Rpc +{ + /// + /// State of an RPC connection + /// + public enum ConnectionState + { + /// + /// Connection is being initialized. This is a transient state. For TcpRpcServer it is active during + /// the OnConnectionChanged event callback. For TcpRpcClient it is active before the connection is established. + /// + Initializing, + + /// + /// Connection is active. + /// + Active, + + /// + /// Connection is down. It will never be active again (re-connecting means to establish a new connection). + /// + Down + } +} diff --git a/Capnp.Net.Runtime/Rpc/IConnection.cs b/Capnp.Net.Runtime/Rpc/IConnection.cs new file mode 100644 index 0000000..582b17e --- /dev/null +++ b/Capnp.Net.Runtime/Rpc/IConnection.cs @@ -0,0 +1,60 @@ +using Capnp.FrameTracing; +using System; + +namespace Capnp.Rpc +{ + /// + /// Models an RPC connection. + /// + public interface IConnection + { + /// + /// Returns the state of this connection. + /// + ConnectionState State { get; } + + /// + /// TCP port (local end), or null if the connection is not yet established. + /// + int? LocalPort { get; } + + /// + /// TCP port (remote end), or null if the connection is not yet established. + /// + int? RemotePort { get; } + + /// + /// Receive message counter + /// + long RecvCount { get; } + + /// + /// Sent message counter + /// + long SendCount { get; } + + /// + /// Whether the RPC engine is currently computing. + /// + bool IsComputing { get; } + + /// + /// Whether the connection is idle, waiting for data to receive. + /// + bool IsWaitingForData { get; } + + /// + /// Attaches a tracer to this connection. Only allowed in state 'Initializing'. + /// + /// Tracer to attach + /// is null + /// Connection is not in state 'Initializing' + void AttachTracer(IFrameTracer tracer); + + /// + /// Prematurely closes this connection. Note that there is usually no need to close a connection manually. The typical use case + /// of this method is to refuse an incoming connection in the TcpRpcServer.OnConnectionChanged callback. + /// + void Close(); + } +} diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs b/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs index cf9c0ab..0f3e1a6 100644 --- a/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs +++ b/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs @@ -1,4 +1,5 @@ -using Microsoft.Extensions.Logging; +using Capnp.FrameTracing; +using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.Diagnostics; @@ -15,7 +16,7 @@ namespace Capnp.Rpc /// TCP-based RPC implementation which will establish a connection to a TCP server implementing /// the Cap'n Proto RPC protocol. /// - public class TcpRpcClient: IDisposable + public class TcpRpcClient: IConnection, IDisposable { ILogger Logger { get; } = Logging.CreateLogger(); @@ -47,11 +48,13 @@ namespace Capnp.Rpc OutboundTcpEndpoint _outboundEndpoint; FramePump _pump; Thread _pumpThread; + Action _attachTracerAction; /// - /// Gets a Task which completes when TCP is connected. + /// Gets a Task which completes when TCP is connected. Will be + /// null until connection is actually requested (either by calling Connect or using appropriate constructor). /// - public Task WhenConnected { get; } + public Task WhenConnected { get; private set; } async Task ConnectAsync(string host, int port) { @@ -74,11 +77,13 @@ namespace Capnp.Rpc } } - async Task Connect(string host, int port) + async Task ConnectAndRunAsync(string host, int port) { await ConnectAsync(host, port); + State = ConnectionState.Active; _pump = new FramePump(_client.GetStream()); + _attachTracerAction?.Invoke(); _outboundEndpoint = new OutboundTcpEndpoint(this, _pump); _inboundEndpoint = _rpcEngine.AddEndpoint(_outboundEndpoint); _pumpThread = new Thread(() => @@ -91,6 +96,7 @@ namespace Capnp.Rpc } finally { + State = ConnectionState.Down; _outboundEndpoint.Dismiss(); _inboundEndpoint.Dismiss(); _pump.Dispose(); @@ -109,13 +115,36 @@ namespace Capnp.Rpc /// is null. /// is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort. /// An error occurred when accessing the socket. - public TcpRpcClient(string host, int port) + public TcpRpcClient(string host, int port): this() + { + Connect(host, port); + } + + /// + /// Constructs an instance but does not yet attempt to connect. + /// + public TcpRpcClient() { _rpcEngine = new RpcEngine(); _client = new TcpClient(); _client.ExclusiveAddressUse = false; + } - WhenConnected = Connect(host, port); + /// + /// Attempts to connect it to given host. + /// + /// The DNS name of the remote RPC host + /// The port number of the remote RPC host + /// is null. + /// is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort. + /// An error occurred when accessing the socket. + /// Connection was already requested + public void Connect(string host, int port) + { + if (WhenConnected != null) + throw new InvalidOperationException("Connection was already requested"); + + WhenConnected = ConnectAndRunAsync(host, port); } /// @@ -167,6 +196,41 @@ namespace Capnp.Rpc GC.SuppressFinalize(this); } + /// + /// Attaches a tracer to this connection. Only allowed in state 'Initializing'. To avoid race conditions, + /// this method should only be used in conjunction with the parameterless constructor (no auto-connect). + /// Call this method *before* calling Connect. + /// + /// Tracer to attach + /// is null + /// Connection is not in state 'Initializing' + public void AttachTracer(IFrameTracer tracer) + { + if (tracer == null) + throw new ArgumentNullException(nameof(tracer)); + + if (State != ConnectionState.Initializing) + throw new InvalidOperationException("Connection is not in state 'Initializing'"); + + _attachTracerAction += () => + { + _pump.AttachTracer(tracer); + }; + } + + /// + /// Prematurely closes this connection. Note that there is usually no need to close a connection manually. + /// + void IConnection.Close() + { + _client.Dispose(); + } + + /// + /// Returns the state of this connection. + /// + public ConnectionState State { get; private set; } = ConnectionState.Initializing; + /// /// Gets the number of RPC protocol messages sent by this client so far. /// @@ -183,6 +247,12 @@ namespace Capnp.Rpc /// public int? RemotePort => ((IPEndPoint)_client.Client?.RemoteEndPoint)?.Port; + /// + /// Gets the local port number which this client using, + /// or null if the connection is not yet established. + /// + public int? LocalPort => ((IPEndPoint)_client.Client?.LocalEndPoint)?.Port; + /// /// Whether the I/O thread is currently running /// diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs index 3679a71..5b9b816 100644 --- a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs +++ b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs @@ -1,4 +1,5 @@ -using Microsoft.Extensions.Logging; +using Capnp.FrameTracing; +using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.Net; @@ -9,42 +10,21 @@ using System.Threading.Tasks; namespace Capnp.Rpc { + public class ConnectionEventArgs: EventArgs + { + public IConnection Connection { get; } + + public ConnectionEventArgs(IConnection connection) + { + Connection = connection; + } + } + /// /// Cap'n Proto RPC TCP server. /// public class TcpRpcServer: IDisposable { - /// - /// Models an incoming connection. - /// - public interface IConnection - { - /// - /// Server-side port - /// - int LocalPort { get; } - - /// - /// Receive message counter - /// - long RecvCount { get; } - - /// - /// Sent message counter - /// - long SendCount { get; } - - /// - /// Whether the RPC engine is currently computing. - /// - bool IsComputing { get; } - - /// - /// Whether the connection is idle, waiting for data to receive. - /// - bool IsWaitingForData { get; } - } - ILogger Logger { get; } = Logging.CreateLogger(); class OutboundTcpEndpoint : IEndpoint @@ -71,18 +51,25 @@ namespace Capnp.Rpc class Connection: IConnection { + readonly TcpRpcServer _server; + public Connection(TcpRpcServer server, TcpClient client, FramePump pump, OutboundTcpEndpoint outboundEp, RpcEngine.RpcEndpoint inboundEp) { + _server = server; Client = client; Pump = pump; OutboundEp = outboundEp; InboundEp = inboundEp; + } + public void Start() + { PumpRunner = new Thread(o => { try { Thread.CurrentThread.Name = $"TCP RPC Server Thread {Thread.CurrentThread.ManagedThreadId}"; + State = ConnectionState.Active; Pump.Run(); } @@ -91,25 +78,46 @@ namespace Capnp.Rpc OutboundEp.Dismiss(); InboundEp.Dismiss(); Pump.Dispose(); - lock (server._reentrancyBlocker) + Client.Dispose(); + lock (_server._reentrancyBlocker) { - --server.ConnectionCount; - server._connections.Remove(this); + --_server.ConnectionCount; + _server._connections.Remove(this); + State = ConnectionState.Down; + _server.OnConnectionChanged?.Invoke(_server, new ConnectionEventArgs(this)); } } }); } + public ConnectionState State { get; set; } = ConnectionState.Initializing; public TcpClient Client { get; private set; } public FramePump Pump { get; private set; } public OutboundTcpEndpoint OutboundEp { get; private set; } public RpcEngine.RpcEndpoint InboundEp { get; private set; } public Thread PumpRunner { get; private set; } - public int LocalPort => ((IPEndPoint)Client.Client.LocalEndPoint).Port; + public int? LocalPort => ((IPEndPoint)Client.Client.LocalEndPoint)?.Port; + public int? RemotePort => ((IPEndPoint)Client.Client.RemoteEndPoint)?.Port; public long RecvCount => InboundEp.RecvCount; public long SendCount => InboundEp.SendCount; public bool IsComputing => PumpRunner.ThreadState == ThreadState.Running; public bool IsWaitingForData => Pump.IsWaitingForData; + + public void AttachTracer(IFrameTracer tracer) + { + if (tracer == null) + throw new ArgumentNullException(nameof(tracer)); + + if (State != ConnectionState.Initializing) + throw new InvalidOperationException("Connection is not in state 'Initializing'"); + + Pump.AttachTracer(tracer); + } + + public void Close() + { + Client.Dispose(); + } } readonly RpcEngine _rpcEngine; @@ -144,6 +152,9 @@ namespace Capnp.Rpc { ++ConnectionCount; _connections.Add(connection); + + OnConnectionChanged?.Invoke(this, new ConnectionEventArgs(connection)); + connection.Start(); } connection.PumpRunner.Start(); @@ -193,13 +204,7 @@ namespace Capnp.Rpc /// public void Dispose() { - try - { - _listener.Stop(); - } - catch (SocketException) - { - } + StopListening(); var connections = new List(); @@ -220,6 +225,20 @@ namespace Capnp.Rpc GC.SuppressFinalize(this); } + /// + /// Stops accepting incoming attempts. + /// + public void StopListening() + { + try + { + _listener.Stop(); + } + catch (SocketException) + { + } + } + /// /// Constructs an instance. /// @@ -282,5 +301,10 @@ namespace Capnp.Rpc } } } + + /// + /// Fires when a new incoming connection was accepted, or when an active connection is closed. + /// + public event Action OnConnectionChanged; } }