Improvements w.r.t. issue #37

This commit is contained in:
Christian Köllner 2020-02-01 13:49:45 +01:00
parent 72308d38d5
commit 1ec5b0e955
5 changed files with 33 additions and 2 deletions

View File

@ -5,6 +5,7 @@ using Microsoft.Extensions.Logging;
using Microsoft.VisualStudio.TestTools.UnitTesting; using Microsoft.VisualStudio.TestTools.UnitTesting;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Net;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
@ -94,9 +95,17 @@ namespace Capnp.Net.Runtime.Tests
public void ScatteredTransfer() public void ScatteredTransfer()
{ {
using (var server = SetupServer()) using (var server = new TcpRpcServer(IPAddress.Any, TcpPort))
using (var client = new TcpRpcClient()) using (var client = new TcpRpcClient())
{ {
server.OnConnectionChanged += (_, e) =>
{
if (e.Connection.State == ConnectionState.Initializing)
{
e.Connection.InjectMidlayer(s => new ScatteringStream(s, 7));
}
};
client.InjectMidlayer(s => new ScatteringStream(s, 10)); client.InjectMidlayer(s => new ScatteringStream(s, 10));
client.Connect("localhost", TcpPort); client.Connect("localhost", TcpPort);
client.WhenConnected.Wait(); client.WhenConnected.Wait();

View File

@ -110,13 +110,14 @@ namespace Capnp
#else #else
var buffer = MemoryMarshal.Cast<ulong, byte>(buffers[i].Span); var buffer = MemoryMarshal.Cast<ulong, byte>(buffers[i].Span);
while (buffer.Length > 0) do
{ {
int obtained = reader.Read(buffer); int obtained = reader.Read(buffer);
if (obtained == 0) if (obtained == 0)
throw StreamClosed(); throw StreamClosed();
buffer = buffer.Slice(obtained); buffer = buffer.Slice(obtained);
} }
while (buffer.Length > 0);
#endif #endif
} }
} }

View File

@ -52,6 +52,13 @@ namespace Capnp.Rpc
/// <exception cref="InvalidOperationException">Connection is not in state 'Initializing'</exception> /// <exception cref="InvalidOperationException">Connection is not in state 'Initializing'</exception>
void AttachTracer(IFrameTracer tracer); void AttachTracer(IFrameTracer tracer);
/// <summary>
/// Installs a midlayer. A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream.
/// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received
/// by the TCP connection, respectively. This mechanism may be used for integrating various (de-)compression algorithms.
/// </summary>
/// <param name="createFunc">Callback for wrapping the midlayer around its underlying stream</param>
/// <exception cref="ArgumentNullException"><paramref name="createFunc"/> is null</exception>
void InjectMidlayer(Func<Stream, Stream> createFunc); void InjectMidlayer(Func<Stream, Stream> createFunc);
/// <summary> /// <summary>

View File

@ -222,6 +222,13 @@ namespace Capnp.Rpc
}; };
} }
/// <summary>
/// Installs a midlayer. A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream.
/// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received
/// by the TCP connection, respectively. This mechanism may be used for integrating various (de-)compression algorithms.
/// </summary>
/// <param name="createFunc">Callback for wrapping the midlayer around its underlying stream</param>
/// <exception cref="ArgumentNullException"><paramref name="createFunc"/> is null</exception>
public void InjectMidlayer(Func<Stream, Stream> createFunc) public void InjectMidlayer(Func<Stream, Stream> createFunc)
{ {
if (createFunc == null) if (createFunc == null)

View File

@ -128,6 +128,13 @@ namespace Capnp.Rpc
Pump.AttachTracer(tracer); Pump.AttachTracer(tracer);
} }
/// <summary>
/// Installs a midlayer. A midlayer is a protocal layer that resides somewhere between capnp serialization and the raw TCP stream.
/// Thus, we have a hook mechanism for transforming data before it is sent to the TCP connection or after it was received
/// by the TCP connection, respectively. This mechanism may be used for integrating various (de-)compression algorithms.
/// </summary>
/// <param name="createFunc">Callback for wrapping the midlayer around its underlying stream</param>
/// <exception cref="ArgumentNullException"><paramref name="createFunc"/> is null</exception>
public void InjectMidlayer(Func<Stream, Stream> createFunc) public void InjectMidlayer(Func<Stream, Stream> createFunc)
{ {
if (createFunc == null) if (createFunc == null)