using Capnp.FrameTracing;
using Capnp.Util;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace Capnp.Rpc
{
///
/// Carries information on RPC connection state changes.
///
public class ConnectionEventArgs: EventArgs
{
///
/// Affected connection
///
public IConnection Connection { get; }
///
/// Constructs an instance
///
/// RPC connection object
public ConnectionEventArgs(IConnection connection)
{
Connection = connection;
}
}
///
/// Cap'n Proto RPC TCP server.
///
public class TcpRpcServer: ISupportsMidlayers, IDisposable
{
ILogger Logger { get; } = Logging.CreateLogger();
class OutboundTcpEndpoint : IEndpoint
{
readonly TcpRpcServer _server;
readonly FramePump _pump;
public OutboundTcpEndpoint(TcpRpcServer server, FramePump pump)
{
_server = server;
_pump = pump;
}
public void Dismiss()
{
_pump.Dispose();
}
public void Forward(WireFrame frame)
{
_pump.Send(frame);
}
public void Flush()
{
_pump.Flush();
}
}
class Connection: IConnection
{
ILogger Logger { get; } = Logging.CreateLogger();
readonly List _tracers = new List();
readonly TcpRpcServer _server;
Stream _stream;
public Connection(TcpRpcServer server, TcpClient client)
{
_server = server;
Client = client;
_stream = client.GetStream();
}
public void Start()
{
Pump = new FramePump(_stream);
foreach (var tracer in _tracers)
{
Pump.AttachTracer(tracer);
}
_tracers.Clear();
OutboundEp = new OutboundTcpEndpoint(_server, Pump);
InboundEp = _server._rpcEngine.AddEndpoint(OutboundEp);
Pump.FrameReceived += InboundEp.Forward;
State = ConnectionState.Active;
PumpRunner = new Thread(o =>
{
try
{
Thread.CurrentThread.Name = $"TCP RPC Server Thread {Thread.CurrentThread.ManagedThreadId}";
Pump.Run();
}
catch (ThreadInterruptedException)
{
Logger.LogError($"{Thread.CurrentThread.Name} interrupted at {Environment.StackTrace}");
}
finally
{
OutboundEp.Dismiss();
InboundEp.Dismiss();
Pump.Dispose();
Client.Dispose();
lock (_server._reentrancyBlocker)
{
--_server.ConnectionCount;
_server._connections.Remove(this);
State = ConnectionState.Down;
_server.OnConnectionChanged?.Invoke(_server, new ConnectionEventArgs(this));
}
}
});
PumpRunner.Start();
}
public ConnectionState State { get; set; } = ConnectionState.Initializing;
public TcpClient Client { get; private set; }
public FramePump? Pump { get; private set; }
public OutboundTcpEndpoint? OutboundEp { get; private set; }
public RpcEngine.RpcEndpoint? InboundEp { get; private set; }
public Thread? PumpRunner { get; private set; }
public int? LocalPort => ((IPEndPoint)Client.Client.LocalEndPoint)?.Port;
public int? RemotePort => ((IPEndPoint)Client.Client.RemoteEndPoint)?.Port;
public long RecvCount => InboundEp?.RecvCount ?? 0;
public long SendCount => InboundEp?.SendCount ?? 0;
public bool IsComputing => PumpRunner?.ThreadState == ThreadState.Running;
public bool IsWaitingForData => Pump?.IsWaitingForData ?? false;
public void AttachTracer(IFrameTracer tracer)
{
if (tracer == null)
throw new ArgumentNullException(nameof(tracer));
if (State != ConnectionState.Initializing)
throw new InvalidOperationException("Connection is not in state 'Initializing'");
_tracers.Add(tracer);
}
///
/// Installs a midlayer. A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream.
/// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received
/// by the TCP connection, respectively. This mechanism may be used for integrating various (de-)compression algorithms.
///
/// Callback for wrapping the midlayer around its underlying stream
/// is null
public void InjectMidlayer(Func createFunc)
{
if (createFunc == null)
throw new ArgumentNullException(nameof(createFunc));
if (State != ConnectionState.Initializing)
throw new InvalidOperationException("Connection is not in state 'Initializing'");
_stream = createFunc(_stream);
}
public void Close()
{
Client.Dispose();
}
}
readonly RpcEngine _rpcEngine;
readonly object _reentrancyBlocker = new object();
readonly List _connections = new List();
Thread? _acceptorThread;
TcpListener? _listener;
///
/// Gets the number of currently active inbound TCP connections.
///
public int ConnectionCount { get; private set; }
void AcceptClients(TcpListener listener)
{
try
{
if (Thread.CurrentThread.Name == null)
Thread.CurrentThread.Name = $"TCP RPC Acceptor Thread {Thread.CurrentThread.ManagedThreadId}";
while (true)
{
var client = listener.AcceptTcpClient();
var connection = new Connection(this, client);
lock (_reentrancyBlocker)
{
++ConnectionCount;
_connections.Add(connection);
OnConnectionChanged?.Invoke(this, new ConnectionEventArgs(connection));
connection.Start();
}
}
}
catch (SocketException)
{
// Listener was stopped. Maybe a little bit rude, but this is
// our way of shutting down the acceptor thread.
}
catch (ThreadInterruptedException)
{
Logger.LogError($"{Thread.CurrentThread.Name} interrupted at {Environment.StackTrace}");
}
catch (System.Exception exception)
{
// Any other exception might be due to some other problem.
Logger.LogError(exception.Message);
}
}
///
/// Stops accepting incoming attempts and closes all existing connections.
///
public void Dispose()
{
if (_listener != null)
{
StopListening();
}
var connections = new List();
lock (_reentrancyBlocker)
{
connections.AddRange(_connections);
}
foreach (var connection in connections)
{
connection.Client.Dispose();
connection.Pump?.Dispose();
connection.PumpRunner?.SafeJoin(Logger);
}
_rpcEngine.BootstrapCap = null;
GC.SuppressFinalize(this);
}
///
/// Stops accepting incoming attempts.
///
public void StopListening()
{
if (_listener == null)
throw new InvalidOperationException("Listening was never started");
try
{
_listener.Stop();
}
catch (SocketException)
{
}
finally
{
_listener = null;
if (Thread.CurrentThread != _acceptorThread)
_acceptorThread?.Join();
_acceptorThread = null;
}
}
///
/// Installs a midlayer.
/// A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream.
/// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received
/// by the TCP connection, respectively. This mechanism can be used for buffering, various (de-)compression algorithms, and more.
///
///
public void InjectMidlayer(Func createFunc)
{
OnConnectionChanged += (_, e) =>
{
if (e.Connection.State == ConnectionState.Initializing)
{
e.Connection.InjectMidlayer(createFunc);
}
};
}
///
/// Constructs an instance.
///
public TcpRpcServer()
{
_rpcEngine = new RpcEngine();
}
///
/// Constructs an instance, starts listening to the specified TCP/IP endpoint and accepting clients.
/// If you intend configuring a midlayer or consuming the event,
/// you should not use this constructor, since it may lead to an early-client race condition.
/// Instead, use the parameterless constructor, configure, then call .
///
/// An System.Net.IPAddress that represents the local IP address.
/// The port on which to listen for incoming connection attempts.
/// is null.
/// is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort.
/// The underlying detected an error condition, such as the desired endpoint is already occupied.
public TcpRpcServer(IPAddress localAddr, int port): this()
{
StartAccepting(localAddr, port);
}
///
/// Starts listening to the specified TCP/IP endpoint and accepting clients.
///
/// An System.Net.IPAddress that represents the local IP address.
/// The port on which to listen for incoming connection attempts.
/// is null.
/// is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort.
/// Listening activity was already started
/// The underlying detected an error condition, such as the desired endpoint is already occupied.
public void StartAccepting(IPAddress localAddr, int port)
{
if (_listener != null)
throw new InvalidOperationException("Listening activity was already started");
var listener = new TcpListener(localAddr, port)
{
ExclusiveAddressUse = false
};
int attempt = 0;
while (true)
{
try
{
listener.Start();
break;
}
catch (SocketException socketException)
{
if (attempt == 5)
throw;
Logger.LogWarning($"Failed to listen on port {port}, attempt {attempt}: {socketException}");
}
++attempt;
Thread.Sleep(10);
}
_acceptorThread = new Thread(() => AcceptClients(listener));
_listener = listener;
_acceptorThread.Start();
}
///
/// Whether the thread which is responsible for acception incoming attempts is still alive.
/// The thread will die after calling , upon disposal, but also in case of a socket error condition.
/// Errors which occur on a particular connection will just close that connection and won't interfere
/// with the acceptor thread.
///
public bool IsAlive => _acceptorThread?.IsAlive ?? false;
///
/// Sets the bootstrap capability. It must be an object which implements a valid capability interface
/// ().
///
public object Main
{
set { _rpcEngine.Main = value; }
}
///
/// Gets a snapshot of currently active connections.
///
public IConnection[] Connections
{
get
{
lock (_reentrancyBlocker)
{
return _connections.ToArray();
}
}
}
///
/// Fires when a new incoming connection was accepted, or when an active connection is closed.
///
public event Action