using Capnp.FrameTracing; using Capnp.Util; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.IO; using System.Net; using System.Net.Sockets; using System.Threading; namespace Capnp.Rpc { /// /// Carries information on RPC connection state changes. /// public class ConnectionEventArgs: EventArgs { /// /// Affected connection /// public IConnection Connection { get; } /// /// Constructs an instance /// /// RPC connection object public ConnectionEventArgs(IConnection connection) { Connection = connection; } } /// /// Cap'n Proto RPC TCP server. /// public class TcpRpcServer: ISupportsMidlayers, IDisposable { ILogger Logger { get; } = Logging.CreateLogger(); class OutboundTcpEndpoint : IEndpoint { readonly TcpRpcServer _server; readonly FramePump _pump; public OutboundTcpEndpoint(TcpRpcServer server, FramePump pump) { _server = server; _pump = pump; } public void Dismiss() { _pump.Dispose(); } public void Forward(WireFrame frame) { _pump.Send(frame); } public void Flush() { _pump.Flush(); } } class Connection: IConnection { ILogger Logger { get; } = Logging.CreateLogger(); readonly List _tracers = new List(); readonly TcpRpcServer _server; Stream _stream; public Connection(TcpRpcServer server, TcpClient client) { _server = server; Client = client; _stream = client.GetStream(); } public void Start() { Pump = new FramePump(_stream); foreach (var tracer in _tracers) { Pump.AttachTracer(tracer); } _tracers.Clear(); OutboundEp = new OutboundTcpEndpoint(_server, Pump); InboundEp = _server._rpcEngine.AddEndpoint(OutboundEp); Pump.FrameReceived += InboundEp.Forward; State = ConnectionState.Active; PumpRunner = new Thread(o => { try { Thread.CurrentThread.Name = $"TCP RPC Server Thread {Thread.CurrentThread.ManagedThreadId}"; Pump.Run(); } catch (ThreadInterruptedException) { Logger.LogError($"{Thread.CurrentThread.Name} interrupted at {Environment.StackTrace}"); } finally { OutboundEp.Dismiss(); InboundEp.Dismiss(); Pump.Dispose(); Client.Dispose(); lock (_server._reentrancyBlocker) { --_server.ConnectionCount; _server._connections.Remove(this); State = ConnectionState.Down; _server.OnConnectionChanged?.Invoke(_server, new ConnectionEventArgs(this)); } } }); PumpRunner.Start(); } public ConnectionState State { get; set; } = ConnectionState.Initializing; public TcpClient Client { get; private set; } public FramePump? Pump { get; private set; } public OutboundTcpEndpoint? OutboundEp { get; private set; } public RpcEngine.RpcEndpoint? InboundEp { get; private set; } public Thread? PumpRunner { get; private set; } public int? LocalPort => ((IPEndPoint)Client.Client.LocalEndPoint)?.Port; public int? RemotePort => ((IPEndPoint)Client.Client.RemoteEndPoint)?.Port; public long RecvCount => InboundEp?.RecvCount ?? 0; public long SendCount => InboundEp?.SendCount ?? 0; public bool IsComputing => PumpRunner?.ThreadState == ThreadState.Running; public bool IsWaitingForData => Pump?.IsWaitingForData ?? false; public void AttachTracer(IFrameTracer tracer) { if (tracer == null) throw new ArgumentNullException(nameof(tracer)); if (State != ConnectionState.Initializing) throw new InvalidOperationException("Connection is not in state 'Initializing'"); _tracers.Add(tracer); } /// /// Installs a midlayer. A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream. /// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received /// by the TCP connection, respectively. This mechanism may be used for integrating various (de-)compression algorithms. /// /// Callback for wrapping the midlayer around its underlying stream /// is null public void InjectMidlayer(Func createFunc) { if (createFunc == null) throw new ArgumentNullException(nameof(createFunc)); if (State != ConnectionState.Initializing) throw new InvalidOperationException("Connection is not in state 'Initializing'"); _stream = createFunc(_stream); } public void Close() { Client.Dispose(); } } readonly RpcEngine _rpcEngine; readonly object _reentrancyBlocker = new object(); readonly List _connections = new List(); Thread? _acceptorThread; TcpListener? _listener; /// /// Gets the number of currently active inbound TCP connections. /// public int ConnectionCount { get; private set; } void AcceptClients(TcpListener listener) { try { if (Thread.CurrentThread.Name == null) Thread.CurrentThread.Name = $"TCP RPC Acceptor Thread {Thread.CurrentThread.ManagedThreadId}"; while (true) { var client = listener.AcceptTcpClient(); var connection = new Connection(this, client); lock (_reentrancyBlocker) { ++ConnectionCount; _connections.Add(connection); OnConnectionChanged?.Invoke(this, new ConnectionEventArgs(connection)); connection.Start(); } } } catch (SocketException) { // Listener was stopped. Maybe a little bit rude, but this is // our way of shutting down the acceptor thread. } catch (ThreadInterruptedException) { Logger.LogError($"{Thread.CurrentThread.Name} interrupted at {Environment.StackTrace}"); } catch (System.Exception exception) { // Any other exception might be due to some other problem. Logger.LogError(exception.Message); } } /// /// Stops accepting incoming attempts and closes all existing connections. /// public void Dispose() { if (_listener != null) { StopListening(); } var connections = new List(); lock (_reentrancyBlocker) { connections.AddRange(_connections); } foreach (var connection in connections) { connection.Client.Dispose(); connection.Pump?.Dispose(); connection.PumpRunner?.SafeJoin(Logger); } _rpcEngine.BootstrapCap = null; GC.SuppressFinalize(this); } /// /// Stops accepting incoming attempts. /// public void StopListening() { if (_listener == null) throw new InvalidOperationException("Listening was never started"); try { _listener.Stop(); } catch (SocketException) { } finally { _listener = null; if (Thread.CurrentThread != _acceptorThread) _acceptorThread?.Join(); _acceptorThread = null; } } /// /// Installs a midlayer. /// A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream. /// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received /// by the TCP connection, respectively. This mechanism can be used for buffering, various (de-)compression algorithms, and more. /// /// public void InjectMidlayer(Func createFunc) { OnConnectionChanged += (_, e) => { if (e.Connection.State == ConnectionState.Initializing) { e.Connection.InjectMidlayer(createFunc); } }; } /// /// Constructs an instance. /// public TcpRpcServer() { _rpcEngine = new RpcEngine(); } /// /// Constructs an instance, starts listening to the specified TCP/IP endpoint and accepting clients. /// If you intend configuring a midlayer or consuming the event, /// you should not use this constructor, since it may lead to an early-client race condition. /// Instead, use the parameterless constructor, configure, then call . /// /// An System.Net.IPAddress that represents the local IP address. /// The port on which to listen for incoming connection attempts. /// is null. /// is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort. /// The underlying detected an error condition, such as the desired endpoint is already occupied. public TcpRpcServer(IPAddress localAddr, int port): this() { StartAccepting(localAddr, port); } /// /// Starts listening to the specified TCP/IP endpoint and accepting clients. /// /// An System.Net.IPAddress that represents the local IP address. /// The port on which to listen for incoming connection attempts. /// is null. /// is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort. /// Listening activity was already started /// The underlying detected an error condition, such as the desired endpoint is already occupied. public void StartAccepting(IPAddress localAddr, int port) { if (_listener != null) throw new InvalidOperationException("Listening activity was already started"); var listener = new TcpListener(localAddr, port) { ExclusiveAddressUse = false }; int attempt = 0; while (true) { try { listener.Start(); break; } catch (SocketException socketException) { if (attempt == 5) throw; Logger.LogWarning($"Failed to listen on port {port}, attempt {attempt}: {socketException}"); } ++attempt; Thread.Sleep(10); } _acceptorThread = new Thread(() => AcceptClients(listener)); _listener = listener; _acceptorThread.Start(); } /// /// Whether the thread which is responsible for acception incoming attempts is still alive. /// The thread will die after calling , upon disposal, but also in case of a socket error condition. /// Errors which occur on a particular connection will just close that connection and won't interfere /// with the acceptor thread. /// public bool IsAlive => _acceptorThread?.IsAlive ?? false; /// /// Sets the bootstrap capability. It must be an object which implements a valid capability interface /// (). /// public object Main { set { _rpcEngine.Main = value; } } /// /// Gets a snapshot of currently active connections. /// public IConnection[] Connections { get { lock (_reentrancyBlocker) { return _connections.ToArray(); } } } /// /// Fires when a new incoming connection was accepted, or when an active connection is closed. /// public event Action? OnConnectionChanged; } }