2022-05-16 22:55:54 +02:00
using Capnp.FrameTracing ;
2020-04-15 21:52:56 +02:00
using Capnp.Util ;
2019-09-02 19:35:25 +02:00
using Microsoft.Extensions.Logging ;
2019-06-12 21:56:55 +02:00
using System ;
2020-02-09 00:55:48 +01:00
using System.IO ;
2019-06-12 21:56:55 +02:00
using System.Net ;
using System.Net.Sockets ;
using System.Threading ;
using System.Threading.Tasks ;
namespace Capnp.Rpc
{
2022-06-03 00:51:33 +02:00
/// <summary>
/// Describes one state transition of a <see cref="TcpRpcClient"/> connection:
/// the state it was in and the state it has been switched to.
/// </summary>
public class ConnectionStateChange
{
    /// <summary>
    /// State the connection was in before the change.
    /// </summary>
    public ConnectionState LastState { get; set; }

    /// <summary>
    /// State the connection is in after the change.
    /// </summary>
    public ConnectionState NewState { get; set; }
}
2019-06-12 21:56:55 +02:00
/// <summary>
/// TCP-based RPC implementation which will establish a connection to a TCP server implementing
/// the Cap'n Proto RPC protocol.
/// </summary>
2019-09-02 19:35:25 +02:00
public class TcpRpcClient : IConnection , IDisposable
2019-06-12 21:56:55 +02:00
{
// Logger instance used for all diagnostics emitted by this client.
private ILogger Logger { get; } = Logging.CreateLogger<TcpRpcClient>();
// IEndpoint implementation registered with the RPC engine.
// Every member simply relays to the frame pump passed to the constructor.
private class OutboundTcpEndpoint : IEndpoint
{
    private readonly FramePump _pump;

    public OutboundTcpEndpoint(FramePump pump) => _pump = pump;

    // Dismissing the endpoint disposes the pump.
    public void Dismiss() => _pump.Dispose();

    // Hands an outgoing frame to the pump for sending.
    public void Forward(WireFrame frame) => _pump.Send(frame);

    // Flushes the pump.
    public void Flush() => _pump.Flush();
}
// Created once by the constructor.
private readonly RpcEngine _rpcEngine;
private readonly TcpClient _client;

// Wraps the raw network stream before it is handed to the frame pump.
// Starts out as the identity function; InjectMidlayer composes further layers on top.
private Func<Stream, Stream> _createLayers = stream => stream;

// Everything below stays null until ConnectAndRunAsync has established the connection.
private RpcEngine.RpcEndpoint? _inboundEndpoint;
private OutboundTcpEndpoint? _outboundEndpoint;
private FramePump? _pump;
private Thread? _pumpThread;

// Tracer attachments queued by AttachTracer; invoked once the pump has been created.
private Action? _attachTracerAction;
2019-06-12 21:56:55 +02:00
/// <summary>
/// Task representing the connection attempt. It stays <c>null</c> until a connection
/// has been requested, either by calling <see cref="Connect"/> or by using the
/// constructor that takes a host and a port.
/// </summary>
public Task? WhenConnected { get; private set; }
2019-06-12 21:56:55 +02:00
// Connects the TCP client to host:port.
// A SocketException reporting AddressAlreadyInUse is retried after a one-second pause,
// up to 240 times; any other SocketException (or exhausting the retries) is rethrown
// wrapped in an RpcException.
private async Task ConnectAsync(string host, int port)
{
    const int maxRetries = 240;
    const int retryDelayMilliseconds = 1000;

    int attempt = 0;
    while (true)
    {
        try
        {
            await _client.ConnectAsync(host, port);
            return;
        }
        catch (SocketException error) when (attempt >= maxRetries || error.SocketErrorCode != SocketError.AddressAlreadyInUse)
        {
            // Not retryable (or retries used up): surface as an RPC-level failure.
            throw new RpcException("TcpRpcClient is unable to connect", error);
        }
        catch (SocketException)
        {
            // Address still in use and retries left: wait, then try again.
            await Task.Delay(retryDelayMilliseconds);
        }

        attempt++;
    }
}
2019-07-04 22:05:37 +02:00
2019-09-02 19:35:25 +02:00
// Establishes the TCP connection, then wires the frame pump to the RPC engine
// and starts the dedicated thread that runs the pump.
private async Task ConnectAndRunAsync(string host, int port)
{
    await ConnectAsync(host, port);

    // From here on AttachTracer/InjectMidlayer reject further calls (they require 'Initializing').
    State = ConnectionState.Active;

    var layeredStream = _createLayers(_client.GetStream());
    var pump = new FramePump(layeredStream);
    _pump = pump;

    // Tracers queued via AttachTracer read the _pump field, so it must be set before this call.
    _attachTracerAction?.Invoke();

    var outbound = new OutboundTcpEndpoint(pump);
    _outboundEndpoint = outbound;

    var inbound = _rpcEngine.AddEndpoint(outbound);
    _inboundEndpoint = inbound;

    // Body of the pump thread: runs the pump until it returns or the thread is
    // interrupted, then marks the connection as down and tears everything down.
    void RunPump()
    {
        try
        {
            Thread.CurrentThread.Name = $"TCP RPC Client Thread {Thread.CurrentThread.ManagedThreadId}";
            pump.Run();
        }
        catch (ThreadInterruptedException)
        {
            Logger.LogError($"{Thread.CurrentThread.Name} interrupted at {Environment.StackTrace}");
        }
        finally
        {
            State = ConnectionState.Down;
            outbound.Dismiss();
            inbound.Dismiss();
            pump.Dispose();
        }
    }

    var worker = new Thread(RunPump);
    _pumpThread = worker;

    // Received frames are forwarded to the RPC engine's endpoint; subscribe before the pump starts.
    pump.FrameReceived += inbound.Forward;
    worker.Start();
}
/// <summary>
/// Constructs an instance with default time-outs and immediately requests a connection to the given host.
/// The connection attempt is started through <see cref="Connect"/>; its outcome, including any
/// failure to connect, is observed through <see cref="WhenConnected"/>.
/// </summary>
/// <param name="host">The DNS name of the remote RPC host</param>
/// <param name="port">The port number of the remote RPC host</param>
public TcpRpcClient(string host, int port) : this() => Connect(host, port);
/// <summary>
/// Constructs an instance without connecting it. Call <see cref="Connect"/> to start the connection.
/// </summary>
/// <param name="receiveTimeout">Value assigned to the TCP client's receive time-out, in milliseconds.</param>
/// <param name="sendTimeout">Value assigned to the TCP client's send time-out, in milliseconds.</param>
public TcpRpcClient(int receiveTimeout = 0, int sendTimeout = 0)
{
    _rpcEngine = new RpcEngine();

    var tcp = new TcpClient();
    tcp.ExclusiveAddressUse = false;
    tcp.ReceiveTimeout = receiveTimeout;
    tcp.SendTimeout = sendTimeout;
    _client = tcp;
}
/// <summary>
/// Requests a connection to the given host. The attempt is represented by
/// <see cref="WhenConnected"/>, which is assigned by this call; errors raised while
/// connecting are reported through that task.
/// </summary>
/// <param name="host">The DNS name of the remote RPC host</param>
/// <param name="port">The port number of the remote RPC host</param>
/// <exception cref="InvalidOperationException">Connection was already requested</exception>
public void Connect(string host, int port)
{
    if (WhenConnected == null)
    {
        WhenConnected = ConnectAndRunAsync(host, port);
        return;
    }

    // A connection task already exists: only one connection request per instance.
    throw new InvalidOperationException("Connection was already requested");
}
/// <summary>
/// Returns the remote bootstrap capability. The connection does not need to be
/// established yet: the bootstrap capability is queried once <see cref="WhenConnected"/> has completed.
/// </summary>
/// <typeparam name="TProxy">Bootstrap capability interface</typeparam>
/// <returns>A proxy for the bootstrap capability</returns>
/// <exception cref="InvalidOperationException">No connection has been requested yet</exception>
public TProxy GetMain<TProxy>() where TProxy : class, IDisposable
{
    if (WhenConnected == null)
    {
        throw new InvalidOperationException("Not connecting");
    }

    return ResolveBootstrapAsync().Eager(true);

    // Waits for the connection, then asks the inbound endpoint for the bootstrap capability.
    async Task<TProxy> ResolveBootstrapAsync()
    {
        await WhenConnected!;

        var bootstrap = _inboundEndpoint!.QueryMain();
        var proxy = CapabilityReflection.CreateProxy<TProxy>(bootstrap) as TProxy;
        return proxy!;
    }
}
/// <summary>
/// Disposes the TCP client, waits up to 500 ms for the connection task to finish
/// and then joins the pump thread. Failures while waiting are logged, not thrown.
/// </summary>
public void Dispose()
{
    _client.Dispose();

    try
    {
        var pending = WhenConnected;
        bool finishedInTime = pending == null || pending.Wait(500);
        if (!finishedInTime)
        {
            Logger.LogError("Unable to join connection task within timeout");
        }
    }
    // Fully qualified on purpose: keeps the catch bound to System.Exception
    // regardless of what else is named 'Exception' in this namespace.
    catch (System.Exception e)
    {
        Logger.LogError(e, "Failure disposing client");
    }

    var worker = _pumpThread;
    if (worker != null)
    {
        worker.SafeJoin(Logger);
    }

    GC.SuppressFinalize(this);
}
2019-09-02 19:35:25 +02:00
/// <summary>
/// Attaches a tracer to this connection. Only allowed in state 'Initializing'. To avoid race conditions,
/// use this method together with the constructor that does not auto-connect,
/// and call it *before* calling Connect.
/// </summary>
/// <param name="tracer">Tracer to attach</param>
/// <exception cref="ArgumentNullException"><paramref name="tracer"/> is null</exception>
/// <exception cref="InvalidOperationException">Connection is not in state 'Initializing'</exception>
public void AttachTracer(IFrameTracer tracer)
{
    if (tracer == null)
    {
        throw new ArgumentNullException(nameof(tracer));
    }

    if (State != ConnectionState.Initializing)
    {
        throw new InvalidOperationException("Connection is not in state 'Initializing'");
    }

    // The pump does not exist yet, so the attachment is queued and replayed
    // when the queued action is invoked after the pump has been created.
    void AttachToPump() => _pump?.AttachTracer(tracer);

    _attachTracerAction += AttachToPump;
}
2020-02-01 13:49:45 +01:00
/// <summary>
/// Installs a midlayer. A midlayer is a protocol layer that resides somewhere between capnp serialization and the raw TCP stream.
/// It is a hook for transforming data before it is sent to the TCP connection or after it was received
/// from the TCP connection, respectively, e.g. for integrating (de-)compression algorithms.
/// Midlayers installed later wrap the ones installed earlier.
/// </summary>
/// <param name="createFunc">Callback for wrapping the midlayer around its underlying stream</param>
/// <exception cref="ArgumentNullException"><paramref name="createFunc"/> is null</exception>
/// <exception cref="InvalidOperationException">Connection is not in state 'Initializing'</exception>
public void InjectMidlayer(Func<Stream, Stream> createFunc)
{
    if (createFunc == null)
    {
        throw new ArgumentNullException(nameof(createFunc));
    }

    if (State != ConnectionState.Initializing)
    {
        throw new InvalidOperationException("Connection is not in state 'Initializing'");
    }

    // Capture the current chain so the new layer is applied on top of it.
    var inner = _createLayers;

    Stream Compose(Stream raw) => createFunc(inner(raw));

    _createLayers = Compose;
}
2019-09-02 19:35:25 +02:00
/// <summary>
/// Prematurely closes this connection by disposing the underlying TCP client.
/// Note that there is usually no need to close a connection manually.
/// </summary>
void IConnection.Close() => _client.Dispose();
2022-02-17 00:44:35 +01:00
/// <summary>
/// Raised whenever <see cref="State"/> is assigned, including an assignment that leaves the value unchanged.
/// Handlers run synchronously on whichever thread performs the assignment; the transition to
/// <see cref="ConnectionState.Down"/> is assigned from the pump thread.
/// The event is declared nullable because it has no subscribers until a caller adds one.
/// </summary>
public event EventHandler<ConnectionStateChange>? ConnectionStateChanged;

// Backing store for State; a fresh client starts out as 'Initializing'.
private ConnectionState _state = ConnectionState.Initializing;

/// <summary>
/// Returns the state of this connection.
/// </summary>
public ConnectionState State
{
    get => _state;
    private set
    {
        // Capture the previous state before overwriting it so subscribers see both sides of the transition.
        var change = new ConnectionStateChange
        {
            LastState = _state,
            NewState = value
        };

        _state = value;

        // Null-conditional invoke reads the delegate once, so a concurrent unsubscribe cannot cause a null call.
        ConnectionStateChanged?.Invoke(this, change);
    }
}
2019-09-02 19:35:25 +02:00
2019-06-12 21:56:55 +02:00
/// <summary>
/// Gets the number of RPC protocol messages sent by this client so far;
/// 0 while no inbound endpoint exists yet.
/// </summary>
public long SendCount
{
    get { return _inboundEndpoint?.SendCount ?? 0; }
}
2019-06-12 21:56:55 +02:00
/// <summary>
/// Gets the number of RPC protocol messages received by this client so far;
/// 0 while no inbound endpoint exists yet.
/// </summary>
public long RecvCount
{
    get { return _inboundEndpoint?.RecvCount ?? 0; }
}
2019-06-12 21:56:55 +02:00
/// <summary>
/// Gets the remote port number which this client is connected to,
/// or null if no remote endpoint is available (connection not yet established,
/// or the underlying socket is no longer exposed by the TCP client).
/// </summary>
// NOTE(review): TcpClient.Client can be null once the client has been disposed (see
// Dispose / IConnection.Close), hence the null-conditional access instead of a direct dereference.
public int? RemotePort => (_client.Client?.RemoteEndPoint as IPEndPoint)?.Port;
2019-06-12 21:56:55 +02:00
2019-09-02 19:35:25 +02:00
/// <summary>
/// Gets the local port number which this client is using,
/// or null if no local endpoint is available (connection not yet established,
/// or the underlying socket is no longer exposed by the TCP client).
/// </summary>
// NOTE(review): TcpClient.Client can be null once the client has been disposed (see
// Dispose / IConnection.Close), hence the null-conditional access instead of a direct dereference.
public int? LocalPort => (_client.Client?.LocalEndPoint as IPEndPoint)?.Port;
2019-09-02 19:35:25 +02:00
2019-06-12 21:56:55 +02:00
/// <summary>
/// Whether the I/O thread is currently running, i.e. the pump thread exists
/// and reports <see cref="System.Threading.ThreadState.Running"/>.
/// </summary>
public bool IsComputing
{
    get
    {
        Thread? worker = _pumpThread;
        return worker != null && worker.ThreadState == System.Threading.ThreadState.Running;
    }
}
2019-06-12 21:56:55 +02:00
/// <summary>
/// Whether the I/O thread is waiting for data to receive; false while no pump exists.
/// </summary>
public bool IsWaitingForData
{
    get { return _pump?.IsWaitingForData == true; }
}
2019-06-12 21:56:55 +02:00
}
2022-05-16 22:55:54 +02:00
}