configurable buffering support

This commit is contained in:
Christian Köllner 2020-02-22 23:47:56 +01:00
parent 49c5e80436
commit 596a97a362
9 changed files with 88 additions and 26 deletions

View File

@ -7,7 +7,7 @@
<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.12.0" />
<PackageReference Include="Capnp.Net.Runtime" Version="1.3.31-g7b16dbd6e9" />
<PackageReference Include="Capnp.Net.Runtime" Version="1.3.32-g49c5e80436" />
<PackageReference Include="CapnpC.CSharp.MsBuild.Generation" Version="1.2.138" />
<PackageReference Include="Google.Protobuf" Version="3.11.3" />
<PackageReference Include="Grpc.Net.Client" Version="2.27.0" />

View File

@ -6,7 +6,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Capnp.Net.Runtime" Version="1.3.31-g7b16dbd6e9" />
<PackageReference Include="Capnp.Net.Runtime" Version="1.3.32-g49c5e80436" />
<PackageReference Include="CapnpC.CSharp.MsBuild.Generation" Version="1.2.138" />
</ItemGroup>

View File

@ -98,14 +98,7 @@ namespace Capnp.Net.Runtime.Tests
using (var server = new TcpRpcServer(IPAddress.Any, TcpPort))
using (var client = new TcpRpcClient())
{
server.OnConnectionChanged += (_, e) =>
{
if (e.Connection.State == ConnectionState.Initializing)
{
e.Connection.InjectMidlayer(s => new ScatteringStream(s, 7));
}
};
server.InjectMidlayer(s => new ScatteringStream(s, 7));
client.InjectMidlayer(s => new ScatteringStream(s, 10));
client.Connect("localhost", TcpPort);
client.WhenConnected.Wait();

View File

@ -21,7 +21,13 @@ namespace Capnp.Net.Runtime.Tests
protected ILogger Logger { get; set; }
protected TcpRpcClient SetupClient() => new TcpRpcClient("localhost", TcpPort);
protected TcpRpcClient SetupClient()
{
var client = new TcpRpcClient("localhost", TcpPort);
client.AddBuffering();
return client;
}
protected TcpRpcServer SetupServer()
{
int attempt = 0;
@ -30,7 +36,9 @@ namespace Capnp.Net.Runtime.Tests
{
try
{
return new TcpRpcServer(IPAddress.Any, TcpPort);
var server = new TcpRpcServer(IPAddress.Any, TcpPort);
server.AddBuffering();
return server;
}
catch (SocketException)
{

View File

@ -4,10 +4,11 @@ using System.IO;
namespace Capnp.Rpc
{
/// <summary>
/// Models an RPC connection.
/// </summary>
public interface IConnection
public interface IConnection: ISupportsMidlayers
{
/// <summary>
/// Returns the state of this connection.
@ -52,15 +53,6 @@ namespace Capnp.Rpc
/// <exception cref="InvalidOperationException">Connection is not in state 'Initializing'</exception>
void AttachTracer(IFrameTracer tracer);
/// <summary>
/// Installs a midlayer. A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream.
/// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received
/// by the TCP connection, respectively. This mechanism may be used for integrating various (de-)compression algorithms.
/// </summary>
/// <param name="createFunc">Callback for wrapping the midlayer around its underlying stream</param>
/// <exception cref="ArgumentNullException"><paramref name="createFunc"/> is null</exception>
void InjectMidlayer(Func<Stream, Stream> createFunc);
/// <summary>
/// Prematurely closes this connection. Note that there is usually no need to close a connection manually. The typical use case
/// of this method is to refuse an incoming connection in the <code>TcpRpcServer.OnConnectionChanged</code> callback.

View File

@ -0,0 +1,21 @@
using System;
using System.IO;
namespace Capnp.Rpc
{
/// <summary>
/// Common interface for classes supporting the installation of midlayers.
/// A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream.
/// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received
/// by the TCP connection, respectively. This mechanism can be used for buffering, various (de-)compression algorithms, and more.
/// </summary>
public interface ISupportsMidlayers
{
/// <summary>
/// Installs a midlayer
/// </summary>
/// <param name="createFunc">Callback for wrapping the midlayer around its underlying stream</param>
/// <exception cref="ArgumentNullException"><paramref name="createFunc"/> is null</exception>
void InjectMidlayer(Func<Stream, Stream> createFunc);
}
}

View File

@ -0,0 +1,33 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace Capnp.Rpc
{
/// <summary>
/// Provides extension methods for installing midlayers to <see cref="TcpRpcServer"/> and <see cref="TcpRpcClient"/>./>.
/// </summary>
public static class MidlayerExtensions
{
/// <summary>
/// Enables stream buffering on the given object. Stream buffering reduces the number of I/O operations,
/// hence may cause a significant performance boost.
/// </summary>
/// <param name="obj"><see cref="TcpRpcServer"/> or <see cref="TcpRpcClient"/></param>
/// <param name="bufferSize">Buffer size (bytes). You should choose it according to the maximum expected raw capnp frame size</param>
public static void AddBuffering(this ISupportsMidlayers obj, int bufferSize)
{
obj.InjectMidlayer(s => new Util.DuplexBufferedStream(s, bufferSize));
}
/// <summary>
/// Enables stream buffering on the given object. Stream buffering reduces the number of I/O operations,
/// hence may cause a significant performance boost. Some default buffer size will be chosen.
/// </summary>
/// <param name="obj"><see cref="TcpRpcServer"/> or <see cref="TcpRpcClient"/></param>
public static void AddBuffering(this ISupportsMidlayers obj)
{
obj.InjectMidlayer(s => new Util.DuplexBufferedStream(s));
}
}
}

View File

@ -128,8 +128,6 @@ namespace Capnp.Rpc
_rpcEngine = new RpcEngine();
_client = new TcpClient();
_client.ExclusiveAddressUse = false;
InjectMidlayer(s => new Util.DuplexBufferedStream(s));
}
/// <summary>

View File

@ -32,7 +32,7 @@ namespace Capnp.Rpc
/// <summary>
/// Cap'n Proto RPC TCP server.
/// </summary>
public class TcpRpcServer: IDisposable
public class TcpRpcServer: ISupportsMidlayers, IDisposable
{
ILogger Logger { get; } = Logging.CreateLogger<TcpRpcServer>();
@ -68,7 +68,6 @@ namespace Capnp.Rpc
_server = server;
Client = client;
_stream = client.GetStream();
InjectMidlayer(s => new Util.DuplexBufferedStream(s));
}
public void Start()
@ -272,6 +271,24 @@ namespace Capnp.Rpc
}
}
/// <summary>
/// Installs a midlayer.
/// A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream.
/// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received
/// by the TCP connection, respectively. This mechanism can be used for buffering, various (de-)compression algorithms, and more.
/// </summary>
/// <param name="createFunc"></param>
public void InjectMidlayer(Func<Stream, Stream> createFunc)
{
OnConnectionChanged += (_, e) =>
{
if (e.Connection.State == ConnectionState.Initializing)
{
e.Connection.InjectMidlayer(createFunc);
}
};
}
/// <summary>
/// Constructs an instance.
/// </summary>