2019-09-02 19:35:25 +02:00
using Capnp.FrameTracing ;
using Microsoft.Extensions.Logging ;
2019-06-12 21:56:55 +02:00
using System ;
using System.Collections.Generic ;
using System.Diagnostics ;
using System.Net ;
using System.Net.Sockets ;
2019-06-22 18:43:30 -04:00
using System.Runtime.CompilerServices ;
2019-06-12 21:56:55 +02:00
using System.Text ;
using System.Threading ;
using System.Threading.Tasks ;
namespace Capnp.Rpc
{
/// <summary>
/// TCP-based RPC implementation which will establish a connection to a TCP server implementing
/// the Cap'n Proto RPC protocol.
/// </summary>
2019-09-02 19:35:25 +02:00
public class TcpRpcClient : IConnection , IDisposable
2019-06-12 21:56:55 +02:00
{
ILogger Logger { get ; } = Logging . CreateLogger < TcpRpcClient > ( ) ;
class OutboundTcpEndpoint : IEndpoint
{
readonly TcpRpcClient _client ;
readonly FramePump _pump ;
public OutboundTcpEndpoint ( TcpRpcClient client , FramePump pump )
{
_client = client ;
_pump = pump ;
}
public void Dismiss ( )
{
_pump . Dispose ( ) ;
}
public void Forward ( WireFrame frame )
{
_pump . Send ( frame ) ;
}
}
readonly RpcEngine _rpcEngine ;
readonly TcpClient _client ;
RpcEngine . RpcEndpoint _inboundEndpoint ;
OutboundTcpEndpoint _outboundEndpoint ;
FramePump _pump ;
Thread _pumpThread ;
2019-09-02 19:35:25 +02:00
Action _attachTracerAction ;
2019-06-12 21:56:55 +02:00
/// <summary>
2019-09-02 19:35:25 +02:00
/// Gets a Task which completes when TCP is connected. Will be
/// null until connection is actually requested (either by calling Connect or using appropriate constructor).
2019-06-12 21:56:55 +02:00
/// </summary>
2019-09-02 19:35:25 +02:00
public Task WhenConnected { get ; private set ; }
2019-06-12 21:56:55 +02:00
async Task ConnectAsync ( string host , int port )
{
2019-07-07 19:59:31 +02:00
for ( int retry = 0 ; ; retry + + )
2019-06-12 21:56:55 +02:00
{
2019-07-07 19:59:31 +02:00
try
{
await _client . ConnectAsync ( host , port ) ;
return ;
}
catch ( SocketException exception ) when ( retry < 240 & & exception . SocketErrorCode = = SocketError . AddressAlreadyInUse )
{
await Task . Delay ( 1000 ) ;
}
catch ( SocketException exception )
{
throw new RpcException ( "TcpRpcClient is unable to connect" , exception ) ;
}
2019-06-12 21:56:55 +02:00
}
}
2019-07-04 22:05:37 +02:00
2019-09-02 19:35:25 +02:00
async Task ConnectAndRunAsync ( string host , int port )
2019-06-12 21:56:55 +02:00
{
await ConnectAsync ( host , port ) ;
2019-09-02 19:35:25 +02:00
State = ConnectionState . Active ;
2019-06-12 21:56:55 +02:00
_pump = new FramePump ( _client . GetStream ( ) ) ;
2019-09-02 19:35:25 +02:00
_attachTracerAction ? . Invoke ( ) ;
2019-06-12 21:56:55 +02:00
_outboundEndpoint = new OutboundTcpEndpoint ( this , _pump ) ;
_inboundEndpoint = _rpcEngine . AddEndpoint ( _outboundEndpoint ) ;
_pumpThread = new Thread ( ( ) = >
{
try
{
Thread . CurrentThread . Name = $"TCP RPC Client Thread {Thread.CurrentThread.ManagedThreadId}" ;
_pump . Run ( ) ;
}
finally
{
2019-09-02 19:35:25 +02:00
State = ConnectionState . Down ;
2019-06-12 21:56:55 +02:00
_outboundEndpoint . Dismiss ( ) ;
_inboundEndpoint . Dismiss ( ) ;
_pump . Dispose ( ) ;
}
} ) ;
_pump . FrameReceived + = _inboundEndpoint . Forward ;
_pumpThread . Start ( ) ;
}
/// <summary>
/// Constructs an instance and attempts to connect it to given host.
/// </summary>
/// <param name="host">The DNS name of the remote RPC host</param>
/// <param name="port">The port number of the remote RPC host</param>
/// <exception cref="ArgumentNullException"><paramref name="host"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="port"/> is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort.</exception>
/// <exception cref="System.Net.Sockets.SocketException">An error occurred when accessing the socket.</exception>
2019-09-02 19:35:25 +02:00
public TcpRpcClient ( string host , int port ) : this ( )
{
Connect ( host , port ) ;
}
/// <summary>
/// Constructs an instance but does not yet attempt to connect.
/// </summary>
public TcpRpcClient ( )
2019-06-12 21:56:55 +02:00
{
_rpcEngine = new RpcEngine ( ) ;
_client = new TcpClient ( ) ;
2019-06-22 18:43:30 -04:00
_client . ExclusiveAddressUse = false ;
2019-09-02 19:35:25 +02:00
}
/// <summary>
/// Attempts to connect it to given host.
/// </summary>
/// <param name="host">The DNS name of the remote RPC host</param>
/// <param name="port">The port number of the remote RPC host</param>
/// <exception cref="ArgumentNullException"><paramref name="host"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="port"/> is not between System.Net.IPEndPoint.MinPort and System.Net.IPEndPoint.MaxPort.</exception>
/// <exception cref="System.Net.Sockets.SocketException">An error occurred when accessing the socket.</exception>
/// <exception cref="InvalidOperationException">Connection was already requested</exception>
public void Connect ( string host , int port )
{
if ( WhenConnected ! = null )
throw new InvalidOperationException ( "Connection was already requested" ) ;
2019-06-12 21:56:55 +02:00
2019-09-02 19:35:25 +02:00
WhenConnected = ConnectAndRunAsync ( host , port ) ;
2019-06-12 21:56:55 +02:00
}
/// <summary>
/// Returns the remote bootstrap capability.
/// </summary>
/// <typeparam name="TProxy">Bootstrap capability interface</typeparam>
/// <returns>A proxy for the bootstrap capability</returns>
public TProxy GetMain < TProxy > ( ) where TProxy : class
{
if ( ! WhenConnected . IsCompleted )
{
throw new InvalidOperationException ( "Connection not yet established" ) ;
}
2019-06-22 18:43:30 -04:00
if ( ! WhenConnected . ReplacementTaskIsCompletedSuccessfully ( ) )
2019-06-12 21:56:55 +02:00
{
throw new InvalidOperationException ( "Connection not successfully established" ) ;
}
Debug . Assert ( _inboundEndpoint ! = null ) ;
return CapabilityReflection . CreateProxy < TProxy > ( _inboundEndpoint . QueryMain ( ) ) as TProxy ;
}
/// <summary>
/// Dispose pattern implementation
/// </summary>
public void Dispose ( )
{
_client . Dispose ( ) ;
try
{
if ( ! WhenConnected . Wait ( 500 ) )
{
Logger . LogError ( "Unable to join connection task within timeout" ) ;
}
}
2019-06-22 18:43:30 -04:00
catch ( System . Exception e )
2019-06-12 21:56:55 +02:00
{
2019-06-22 18:43:30 -04:00
Logger . LogError ( e , "Failure disposing client" ) ;
2019-06-12 21:56:55 +02:00
}
if ( _pumpThread ! = null & & ! _pumpThread . Join ( 500 ) )
{
Logger . LogError ( "Unable to join pump thread within timeout" ) ;
}
GC . SuppressFinalize ( this ) ;
}
2019-09-02 19:35:25 +02:00
/// <summary>
/// Attaches a tracer to this connection. Only allowed in state 'Initializing'. To avoid race conditions,
/// this method should only be used in conjunction with the parameterless constructor (no auto-connect).
/// Call this method *before* calling Connect.
/// </summary>
/// <param name="tracer">Tracer to attach</param>
/// <exception cref="ArgumentNullException"><paramref name="tracer"/> is null</exception>
/// <exception cref="InvalidOperationException">Connection is not in state 'Initializing'</exception>
public void AttachTracer ( IFrameTracer tracer )
{
if ( tracer = = null )
throw new ArgumentNullException ( nameof ( tracer ) ) ;
if ( State ! = ConnectionState . Initializing )
throw new InvalidOperationException ( "Connection is not in state 'Initializing'" ) ;
_attachTracerAction + = ( ) = >
{
_pump . AttachTracer ( tracer ) ;
} ;
}
/// <summary>
/// Prematurely closes this connection. Note that there is usually no need to close a connection manually.
/// </summary>
void IConnection . Close ( )
{
_client . Dispose ( ) ;
}
/// <summary>
/// Returns the state of this connection.
/// </summary>
public ConnectionState State { get ; private set ; } = ConnectionState . Initializing ;
2019-06-12 21:56:55 +02:00
/// <summary>
/// Gets the number of RPC protocol messages sent by this client so far.
/// </summary>
public long SendCount = > _inboundEndpoint . SendCount ;
/// <summary>
/// Gets the number of RPC protocol messages received by this client so far.
/// </summary>
public long RecvCount = > _inboundEndpoint . RecvCount ;
/// <summary>
/// Gets the remote port number which this client is connected to,
/// or null if the connection is not yet established.
/// </summary>
public int? RemotePort = > ( ( IPEndPoint ) _client . Client ? . RemoteEndPoint ) ? . Port ;
2019-09-02 19:35:25 +02:00
/// <summary>
/// Gets the local port number which this client using,
/// or null if the connection is not yet established.
/// </summary>
public int? LocalPort = > ( ( IPEndPoint ) _client . Client ? . LocalEndPoint ) ? . Port ;
2019-06-12 21:56:55 +02:00
/// <summary>
/// Whether the I/O thread is currently running
/// </summary>
public bool IsComputing = > _pumpThread . ThreadState = = System . Threading . ThreadState . Running ;
/// <summary>
/// Whether the I/O thread is waiting for data to receive
/// </summary>
public bool IsWaitingForData = > _pump . IsWaitingForData ;
}
}