diff --git a/Benchmarking/Benchmark/Benchmark.csproj b/Benchmarking/Benchmark/Benchmark.csproj index 56eb5f7..43c2aab 100644 --- a/Benchmarking/Benchmark/Benchmark.csproj +++ b/Benchmarking/Benchmark/Benchmark.csproj @@ -7,7 +7,7 @@ - + diff --git a/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj b/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj index 317f6be..3b41a29 100644 --- a/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj +++ b/Benchmarking/EchoServiceCapnp/EchoServiceCapnp.csproj @@ -6,7 +6,7 @@ - + diff --git a/Capnp.Net.Runtime/Rpc/MidlayerExtensions.cs b/Capnp.Net.Runtime/Rpc/MidlayerExtensions.cs index 8205ee4..200aecd 100644 --- a/Capnp.Net.Runtime/Rpc/MidlayerExtensions.cs +++ b/Capnp.Net.Runtime/Rpc/MidlayerExtensions.cs @@ -17,7 +17,7 @@ namespace Capnp.Rpc /// Buffer size (bytes). You should choose it according to the maximum expected raw capnp frame size public static void AddBuffering(this ISupportsMidlayers obj, int bufferSize) { - obj.InjectMidlayer(s => new Util.WriteBufferedStream(new Util.BufferedNetworkStreamAdapter(s, bufferSize), bufferSize)); + obj.InjectMidlayer(s => new Util.DuplexBufferedStream(new Util.AsyncNetworkStreamAdapter(s), bufferSize)); } /// @@ -27,7 +27,7 @@ namespace Capnp.Rpc /// or public static void AddBuffering(this ISupportsMidlayers obj) { - obj.InjectMidlayer(s => new Util.WriteBufferedStream(new Util.BufferedNetworkStreamAdapter(s))); + obj.InjectMidlayer(s => new Util.DuplexBufferedStream(new Util.AsyncNetworkStreamAdapter(s))); } } } diff --git a/Capnp.Net.Runtime/Util/WriteBufferedStream.cs b/Capnp.Net.Runtime/Util/AsyncNetworkStreamAdapter.cs similarity index 53% rename from Capnp.Net.Runtime/Util/WriteBufferedStream.cs rename to Capnp.Net.Runtime/Util/AsyncNetworkStreamAdapter.cs index 6e91c9d..cfd255a 100644 --- a/Capnp.Net.Runtime/Util/WriteBufferedStream.cs +++ b/Capnp.Net.Runtime/Util/AsyncNetworkStreamAdapter.cs @@ -1,28 +1,19 @@ using System; using System.IO; +using System.Net.Sockets; +using System.Threading; namespace Capnp.Util { - internal class WriteBufferedStream : Stream + internal class AsyncNetworkStreamAdapter : Stream { - // A buffer size of 1024 bytes seems to be a good comprise, giving good performance - // in TCP/IP-over-localhost scenarios for small to medium (200kiB) frame sizes. - const int DefaultBufferSize = 1024; - - readonly Stream _readStream; - readonly BufferedStream _writeStream; - readonly int _bufferSize; + readonly NetworkStream _stream; readonly object _reentrancyBlocker = new object(); + //Exception? _bufferedException; - public WriteBufferedStream(Stream stream, int bufferSize) - { - _readStream = stream; - _writeStream = new BufferedStream(stream, bufferSize); - _bufferSize = bufferSize; - } - - public WriteBufferedStream(Stream stream) : this(stream, DefaultBufferSize) + public AsyncNetworkStreamAdapter(Stream stream) { + _stream = stream as NetworkStream ?? throw new ArgumentException("stream argument must be a NetworkStream"); } public override bool CanRead => true; @@ -41,12 +32,12 @@ namespace Capnp.Util public override void Flush() { - _writeStream.Flush(); + _stream.Flush(); } public override int Read(byte[] buffer, int offset, int count) { - return _readStream.Read(buffer, offset, count); + return _stream.Read(buffer, offset, count); } public override long Seek(long offset, SeekOrigin origin) @@ -59,12 +50,30 @@ namespace Capnp.Util throw new NotSupportedException(); } + //void WriteCallback(IAsyncResult ar) + //{ + // try + // { + // _stream.EndWrite(ar); + // } + // catch (Exception exception) + // { + // Volatile.Write(ref _bufferedException, exception); + // } + //} + public override void Write(byte[] buffer, int offset, int count) { - if (buffer.Length > _bufferSize) // avoid moiré-like timing effects - _writeStream.Flush(); + _stream.WriteAsync(buffer, offset, count); + //var exception = Volatile.Read(ref _bufferedException); + + //if (exception != null) + //{ + // Dispose(); + // throw exception; + //} - _writeStream.Write(buffer, offset, count); + //_stream.BeginWrite(buffer, offset, count, WriteCallback, null); } protected override void Dispose(bool disposing) @@ -75,14 +84,7 @@ namespace Capnp.Util { try { - _readStream.Dispose(); - } - catch - { - } - try - { - _writeStream.Dispose(); + _stream.Dispose(); } catch { diff --git a/Capnp.Net.Runtime/Util/BufferedNetworkStreamAdapter.cs b/Capnp.Net.Runtime/Util/BufferedNetworkStreamAdapter.cs deleted file mode 100644 index f52a4f6..0000000 --- a/Capnp.Net.Runtime/Util/BufferedNetworkStreamAdapter.cs +++ /dev/null @@ -1,114 +0,0 @@ -using System; -using System.IO; -using System.Net.Sockets; -using System.Threading; - -namespace Capnp.Util -{ - internal class BufferedNetworkStreamAdapter : Stream - { - // A buffer size of 1024 bytes seems to be a good comprise, giving good performance - // in TCP/IP-over-localhost scenarios for small to medium (200kiB) frame sizes. - const int DefaultBufferSize = 1024; - - readonly BufferedStream _readStream; - readonly NetworkStream _writeStream; - readonly object _reentrancyBlocker = new object(); - Exception? _bufferedException; - - public BufferedNetworkStreamAdapter(Stream stream, int bufferSize) - { - _readStream = new BufferedStream(stream, bufferSize); - _writeStream = stream as NetworkStream ?? throw new ArgumentException("stream argument must be a NetworkStream"); - } - - public BufferedNetworkStreamAdapter(Stream stream) : this(stream, DefaultBufferSize) - { - } - - public override bool CanRead => true; - - public override bool CanSeek => false; - - public override bool CanWrite => true; - - public override long Length => 0; - - public override long Position - { - get => 0; - set => throw new NotSupportedException(); - } - - public override void Flush() - { - _writeStream.Flush(); - } - - public override int Read(byte[] buffer, int offset, int count) - { - return _readStream.Read(buffer, offset, count); - } - - public override long Seek(long offset, SeekOrigin origin) - { - throw new NotSupportedException(); - } - - public override void SetLength(long value) - { - throw new NotSupportedException(); - } - - void WriteCallback(IAsyncResult ar) - { - try - { - _writeStream.EndWrite(ar); - } - catch (Exception exception) - { - Volatile.Write(ref _bufferedException, exception); - } - } - - public override void Write(byte[] buffer, int offset, int count) - { - var exception = Volatile.Read(ref _bufferedException); - - if (exception != null) - { - Dispose(); - throw exception; - } - - _writeStream.BeginWrite(buffer, offset, count, WriteCallback, null); - } - - protected override void Dispose(bool disposing) - { - if (disposing) - { - lock (_reentrancyBlocker) - { - try - { - _readStream.Dispose(); - } - catch - { - } - try - { - _writeStream.Dispose(); - } - catch - { - } - } - } - - base.Dispose(disposing); - } - } -}