From 7b16dbd6e9ed753904c936e178d5d08f326c1ff6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20K=C3=B6llner?= Date: Sat, 22 Feb 2020 21:31:59 +0100 Subject: [PATCH] buffered I/O --- Benchmarking/CapnpProfile/CapnpProfile.csproj | 3 +- Capnp.Net.Runtime/Capnp.Net.Runtime.csproj | 2 +- Capnp.Net.Runtime/FramePump.cs | 2 + Capnp.Net.Runtime/Rpc/TcpRpcClient.cs | 2 + Capnp.Net.Runtime/Rpc/TcpRpcServer.cs | 1 + .../Util/DuplexBufferedStream.cs | 83 +++++++++++++++++++ 6 files changed, 91 insertions(+), 2 deletions(-) create mode 100644 Capnp.Net.Runtime/Util/DuplexBufferedStream.cs diff --git a/Benchmarking/CapnpProfile/CapnpProfile.csproj b/Benchmarking/CapnpProfile/CapnpProfile.csproj index 4b7f55b..fb4935e 100644 --- a/Benchmarking/CapnpProfile/CapnpProfile.csproj +++ b/Benchmarking/CapnpProfile/CapnpProfile.csproj @@ -8,10 +8,11 @@ full true + x64 - + diff --git a/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj b/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj index 548cd7b..443111e 100644 --- a/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj +++ b/Capnp.Net.Runtime/Capnp.Net.Runtime.csproj @@ -29,7 +29,7 @@ - full + portable true diff --git a/Capnp.Net.Runtime/FramePump.cs b/Capnp.Net.Runtime/FramePump.cs index 0ff6791..e2ecabc 100644 --- a/Capnp.Net.Runtime/FramePump.cs +++ b/Capnp.Net.Runtime/FramePump.cs @@ -116,6 +116,8 @@ namespace Capnp #endif _writer.Write(bytes); } + + _writer.Flush(); } } diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs b/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs index 02a975d..d6d333e 100644 --- a/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs +++ b/Capnp.Net.Runtime/Rpc/TcpRpcClient.cs @@ -128,6 +128,8 @@ namespace Capnp.Rpc _rpcEngine = new RpcEngine(); _client = new TcpClient(); _client.ExclusiveAddressUse = false; + + InjectMidlayer(s => new Util.DuplexBufferedStream(s)); } /// diff --git a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs index 7ee616a..9ae030d 100644 --- a/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs +++ b/Capnp.Net.Runtime/Rpc/TcpRpcServer.cs @@ -68,6 +68,7 @@ namespace Capnp.Rpc _server = server; Client = client; _stream = client.GetStream(); + InjectMidlayer(s => new Util.DuplexBufferedStream(s)); } public void Start() diff --git a/Capnp.Net.Runtime/Util/DuplexBufferedStream.cs b/Capnp.Net.Runtime/Util/DuplexBufferedStream.cs new file mode 100644 index 0000000..05bc69a --- /dev/null +++ b/Capnp.Net.Runtime/Util/DuplexBufferedStream.cs @@ -0,0 +1,83 @@ +using System; +using System.IO; + +namespace Capnp.Util +{ + internal class DuplexBufferedStream : Stream + { + readonly BufferedStream _readStream; + readonly BufferedStream _writeStream; + readonly object _reentrancyBlocker = new object(); + + public DuplexBufferedStream(Stream stream) + { + _readStream = new BufferedStream(stream); + _writeStream = new BufferedStream(stream); + } + + public override bool CanRead => true; + + public override bool CanSeek => false; + + public override bool CanWrite => true; + + public override long Length => 0; + + public override long Position + { + get => 0; + set => throw new NotSupportedException(); + } + + public override void Flush() + { + _writeStream.Flush(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + return _readStream.Read(buffer, offset, count); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + _writeStream.Write(buffer, offset, count); + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + lock (_reentrancyBlocker) + { + try + { + _readStream.Dispose(); + } + catch + { + } + try + { + _writeStream.Dispose(); + } + catch + { + } + } + } + + base.Dispose(disposing); + } + } +}