Merge pull request #18 from c80k/frame-tracing

Frame tracing
This commit is contained in:
c80k 2019-10-04 00:21:51 +02:00 committed by GitHub
commit 271dc3f380
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 520 additions and 56 deletions

View File

@ -1,4 +1,5 @@
using Capnp.Net.Runtime.Tests.GenImpls;
using Capnp.FrameTracing;
using Capnp.Net.Runtime.Tests.GenImpls;
using Capnp.Rpc;
using Capnproto_test.Capnp.Test;
using Microsoft.VisualStudio.TestTools.UnitTesting;
@ -364,13 +365,18 @@ namespace Capnp.Net.Runtime.Tests
}
}
[TestMethod, Timeout(10000)]
[TestMethod]
public void TestTailCallClient()
{
LaunchCompatTestProcess("server:TailCaller", stdout =>
{
using (var client = new TcpRpcClient("localhost", TcpPort))
using (var client = new TcpRpcClient())
{
var tracer = new RpcFrameTracer(Console.Out);
client.AttachTracer(tracer);
client.Connect("localhost", TcpPort);
client.WhenConnected.Wait();
using (var main = client.GetMain<ITestTailCaller>())
@ -399,11 +405,21 @@ namespace Capnp.Net.Runtime.Tests
});
}
[TestMethod, Timeout(10000), Ignore]
[TestMethod, Ignore]
// For details on why this test is ignored, see https://github.com/capnproto/capnproto/issues/876
public void TestTailCallServer()
{
using (var server = SetupServer())
{
var tracer = new RpcFrameTracer(Console.Out);
server.OnConnectionChanged += (s, a) =>
{
if (a.Connection.State == ConnectionState.Initializing)
a.Connection.AttachTracer(tracer);
};
var counters = new Counters();
server.Main = new TestTailCallerImpl(counters);

View File

@ -1,12 +1,13 @@
using Microsoft.Extensions.Logging;
using Capnp.FrameTracing;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Capnp
{
@ -23,6 +24,7 @@ namespace Capnp
readonly Stream _stream;
readonly BinaryWriter _writer;
readonly object _writeLock = new object();
readonly List<IFrameTracer> _tracers = new List<IFrameTracer>();
/// <summary>
/// Constructs a new instance for given stream.
@ -47,6 +49,11 @@ namespace Capnp
{
if (0 == Interlocked.Exchange(ref _disposing, 1))
{
foreach (var tracer in _tracers)
{
tracer.Dispose();
}
_writer?.Dispose();
_stream.Dispose();
}
@ -84,6 +91,11 @@ namespace Capnp
lock (_writeLock)
{
foreach (var tracer in _tracers)
{
tracer.TraceFrame(FrameDirection.Tx, frame);
}
_writer.Write(frame.Segments.Count - 1);
foreach (var segment in frame.Segments)
@ -140,6 +152,10 @@ namespace Capnp
IsWaitingForData = true;
var frame = reader.ReadWireFrame();
IsWaitingForData = false;
foreach (var tracer in _tracers)
{
tracer.TraceFrame(FrameDirection.Rx, frame);
}
FrameReceived?.Invoke(frame);
}
}
@ -169,5 +185,10 @@ namespace Capnp
IsWaitingForData = false;
}
}
public void AttachTracer(IFrameTracer tracer)
{
_tracers.Add(tracer);
}
}
}

View File

@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace Capnp.FrameTracing
{
public enum FrameDirection
{
Rx,
Tx
}
public interface IFrameTracer: IDisposable
{
void TraceFrame(FrameDirection direction, WireFrame frame);
}
}

View File

@ -0,0 +1,232 @@
using Capnp.Rpc;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
namespace Capnp.FrameTracing
{
public class RpcFrameTracer : IFrameTracer
{
const string Header = "Ticks | Thread | Dir | Message";
static readonly string HeaderSpace = new string(Enumerable.Repeat(' ', 30).ToArray()) + "|";
readonly Stopwatch _timer = new Stopwatch();
readonly TextWriter _traceWriter;
public RpcFrameTracer(TextWriter traceWriter)
{
_traceWriter = traceWriter ?? throw new ArgumentNullException(nameof(traceWriter));
_traceWriter.WriteLine(Header);
}
public void Dispose()
{
_traceWriter.WriteLine("<end of trace>");
_traceWriter.Dispose();
}
void RenderMessageTarget(MessageTarget.READER target, FrameDirection dir)
{
string tag;
switch (target.which)
{
case MessageTarget.WHICH.ImportedCap:
tag = dir == FrameDirection.Tx ? "CR" : "CL";
_traceWriter.WriteLine($"on imported cap {tag}{target.ImportedCap}");
break;
case MessageTarget.WHICH.PromisedAnswer:
tag = dir == FrameDirection.Tx ? "Q" : "A";
_traceWriter.Write($"on promised answer {tag}{target.PromisedAnswer.QuestionId}");
if (target.PromisedAnswer.Transform != null && target.PromisedAnswer.Transform.Count > 0)
{
_traceWriter.Write(": ");
_traceWriter.Write(string.Join(".", target.PromisedAnswer.Transform.Select(t => t.GetPointerField)));
}
_traceWriter.WriteLine();
break;
}
}
void RenderCapDescriptor(CapDescriptor.READER desc, FrameDirection dir)
{
string tag;
_traceWriter.Write($" {desc.which, 14}");
switch (desc.which)
{
case CapDescriptor.WHICH.ReceiverAnswer:
tag = dir == FrameDirection.Tx ? "Q" : "A";
_traceWriter.Write($" {tag}{desc.ReceiverAnswer}");
break;
case CapDescriptor.WHICH.ReceiverHosted:
tag = dir == FrameDirection.Tx ? "CR" : "CL";
_traceWriter.Write($" {tag}{desc.ReceiverHosted}");
break;
case CapDescriptor.WHICH.SenderPromise:
tag = dir == FrameDirection.Tx ? "CL" : "CR";
_traceWriter.Write($" {tag}{desc.SenderPromise}");
break;
case CapDescriptor.WHICH.SenderHosted:
tag = dir == FrameDirection.Tx ? "CL" : "CR";
_traceWriter.Write($" {tag}{desc.SenderHosted}");
break;
}
}
void RenderCapTable(IEnumerable<CapDescriptor.READER> caps, FrameDirection dir)
{
foreach (var cap in caps)
{
_traceWriter.Write(HeaderSpace);
RenderCapDescriptor(cap, dir);
_traceWriter.WriteLine();
}
}
public void TraceFrame(FrameDirection dir, WireFrame frame)
{
if (!_timer.IsRunning)
{
_timer.Start();
}
_traceWriter.Write($@"{_timer.ElapsedTicks, 10} | {Thread.CurrentThread.ManagedThreadId, 10} | ");
_traceWriter.Write(dir == FrameDirection.Tx ? "Tx |" : "Rx |");
var dec = DeserializerState.CreateRoot(frame);
var msg = Message.READER.create(dec);
string tag;
switch (msg.which)
{
case Message.WHICH.Abort:
_traceWriter.WriteLine($"ABORT {msg.Abort.Reason}");
break;
case Message.WHICH.Bootstrap:
tag = dir == FrameDirection.Tx ? "Q" : "A";
_traceWriter.WriteLine($"BOOTSTRAP {tag}{msg.Bootstrap.QuestionId}");
break;
case Message.WHICH.Call:
tag = dir == FrameDirection.Tx ? "Q" : "A";
_traceWriter.Write($"CALL {tag}{msg.Call.QuestionId}, I: {msg.Call.InterfaceId:x} M: {msg.Call.MethodId} ");
RenderMessageTarget(msg.Call.Target, dir);
_traceWriter.Write(HeaderSpace);
_traceWriter.WriteLine($"Send results to {msg.Call.SendResultsTo.which}");
RenderCapTable(msg.Call.Params.CapTable, dir);
break;
case Message.WHICH.Disembargo:
_traceWriter.Write($"DISEMBARGO {msg.Disembargo.Context.which}");
switch (msg.Disembargo.Context.which)
{
case Disembargo.context.WHICH.Provide:
_traceWriter.Write($" {msg.Disembargo.Context.Provide}");
break;
case Disembargo.context.WHICH.ReceiverLoopback:
_traceWriter.Write($" E{msg.Disembargo.Context.ReceiverLoopback}");
break;
case Disembargo.context.WHICH.SenderLoopback:
_traceWriter.Write($" E{msg.Disembargo.Context.SenderLoopback}");
break;
}
_traceWriter.WriteLine(".");
_traceWriter.Write(HeaderSpace);
RenderMessageTarget(msg.Disembargo.Target, dir);
break;
case Message.WHICH.Finish:
tag = dir == FrameDirection.Tx ? "Q" : "A";
_traceWriter.WriteLine($"FINISH {tag}{msg.Finish.QuestionId}, release: {msg.Finish.ReleaseResultCaps}");
break;
case Message.WHICH.Release:
tag = dir == FrameDirection.Tx ? "CR" : "CL";
_traceWriter.WriteLine($"RELEASE {tag}{msg.Release.Id}, count: {msg.Release.ReferenceCount}");
break;
case Message.WHICH.Resolve:
tag = dir == FrameDirection.Tx ? "CL" : "CR";
_traceWriter.Write($"RESOLVE {tag}{msg.Resolve.PromiseId}: {msg.Resolve.which}");
switch (msg.Resolve.which)
{
case Resolve.WHICH.Cap:
RenderCapDescriptor(msg.Resolve.Cap, dir);
_traceWriter.WriteLine(".");
break;
case Resolve.WHICH.Exception:
_traceWriter.WriteLine($" {msg.Resolve.Exception.Reason}");
break;
}
break;
case Message.WHICH.Return:
tag = dir == FrameDirection.Tx ? "A" : "Q";
_traceWriter.Write($"RETURN {tag}{msg.Return.AnswerId} {msg.Return.which}");
switch (msg.Return.which)
{
case Return.WHICH.Exception:
_traceWriter.WriteLine($" {msg.Return.Exception.Reason}");
break;
case Return.WHICH.Results:
_traceWriter.WriteLine($", release: {msg.Return.ReleaseParamCaps}");
RenderCapTable(msg.Return.Results.CapTable, dir);
break;
case Return.WHICH.TakeFromOtherQuestion:
tag = dir == FrameDirection.Tx ? "Q" : "A";
_traceWriter.WriteLine($" {tag}{msg.Return.TakeFromOtherQuestion}");
break;
default:
_traceWriter.WriteLine();
break;
}
break;
case Message.WHICH.Unimplemented:
_traceWriter.WriteLine($"UNIMPLEMENTED {msg.Unimplemented.which}");
break;
case Message.WHICH.Accept:
_traceWriter.WriteLine("ACCEPT");
break;
case Message.WHICH.Join:
_traceWriter.WriteLine("JOIN");
break;
case Message.WHICH.Provide:
_traceWriter.WriteLine($"PROVIDE {msg.Provide.QuestionId}");
RenderMessageTarget(msg.Provide.Target, dir);
break;
case Message.WHICH.ObsoleteDelete:
_traceWriter.WriteLine("OBSOLETEDELETE");
break;
case Message.WHICH.ObsoleteSave:
_traceWriter.WriteLine("OBSOLETESAVE");
break;
default:
_traceWriter.WriteLine($"Unknown message {msg.which}");
break;
}
}
}
}

View File

@ -0,0 +1,24 @@
namespace Capnp.Rpc
{
/// <summary>
/// State of an RPC connection
/// </summary>
public enum ConnectionState
{
/// <summary>
/// Connection is being initialized. This is a transient state. For TcpRpcServer it is active during
/// the OnConnectionChanged event callback. For TcpRpcClient it is active before the connection is established.
/// </summary>
Initializing,
/// <summary>
/// Connection is active.
/// </summary>
Active,
/// <summary>
/// Connection is down. It will never be active again (re-connecting means to establish a new connection).
/// </summary>
Down
}
}

View File

@ -0,0 +1,60 @@
using Capnp.FrameTracing;
using System;
namespace Capnp.Rpc
{
/// <summary>
/// Models an RPC connection.
/// </summary>
public interface IConnection
{
/// <summary>
/// Returns the state of this connection.
/// </summary>
ConnectionState State { get; }
/// <summary>
/// TCP port (local end), or null if the connection is not yet established.
/// </summary>
int? LocalPort { get; }
/// <summary>
/// TCP port (remote end), or null if the connection is not yet established.
/// </summary>
int? RemotePort { get; }
/// <summary>
/// Receive message counter
/// </summary>
long RecvCount { get; }
/// <summary>
/// Sent message counter
/// </summary>
long SendCount { get; }
/// <summary>
/// Whether the RPC engine is currently computing.
/// </summary>
bool IsComputing { get; }
/// <summary>
/// Whether the connection is idle, waiting for data to receive.
/// </summary>
bool IsWaitingForData { get; }
/// <summary>
/// Attaches a tracer to this connection. Only allowed in state 'Initializing'.
/// </summary>
/// <param name="tracer">Tracer to attach</param>
/// <exception cref="ArgumentNullException"><paramref name="tracer"/> is null</exception>
/// <exception cref="InvalidOperationException">Connection is not in state 'Initializing'</exception>
void AttachTracer(IFrameTracer tracer);
/// <summary>
/// Prematurely closes this connection. Note that there is usually no need to close a connection manually. The typical use case
/// of this method is to refuse an incoming connection in the <code>TcpRpcServer.OnConnectionChanged</code> callback.
/// </summary>
void Close();
}
}

View File

@ -1,4 +1,5 @@
using Microsoft.Extensions.Logging;
using Capnp.FrameTracing;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
@ -15,7 +16,7 @@ namespace Capnp.Rpc
/// TCP-based RPC implementation which will establish a connection to a TCP server implementing
/// the Cap'n Proto RPC protocol.
/// </summary>
public class TcpRpcClient: IDisposable
public class TcpRpcClient: IConnection, IDisposable
{
ILogger Logger { get; } = Logging.CreateLogger<TcpRpcClient>();
@ -47,11 +48,13 @@ namespace Capnp.Rpc
OutboundTcpEndpoint _outboundEndpoint;
FramePump _pump;
Thread _pumpThread;
Action _attachTracerAction;
/// <summary>
/// Gets a Task which completes when TCP is connected.
/// Gets a Task which completes when TCP is connected. Will be
/// null until connection is actually requested (either by calling Connect or using appropriate constructor).
/// </summary>
public Task WhenConnected { get; }
public Task WhenConnected { get; private set; }
async Task ConnectAsync(string host, int port)
{
@ -74,11 +77,13 @@ namespace Capnp.Rpc
}
}
async Task Connect(string host, int port)
async Task ConnectAndRunAsync(string host, int port)
{
await ConnectAsync(host, port);
State = ConnectionState.Active;
_pump = new FramePump(_client.GetStream());
_attachTracerAction?.Invoke();
_outboundEndpoint = new OutboundTcpEndpoint(this, _pump);
_inboundEndpoint = _rpcEngine.AddEndpoint(_outboundEndpoint);
_pumpThread = new Thread(() =>
@ -91,6 +96,7 @@ namespace Capnp.Rpc
}
finally
{
State = ConnectionState.Down;
_outboundEndpoint.Dismiss();
_inboundEndpoint.Dismiss();
_pump.Dispose();
@ -109,13 +115,36 @@ namespace Capnp.Rpc
/// <exception cref="ArgumentNullException"><paramref name="host"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="port"/> is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort.</exception>
/// <exception cref="System.Net.Sockets.SocketException">An error occurred when accessing the socket.</exception>
public TcpRpcClient(string host, int port)
public TcpRpcClient(string host, int port): this()
{
Connect(host, port);
}
/// <summary>
/// Constructs an instance but does not yet attempt to connect.
/// </summary>
public TcpRpcClient()
{
_rpcEngine = new RpcEngine();
_client = new TcpClient();
_client.ExclusiveAddressUse = false;
}
WhenConnected = Connect(host, port);
/// <summary>
/// Attempts to connect it to given host.
/// </summary>
/// <param name="host">The DNS name of the remote RPC host</param>
/// <param name="port">The port number of the remote RPC host</param>
/// <exception cref="ArgumentNullException"><paramref name="host"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="port"/> is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort.</exception>
/// <exception cref="System.Net.Sockets.SocketException">An error occurred when accessing the socket.</exception>
/// <exception cref="InvalidOperationException">Connection was already requested</exception>
public void Connect(string host, int port)
{
if (WhenConnected != null)
throw new InvalidOperationException("Connection was already requested");
WhenConnected = ConnectAndRunAsync(host, port);
}
/// <summary>
@ -167,6 +196,41 @@ namespace Capnp.Rpc
GC.SuppressFinalize(this);
}
/// <summary>
/// Attaches a tracer to this connection. Only allowed in state 'Initializing'. To avoid race conditions,
/// this method should only be used in conjunction with the parameterless constructor (no auto-connect).
/// Call this method *before* calling Connect.
/// </summary>
/// <param name="tracer">Tracer to attach</param>
/// <exception cref="ArgumentNullException"><paramref name="tracer"/> is null</exception>
/// <exception cref="InvalidOperationException">Connection is not in state 'Initializing'</exception>
public void AttachTracer(IFrameTracer tracer)
{
if (tracer == null)
throw new ArgumentNullException(nameof(tracer));
if (State != ConnectionState.Initializing)
throw new InvalidOperationException("Connection is not in state 'Initializing'");
_attachTracerAction += () =>
{
_pump.AttachTracer(tracer);
};
}
/// <summary>
/// Prematurely closes this connection. Note that there is usually no need to close a connection manually.
/// </summary>
void IConnection.Close()
{
_client.Dispose();
}
/// <summary>
/// Returns the state of this connection.
/// </summary>
public ConnectionState State { get; private set; } = ConnectionState.Initializing;
/// <summary>
/// Gets the number of RPC protocol messages sent by this client so far.
/// </summary>
@ -183,6 +247,12 @@ namespace Capnp.Rpc
/// </summary>
public int? RemotePort => ((IPEndPoint)_client.Client?.RemoteEndPoint)?.Port;
/// <summary>
/// Gets the local port number which this client using,
/// or null if the connection is not yet established.
/// </summary>
public int? LocalPort => ((IPEndPoint)_client.Client?.LocalEndPoint)?.Port;
/// <summary>
/// Whether the I/O thread is currently running
/// </summary>

View File

@ -1,4 +1,5 @@
using Microsoft.Extensions.Logging;
using Capnp.FrameTracing;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Net;
@ -9,42 +10,21 @@ using System.Threading.Tasks;
namespace Capnp.Rpc
{
public class ConnectionEventArgs: EventArgs
{
public IConnection Connection { get; }
public ConnectionEventArgs(IConnection connection)
{
Connection = connection;
}
}
/// <summary>
/// Cap'n Proto RPC TCP server.
/// </summary>
public class TcpRpcServer: IDisposable
{
/// <summary>
/// Models an incoming connection.
/// </summary>
public interface IConnection
{
/// <summary>
/// Server-side port
/// </summary>
int LocalPort { get; }
/// <summary>
/// Receive message counter
/// </summary>
long RecvCount { get; }
/// <summary>
/// Sent message counter
/// </summary>
long SendCount { get; }
/// <summary>
/// Whether the RPC engine is currently computing.
/// </summary>
bool IsComputing { get; }
/// <summary>
/// Whether the connection is idle, waiting for data to receive.
/// </summary>
bool IsWaitingForData { get; }
}
ILogger Logger { get; } = Logging.CreateLogger<TcpRpcServer>();
class OutboundTcpEndpoint : IEndpoint
@ -71,18 +51,25 @@ namespace Capnp.Rpc
class Connection: IConnection
{
readonly TcpRpcServer _server;
public Connection(TcpRpcServer server, TcpClient client, FramePump pump, OutboundTcpEndpoint outboundEp, RpcEngine.RpcEndpoint inboundEp)
{
_server = server;
Client = client;
Pump = pump;
OutboundEp = outboundEp;
InboundEp = inboundEp;
}
public void Start()
{
PumpRunner = new Thread(o =>
{
try
{
Thread.CurrentThread.Name = $"TCP RPC Server Thread {Thread.CurrentThread.ManagedThreadId}";
State = ConnectionState.Active;
Pump.Run();
}
@ -91,25 +78,46 @@ namespace Capnp.Rpc
OutboundEp.Dismiss();
InboundEp.Dismiss();
Pump.Dispose();
lock (server._reentrancyBlocker)
Client.Dispose();
lock (_server._reentrancyBlocker)
{
--server.ConnectionCount;
server._connections.Remove(this);
--_server.ConnectionCount;
_server._connections.Remove(this);
State = ConnectionState.Down;
_server.OnConnectionChanged?.Invoke(_server, new ConnectionEventArgs(this));
}
}
});
}
public ConnectionState State { get; set; } = ConnectionState.Initializing;
public TcpClient Client { get; private set; }
public FramePump Pump { get; private set; }
public OutboundTcpEndpoint OutboundEp { get; private set; }
public RpcEngine.RpcEndpoint InboundEp { get; private set; }
public Thread PumpRunner { get; private set; }
public int LocalPort => ((IPEndPoint)Client.Client.LocalEndPoint).Port;
public int? LocalPort => ((IPEndPoint)Client.Client.LocalEndPoint)?.Port;
public int? RemotePort => ((IPEndPoint)Client.Client.RemoteEndPoint)?.Port;
public long RecvCount => InboundEp.RecvCount;
public long SendCount => InboundEp.SendCount;
public bool IsComputing => PumpRunner.ThreadState == ThreadState.Running;
public bool IsWaitingForData => Pump.IsWaitingForData;
public void AttachTracer(IFrameTracer tracer)
{
if (tracer == null)
throw new ArgumentNullException(nameof(tracer));
if (State != ConnectionState.Initializing)
throw new InvalidOperationException("Connection is not in state 'Initializing'");
Pump.AttachTracer(tracer);
}
public void Close()
{
Client.Dispose();
}
}
readonly RpcEngine _rpcEngine;
@ -144,6 +152,9 @@ namespace Capnp.Rpc
{
++ConnectionCount;
_connections.Add(connection);
OnConnectionChanged?.Invoke(this, new ConnectionEventArgs(connection));
connection.Start();
}
connection.PumpRunner.Start();
@ -193,13 +204,7 @@ namespace Capnp.Rpc
/// </summary>
public void Dispose()
{
try
{
_listener.Stop();
}
catch (SocketException)
{
}
StopListening();
var connections = new List<Connection>();
@ -220,6 +225,20 @@ namespace Capnp.Rpc
GC.SuppressFinalize(this);
}
/// <summary>
/// Stops accepting incoming attempts.
/// </summary>
public void StopListening()
{
try
{
_listener.Stop();
}
catch (SocketException)
{
}
}
/// <summary>
/// Constructs an instance.
/// </summary>
@ -282,5 +301,10 @@ namespace Capnp.Rpc
}
}
}
/// <summary>
/// Fires when a new incoming connection was accepted, or when an active connection is closed.
/// </summary>
public event Action<object, ConnectionEventArgs> OnConnectionChanged;
}
}