2019-09-02 19:35:25 +02:00
using Capnp.FrameTracing ;
2020-04-15 21:52:56 +02:00
using Capnp.Util ;
2019-09-02 19:35:25 +02:00
using Microsoft.Extensions.Logging ;
2019-06-12 21:56:55 +02:00
using System ;
using System.Collections.Generic ;
2020-01-31 21:40:25 +01:00
using System.IO ;
2019-06-12 21:56:55 +02:00
using System.Net ;
using System.Net.Sockets ;
using System.Threading ;
namespace Capnp.Rpc
{
2019-12-11 21:09:43 +01:00
/// <summary>
/// Event payload describing a state change of an RPC connection.
/// </summary>
public class ConnectionEventArgs : EventArgs
{
    /// <summary>
    /// Constructs an instance
    /// </summary>
    /// <param name="connection">RPC connection object</param>
    public ConnectionEventArgs(IConnection connection) => Connection = connection;

    /// <summary>
    /// The connection this notification refers to
    /// </summary>
    public IConnection Connection { get; }
}
2019-06-12 21:56:55 +02:00
/// <summary>
/// Cap'n Proto RPC TCP server.
/// </summary>
2020-02-22 23:47:56 +01:00
public class TcpRpcServer : ISupportsMidlayers , IDisposable
2019-06-12 21:56:55 +02:00
{
// Logger used by the server itself: acceptor thread diagnostics, listen retries, and joining pump threads on disposal.
ILogger Logger { get ; } = Logging . CreateLogger < TcpRpcServer > ( ) ;
/// <summary>
/// Outbound endpoint of a server-side connection. Every operation is delegated to the
/// connection's frame pump.
/// </summary>
class OutboundTcpEndpoint : IEndpoint
{
    // Owning server; stored by the constructor but not read anywhere in this class.
    readonly TcpRpcServer _server;
    readonly FramePump _pump;

    public OutboundTcpEndpoint(TcpRpcServer server, FramePump pump)
    {
        _pump = pump;
        _server = server;
    }

    /// <summary>Disposes the underlying frame pump.</summary>
    public void Dismiss() => _pump.Dispose();

    /// <summary>Sends the given frame through the frame pump.</summary>
    public void Forward(WireFrame frame) => _pump.Send(frame);

    /// <summary>Flushes the frame pump.</summary>
    public void Flush() => _pump.Flush();
}
class Connection : IConnection
{
2020-04-15 21:52:56 +02:00
// Logger for connection-level diagnostics (used by the pump thread).
ILogger Logger { get ; } = Logging . CreateLogger < Connection > ( ) ;
2020-02-16 18:00:31 +01:00
// Tracers registered via AttachTracer while the connection is still 'Initializing'.
// Start() attaches them to the frame pump and then clears this list.
readonly List < IFrameTracer > _tracers = new List < IFrameTracer > ( ) ;
2019-09-02 19:35:25 +02:00
// Server which accepted this connection; provides the RPC engine, the lock and the connection list.
readonly TcpRpcServer _server ;
2020-01-31 21:40:25 +01:00
// Stream the frame pump will be created on. Starts out as the client's network stream
// and is replaced by InjectMidlayer with whatever the midlayer factory returns.
Stream _stream ;
2019-09-02 19:35:25 +02:00
2020-01-31 21:40:25 +01:00
/// <summary>
/// Wraps a freshly accepted TCP client. The client's network stream becomes the initial
/// transport stream; no pump is created until <see cref="Start"/> is called.
/// </summary>
/// <param name="server">Server which accepted the client</param>
/// <param name="client">Accepted TCP client</param>
public Connection(TcpRpcServer server, TcpClient client)
{
    Client = client;
    _stream = client.GetStream();
    _server = server;
}
2019-06-12 21:56:55 +02:00
2019-09-02 19:35:25 +02:00
/// <summary>
/// Creates the frame pump on the current stream, attaches all tracers registered so far,
/// registers an endpoint with the server's RPC engine, switches to state 'Active' and
/// launches the pump thread. When that thread ends, it dismisses both endpoints, disposes pump and
/// client, and (under the server's lock) removes the connection from the server's bookkeeping,
/// sets state 'Down' and raises the server's OnConnectionChanged event.
/// </summary>
public void Start()
{
    var pump = new FramePump(_stream);
    Pump = pump;

    foreach (var tracer in _tracers)
    {
        pump.AttachTracer(tracer);
    }
    _tracers.Clear();

    var outbound = new OutboundTcpEndpoint(_server, pump);
    OutboundEp = outbound;

    var inbound = _server._rpcEngine.AddEndpoint(outbound);
    InboundEp = inbound;

    // Frames received from the wire are handed to the RPC engine's endpoint.
    pump.FrameReceived += inbound.Forward;

    State = ConnectionState.Active;

    var runner = new Thread(_ =>
    {
        try
        {
            Thread.CurrentThread.Name = $"TCP RPC Server Thread {Thread.CurrentThread.ManagedThreadId}";
            Pump.Run();
        }
        catch (ThreadInterruptedException)
        {
            Logger.LogError($"{Thread.CurrentThread.Name} interrupted at {Environment.StackTrace}");
        }
        finally
        {
            // Teardown order is kept exactly as-is: endpoints first, then pump, then socket.
            OutboundEp.Dismiss();
            InboundEp.Dismiss();
            Pump.Dispose();
            Client.Dispose();

            lock (_server._reentrancyBlocker)
            {
                _server.ConnectionCount -= 1;
                _server._connections.Remove(this);
                State = ConnectionState.Down;
                _server.OnConnectionChanged?.Invoke(_server, new ConnectionEventArgs(this));
            }
        }
    });

    PumpRunner = runner;
    runner.Start();
}
2019-09-02 19:35:25 +02:00
/// <summary>
/// Lifecycle state. Starts as 'Initializing'; Start() sets 'Active', and the pump thread sets 'Down' when it ends.
/// </summary>
public ConnectionState State { get; set; } = ConnectionState.Initializing;

/// <summary>TCP client accepted for this connection.</summary>
public TcpClient Client { get; private set; }

/// <summary>Frame pump; null until Start() was called.</summary>
public FramePump? Pump { get; private set; }

/// <summary>Outbound endpoint wrapping the pump; null until Start() was called.</summary>
public OutboundTcpEndpoint? OutboundEp { get; private set; }

/// <summary>Endpoint returned by the RPC engine for this connection; null until Start() was called.</summary>
public RpcEngine.RpcEndpoint? InboundEp { get; private set; }

/// <summary>Thread running the pump; null until Start() was called.</summary>
public Thread? PumpRunner { get; private set; }

/// <summary>
/// Local TCP port, or null if it is not available. This includes the case that the underlying
/// socket was already closed, which is always true by the time the 'Down' notification is raised.
/// </summary>
public int? LocalPort => GetPortOrNull(socket => socket.LocalEndPoint);

/// <summary>
/// Remote TCP port, or null if it is not available (including the case that the underlying
/// socket was already closed).
/// </summary>
public int? RemotePort => GetPortOrNull(socket => socket.RemoteEndPoint);

/// <summary>Receive counter of the inbound endpoint, or 0 if the connection was not started yet.</summary>
public long RecvCount => InboundEp?.RecvCount ?? 0;

/// <summary>Send counter of the inbound endpoint, or 0 if the connection was not started yet.</summary>
public long SendCount => InboundEp?.SendCount ?? 0;

/// <summary>True if the pump thread exists and its ThreadState equals Running.</summary>
public bool IsComputing => PumpRunner?.ThreadState == ThreadState.Running;

/// <summary>Forwards the pump's IsWaitingForData flag; false if there is no pump yet.</summary>
public bool IsWaitingForData => Pump?.IsWaitingForData ?? false;

// Reads a port from the client's socket without throwing once the socket is gone.
// Socket.LocalEndPoint/RemoteEndPoint throw ObjectDisposedException on a closed socket, and the
// pump thread disposes the client *before* it raises the 'Down' notification, so a handler that
// merely inspects the ports would otherwise throw on the pump thread.
int? GetPortOrNull(Func<Socket, EndPoint?> endPointOf)
{
    try
    {
        var socket = Client.Client;
        if (socket == null)
        {
            return null;
        }

        return (endPointOf(socket) as IPEndPoint)?.Port;
    }
    catch (ObjectDisposedException)
    {
        return null;
    }
}
2019-09-02 19:35:25 +02:00
/// <summary>
/// Registers a frame tracer which will be attached to the frame pump once the connection is started.
/// </summary>
/// <param name="tracer">Tracer to register</param>
/// <exception cref="ArgumentNullException"><paramref name="tracer"/> is null</exception>
/// <exception cref="InvalidOperationException">Connection is not in state 'Initializing'</exception>
public void AttachTracer(IFrameTracer tracer)
{
    if (tracer is null)
    {
        throw new ArgumentNullException(nameof(tracer));
    }

    if (State == ConnectionState.Initializing)
    {
        _tracers.Add(tracer);
        return;
    }

    throw new InvalidOperationException("Connection is not in state 'Initializing'");
}
2020-02-01 13:49:45 +01:00
/// <summary>
/// Installs a midlayer. A midlayer is a protocol layer that resides somewhere between capnp serialization and the raw TCP stream.
/// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received
/// by the TCP connection, respectively. This mechanism may be used for integrating various (de-)compression algorithms.
/// </summary>
/// <param name="createFunc">Callback for wrapping the midlayer around its underlying stream</param>
/// <exception cref="ArgumentNullException"><paramref name="createFunc"/> is null</exception>
/// <exception cref="InvalidOperationException">Connection is not in state 'Initializing'</exception>
public void InjectMidlayer(Func<Stream, Stream> createFunc)
{
    if (createFunc is null)
    {
        throw new ArgumentNullException(nameof(createFunc));
    }

    if (State != ConnectionState.Initializing)
    {
        throw new InvalidOperationException("Connection is not in state 'Initializing'");
    }

    // The factory receives the current stream, so repeated calls stack midlayers on top of each other.
    var wrapped = createFunc(_stream);
    _stream = wrapped;
}
2019-09-02 19:35:25 +02:00
/// <summary>
/// Closes the connection by disposing the underlying TCP client.
/// </summary>
public void Close() => Client.Dispose();
2019-06-12 21:56:55 +02:00
}
// RPC engine shared by all connections of this server; each started connection registers an endpoint with it.
readonly RpcEngine _rpcEngine ;
// Lock guarding _connections and ConnectionCount. OnConnectionChanged is raised while it is held.
readonly object _reentrancyBlocker = new object ( ) ;
// Connections currently tracked: added by the acceptor thread, removed by each connection's pump thread when it ends.
readonly List < Connection > _connections = new List < Connection > ( ) ;
2020-02-23 14:24:17 +01:00
// Thread running AcceptClients; assigned by StartAccepting and reset to null by StopListening.
Thread ? _acceptorThread ;
// Active listener; null whenever the server is not listening.
TcpListener ? _listener ;
2019-06-12 21:56:55 +02:00
/// <summary>
/// Gets the number of currently active inbound TCP connections.
/// The counter is incremented when a client is accepted and decremented when the connection's pump thread ends.
/// </summary>
public int ConnectionCount { get ; private set ; }
2020-02-23 14:24:17 +01:00
/// <summary>
/// Body of the acceptor thread: accepts clients from <paramref name="listener"/> in an endless loop.
/// For each client, a connection object is created, registered, announced via
/// <see cref="OnConnectionChanged"/> (while still in its initial state) and then started.
/// The loop ends when accepting throws, which is also how stopping the listener terminates this thread.
/// </summary>
/// <param name="listener">Listener to accept clients from</param>
void AcceptClients(TcpListener listener)
{
    try
    {
        if (Thread.CurrentThread.Name == null)
            Thread.CurrentThread.Name = $"TCP RPC Acceptor Thread {Thread.CurrentThread.ManagedThreadId}";

        while (true)
        {
            var client = listener.AcceptTcpClient();
            var connection = new Connection(this, client);

            lock (_reentrancyBlocker)
            {
                ++ConnectionCount;
                _connections.Add(connection);

                // Handlers get the chance to configure the connection (tracers, midlayers)
                // before it is started.
                OnConnectionChanged?.Invoke(this, new ConnectionEventArgs(connection));
                connection.Start();
            }
        }
    }
    catch (SocketException)
    {
        // Listener was stopped. Maybe a little bit rude, but this is
        // our way of shutting down the acceptor thread.
    }
    catch (ThreadInterruptedException)
    {
        Logger.LogError($"{Thread.CurrentThread.Name} interrupted at {Environment.StackTrace}");
    }
    catch (System.Exception exception)
    {
        // Any other exception might be due to some other problem. Hand the exception object
        // to the logger (not just its message), so that type and stack trace are not lost:
        // this is the only trace left behind when the acceptor thread dies unexpectedly.
        Logger.LogError(exception, exception.Message);
    }
}
/// <summary>
/// Stops accepting incoming attempts and closes all existing connections.
/// </summary>
public void Dispose()
{
    if (_listener != null)
    {
        StopListening();
    }

    // Take a snapshot under the lock; the pump threads remove themselves from
    // _connections while we are tearing them down.
    Connection[] snapshot;
    lock (_reentrancyBlocker)
    {
        snapshot = _connections.ToArray();
    }

    foreach (var connection in snapshot)
    {
        connection.Client.Dispose();
        connection.Pump?.Dispose();
        connection.PumpRunner?.SafeJoin(Logger);
    }

    _rpcEngine.BootstrapCap = null;

    GC.SuppressFinalize(this);
}
2019-09-02 19:35:25 +02:00
/// <summary>
/// Stops accepting incoming attempts.
/// </summary>
/// <exception cref="InvalidOperationException">The server is not listening</exception>
public void StopListening()
{
    if (_listener == null)
    {
        throw new InvalidOperationException("Listening was never started");
    }

    try
    {
        _listener.Stop();
    }
    catch (SocketException)
    {
        // Deliberately ignored: the listener is going away regardless.
    }
    finally
    {
        _listener = null;

        // Wait for the acceptor thread to finish, unless we *are* the acceptor thread.
        bool calledFromAcceptor = Thread.CurrentThread == _acceptorThread;
        if (!calledFromAcceptor)
        {
            _acceptorThread?.Join();
        }

        _acceptorThread = null;
    }
}
2020-02-22 23:47:56 +01:00
/// <summary>
/// Installs a midlayer.
/// A midlayer is a protocol layer that resides somewhere between capnp serialization and the raw TCP stream.
/// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received
/// by the TCP connection, respectively. This mechanism can be used for buffering, various (de-)compression algorithms, and more.
/// The midlayer is applied to every connection which is announced in state 'Initializing' after this call.
/// </summary>
/// <param name="createFunc">Callback for wrapping the midlayer around its underlying stream</param>
/// <exception cref="ArgumentNullException"><paramref name="createFunc"/> is null</exception>
public void InjectMidlayer(Func<Stream, Stream> createFunc)
{
    // Fail fast on the caller's thread. Without this check a null callback would only surface
    // when the first client connects, as an exception on the acceptor thread.
    if (createFunc == null)
        throw new ArgumentNullException(nameof(createFunc));

    OnConnectionChanged += (_, e) =>
    {
        // The event also fires for connections going down; only fresh ones can take a midlayer.
        if (e.Connection.State == ConnectionState.Initializing)
        {
            e.Connection.InjectMidlayer(createFunc);
        }
    };
}
2019-06-12 21:56:55 +02:00
/// <summary>
/// Constructs an instance. The server does not listen until
/// <see cref="StartAccepting(IPAddress, int)"/> is called.
/// </summary>
public TcpRpcServer() => _rpcEngine = new RpcEngine();
/// <summary>
/// Constructs an instance, starts listening to the specified TCP/IP endpoint and accepting clients.
/// If you intend configuring a midlayer or consuming the <see cref="OnConnectionChanged"/> event,
/// you should not use this constructor, since it may lead to an early-client race condition.
/// Instead, use the parameterless constructor, configure, then call <see cref="StartAccepting(IPAddress, int)"/>.
/// </summary>
/// <param name="localAddr">An <see cref="IPAddress"/> that represents the local IP address.</param>
/// <param name="port">The port on which to listen for incoming connection attempts.</param>
/// <exception cref="ArgumentNullException"><paramref name="localAddr"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="port"/> is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort.</exception>
/// <exception cref="SocketException">The underlying <see cref="TcpListener"/> detected an error condition, such as the desired endpoint is already occupied.</exception>
public TcpRpcServer(IPAddress localAddr, int port) : this() => StartAccepting(localAddr, port);
/// <summary>
/// Starts listening to the specified TCP/IP endpoint and accepting clients.
/// If the listener cannot be started, the attempt is repeated a few times with a short pause
/// in between before the <see cref="SocketException"/> is passed on to the caller.
/// </summary>
/// <param name="localAddr">An <see cref="IPAddress"/> that represents the local IP address.</param>
/// <param name="port">The port on which to listen for incoming connection attempts.</param>
/// <exception cref="ArgumentNullException"><paramref name="localAddr"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="port"/> is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort.</exception>
/// <exception cref="InvalidOperationException">Listening activity was already started</exception>
/// <exception cref="SocketException">The underlying <see cref="TcpListener"/> detected an error condition, such as the desired endpoint is already occupied.</exception>
public void StartAccepting(IPAddress localAddr, int port)
{
    if (_listener != null)
    {
        throw new InvalidOperationException("Listening activity was already started");
    }

    var listener = new TcpListener(localAddr, port);
    listener.ExclusiveAddressUse = false;

    for (int attempt = 0; ; ++attempt)
    {
        try
        {
            listener.Start();
            break;
        }
        catch (SocketException socketException)
        {
            // Give up once the retry budget is used; otherwise log and try again.
            if (attempt == 5)
            {
                throw;
            }

            Logger.LogWarning($"Failed to listen on port {port}, attempt {attempt}: {socketException}");
        }

        Thread.Sleep(10);
    }

    var acceptor = new Thread(() => AcceptClients(listener));
    _acceptorThread = acceptor;
    _listener = listener;
    acceptor.Start();
}
/// <summary>
/// Whether the thread which is responsible for accepting incoming attempts is still alive.
/// The thread will die after calling <see cref="StopListening"/>, upon disposal, but also in case of a socket error condition.
/// Errors which occur on a particular connection will just close that connection and won't interfere
/// with the acceptor thread.
/// </summary>
public bool IsAlive => _acceptorThread?.IsAlive == true;
2019-06-12 21:56:55 +02:00
/// <summary>
/// Sets the bootstrap capability. It must be an object which implements a valid capability interface
/// (<see cref="SkeletonAttribute"/>). The value is passed on to the RPC engine.
/// </summary>
public object Main
{
    set => _rpcEngine.Main = value;
}
/// <summary>
/// Gets a snapshot of currently active connections. The returned array is a copy taken
/// under the server's lock; later connection changes are not reflected in it.
/// </summary>
public IConnection[] Connections
{
    get
    {
        IConnection[] snapshot;
        lock (_reentrancyBlocker)
        {
            snapshot = _connections.ToArray();
        }
        return snapshot;
    }
}
2019-09-02 19:35:25 +02:00
/// <summary>
/// Fires when a new incoming connection was accepted, or when an active connection is closed.
/// For a newly accepted connection, the event is raised on the acceptor thread before the connection is started.
/// For a closed connection, it is raised on that connection's pump thread after its state was set to 'Down'.
/// In both cases the handler runs synchronously while the server's internal lock is held.
/// </summary>
2020-01-11 17:21:31 +01:00
public event Action < object , ConnectionEventArgs > ? OnConnectionChanged ;
2019-06-12 21:56:55 +02:00
}
2020-01-11 17:56:12 +01:00
}