test & fix

This commit is contained in:
Christian Köllner 2020-04-11 15:48:02 +02:00
parent 3c6f5019f2
commit 19b36a1643
15 changed files with 227 additions and 130 deletions

View File

@ -151,6 +151,9 @@ namespace Capnp.Net.Runtime.Tests
Assert.AreEqual(2, asListOfStructs.Count);
Assert.AreEqual(0ul, asListOfStructs[0].ReadDataULong(0));
Assert.AreEqual(ulong.MaxValue, asListOfStructs[1].ReadDataULong(0));
Assert.ThrowsException<IndexOutOfRangeException>(() => asListOfStructs[-1].ReadDataUShort(0));
Assert.ThrowsException<IndexOutOfRangeException>(() => asListOfStructs[3].ReadDataUShort(0));
CollectionAssert.AreEqual(new ulong[] { 0, ulong.MaxValue }, asListOfStructs.Select(_ => _.ReadDataULong(0)).ToArray());
}
[TestMethod]

View File

@ -6,6 +6,7 @@ using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace Capnp.Net.Runtime.Tests
{
@ -42,33 +43,6 @@ namespace Capnp.Net.Runtime.Tests
Task.WhenAll(tasks).Wait();
}
[TestMethod]
public void AwaitOrderTest2()
{
int returnCounter = 0;
async Task ExpectCount(Task task, int count)
{
await task;
Assert.AreEqual(count, returnCounter++);
}
var tcs = new TaskCompletionSource<int>();
var cts = new CancellationTokenSource();
var tasks =
from i in Enumerable.Range(0, 100)
select ExpectCount(tcs.Task.ContinueWith(
t => t,
cts.Token,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Current), i);
tcs.SetResult(0);
Task.WhenAll(tasks).Wait();
}
class PromisedAnswerMock : IPromisedAnswer
{
readonly TaskCompletionSource<DeserializerState> _tcs = new TaskCompletionSource<DeserializerState>();

View File

@ -67,7 +67,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
var counters = new Counters();
server.Main = policy.Attach<ITestInterface>(new TestInterfaceImpl(counters));
@ -106,7 +106,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
var counters = new Counters();
server.Main = policy.Attach<ITestInterface>(new TestInterfaceImpl(counters));
@ -146,7 +146,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
var counters = new Counters();
server.Main = new TestInterfaceImpl(counters);
@ -204,7 +204,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
var counters = new Counters();
server.Main = new TestInterfaceImpl(counters);
@ -237,7 +237,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
var counters = new Counters();
server.Main = new TestInterfaceImpl(counters);
@ -268,7 +268,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
var counters = new Counters();
server.Main = new TestInterfaceImpl(counters);
@ -334,7 +334,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
var counters = new Counters();
server.Main = new TestInterfaceImpl(counters);
@ -369,7 +369,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
var counters = new Counters();
server.Main = new TestInterfaceImpl(counters);
@ -406,7 +406,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
var counters = new Counters();
server.Main = new TestInterfaceImpl(counters);
@ -431,7 +431,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
var counters = new Counters();
server.Main = new TestTailCallerImpl(counters);
@ -463,7 +463,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
var counters = new Counters();
server.Main = new TestMoreStuffImpl(counters);
@ -503,7 +503,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
var counters = new Counters();
server.Main = policy.Attach<ITestMoreStuff>(new TestMoreStuffImpl(counters));
@ -560,7 +560,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
var counters = new Counters();
server.Main = new TestMoreStuffImpl(counters);
@ -628,7 +628,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
var counters = new Counters();
server.Main = new TestMoreStuffImpl(counters);
@ -668,7 +668,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
server.Main = implAc;
using (var main = client.GetMain<ITestInterface>())

View File

@ -8,6 +8,7 @@ using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace Capnp.Net.Runtime.Tests
{
@ -138,5 +139,59 @@ namespace Capnp.Net.Runtime.Tests
{
NewLocalTestbed().RunTest(Testsuite.Ownership3);
}
[TestMethod]
public void EagerRace()
{
var impl = new TestMoreStuffImpl(new Counters());
var tcs = new TaskCompletionSource<ITestMoreStuff>();
using (var promise = tcs.Task.Eager(true))
using (var cts = new CancellationTokenSource())
{
var bb = new BufferBlock<Task<uint>>();
int counter = 0;
void Generator()
{
while (!cts.IsCancellationRequested)
{
bb.Post(promise.GetCallSequence((uint)Volatile.Read(ref counter)));
Interlocked.Increment(ref counter);
}
bb.Complete();
}
async Task Verifier()
{
uint i = 0;
while (true)
{
Task<uint> t;
try
{
t = await bb.ReceiveAsync();
}
catch (InvalidOperationException)
{
break;
}
uint j = await t;
Assert.AreEqual(i, j);
i++;
}
}
var genTask = Task.Run(() => Generator());
var verTask = Verifier();
SpinWait.SpinUntil(() => Volatile.Read(ref counter) >= 100);
tcs.SetResult(impl);
cts.Cancel();
Assert.IsTrue(genTask.Wait(MediumNonDbgTimeout));
Assert.IsTrue(verTask.Wait(MediumNonDbgTimeout));
}
}
}
}

View File

@ -68,7 +68,7 @@ namespace Capnp.Net.Runtime.Tests
{
try
{
client.WhenConnected.Wait();
Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout));
SpinWait.SpinUntil(() => server.ConnectionCount > 0, MediumNonDbgTimeout);
Assert.AreEqual(1, server.ConnectionCount);
}
@ -97,7 +97,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout));
SpinWait.SpinUntil(() => server.ConnectionCount > 0, MediumNonDbgTimeout);
Assert.AreEqual(1, server.ConnectionCount);
@ -116,7 +116,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout));
SpinWait.SpinUntil(() => server.ConnectionCount > 0, MediumNonDbgTimeout);
Assert.AreEqual(1, server.ConnectionCount);
@ -134,7 +134,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout));
SpinWait.SpinUntil(() => server.ConnectionCount > 0, MediumNonDbgTimeout);
Assert.AreEqual(1, server.ConnectionCount);
@ -175,7 +175,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout));
SpinWait.SpinUntil(() => server.ConnectionCount > 0, MediumNonDbgTimeout);
Assert.AreEqual(1, server.ConnectionCount);
@ -214,7 +214,7 @@ namespace Capnp.Net.Runtime.Tests
{
try
{
client.WhenConnected.Wait();
Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout));
SpinWait.SpinUntil(() => server.ConnectionCount > 0, MediumNonDbgTimeout);
Assert.AreEqual(1, server.ConnectionCount);
@ -258,7 +258,7 @@ namespace Capnp.Net.Runtime.Tests
{
try
{
client.WhenConnected.Wait();
Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout));
SpinWait.SpinUntil(() => server.ConnectionCount > 0, MediumNonDbgTimeout);
Assert.AreEqual(1, server.ConnectionCount);
@ -317,7 +317,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout));
SpinWait.SpinUntil(() => server.ConnectionCount > 0, MediumNonDbgTimeout);
Assert.AreEqual(1, server.ConnectionCount);
@ -354,7 +354,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout));
SpinWait.SpinUntil(() => server.ConnectionCount > 0, MediumNonDbgTimeout);
Assert.AreEqual(1, server.ConnectionCount);
@ -427,7 +427,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout));
SpinWait.SpinUntil(() => server.ConnectionCount > 0, MediumNonDbgTimeout);
Assert.AreEqual(1, server.ConnectionCount);
@ -503,7 +503,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout));
SpinWait.SpinUntil(() => server.ConnectionCount > 0, MediumNonDbgTimeout);
Assert.AreEqual(1, server.ConnectionCount);
@ -620,7 +620,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout));
SpinWait.SpinUntil(() => server.ConnectionCount > 0, MediumNonDbgTimeout);
Assert.AreEqual(1, server.ConnectionCount);
@ -667,7 +667,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout));
SpinWait.SpinUntil(() => server.ConnectionCount > 0, MediumNonDbgTimeout);
Assert.AreEqual(1, server.ConnectionCount);

View File

@ -26,7 +26,7 @@ namespace Capnp.Net.Runtime.Tests
{
using (var client = SetupClient())
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
using (var main = client.GetMain<ITestInterface>())
{
@ -62,8 +62,8 @@ namespace Capnp.Net.Runtime.Tests
using (var client1 = SetupClient())
using (var client2 = SetupClient())
{
Assert.IsTrue(client1.WhenConnected.Wait(MediumNonDbgTimeout));
Assert.IsTrue(client2.WhenConnected.Wait(MediumNonDbgTimeout));
//Assert.IsTrue(client1.WhenConnected.Wait(MediumNonDbgTimeout));
//Assert.IsTrue(client2.WhenConnected.Wait(MediumNonDbgTimeout));
using (var main = client1.GetMain<ITestMoreStuff>())
{
@ -132,7 +132,7 @@ namespace Capnp.Net.Runtime.Tests
using (var client = SetupClient())
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
using (var main = client.GetMain<CapnpGen.IB2>())
{
@ -154,7 +154,7 @@ namespace Capnp.Net.Runtime.Tests
using (var client = SetupClient())
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
using (var main = client.GetMain<CapnpGen.IIssue25B>())
{
@ -184,7 +184,7 @@ namespace Capnp.Net.Runtime.Tests
using (var client = SetupClient())
{
Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout));
//Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout));
using (var main = client.GetMain<ITestMoreStuff>())
{
@ -196,7 +196,7 @@ namespace Capnp.Net.Runtime.Tests
using (var client2 = SetupClient())
{
Assert.IsTrue(client2.WhenConnected.Wait(MediumNonDbgTimeout));
//Assert.IsTrue(client2.WhenConnected.Wait(MediumNonDbgTimeout));
using (var main2 = client2.GetMain<ITestMoreStuff>())
{
@ -221,7 +221,7 @@ namespace Capnp.Net.Runtime.Tests
using (var client = SetupClient())
{
Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout));
//Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout));
using (var main = client.GetMain<ITestTailCaller>())
{
@ -232,7 +232,7 @@ namespace Capnp.Net.Runtime.Tests
using (var c = fooTask.Result.C)
using (var client2 = SetupClient())
{
Assert.IsTrue(client2.WhenConnected.Wait(MediumNonDbgTimeout));
//Assert.IsTrue(client2.WhenConnected.Wait(MediumNonDbgTimeout));
using (var main2 = client2.GetMain<ITestTailCaller>())
{
@ -255,7 +255,7 @@ namespace Capnp.Net.Runtime.Tests
using (var client = SetupClient())
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
using (var main = client.GetMain<ITestMoreStuff>())
{

View File

@ -135,7 +135,7 @@ namespace Capnp.Net.Runtime.Tests
{
using (var client = new TcpRpcClient("localhost", TcpPort))
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
using (var main = client.GetMain<ITestInterface>())
{
@ -185,7 +185,7 @@ namespace Capnp.Net.Runtime.Tests
using (var client = new TcpRpcClient("localhost", TcpPort))
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
using (var main = client.GetMain<ITestPipeline>())
{
@ -239,7 +239,7 @@ namespace Capnp.Net.Runtime.Tests
{
using (var client = new TcpRpcClient("localhost", TcpPort))
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
using (var main = client.GetMain<ITestMoreStuff>())
{
@ -305,7 +305,7 @@ namespace Capnp.Net.Runtime.Tests
{
using (var client = new TcpRpcClient("localhost", TcpPort))
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
using (var main = client.GetMain<ITestMoreStuff>())
{
@ -407,7 +407,7 @@ namespace Capnp.Net.Runtime.Tests
client.AttachTracer(tracer);
client.Connect("localhost", TcpPort);
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
using (var main = client.GetMain<ITestTailCaller>())
{
@ -475,7 +475,7 @@ namespace Capnp.Net.Runtime.Tests
using (var client = new TcpRpcClient("localhost", TcpPort))
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
using (var main = client.GetMain<ITestMoreStuff>())
{
@ -520,7 +520,7 @@ namespace Capnp.Net.Runtime.Tests
{
using (var client = new TcpRpcClient("localhost", TcpPort))
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
using (var main = client.GetMain<ITestMoreStuff>())
{
@ -589,7 +589,7 @@ namespace Capnp.Net.Runtime.Tests
using (var client = new TcpRpcClient("localhost", TcpPort))
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
using (var main = client.GetMain<ITestMoreStuff>())
{
@ -682,7 +682,7 @@ namespace Capnp.Net.Runtime.Tests
{
using (var client = new TcpRpcClient("localhost", TcpPort))
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
var destructionPromise = new TaskCompletionSource<int>();
var destructionTask = destructionPromise.Task;
@ -740,7 +740,7 @@ namespace Capnp.Net.Runtime.Tests
{
using (var client = new TcpRpcClient("localhost", TcpPort))
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
using (var main = client.GetMain<ITestMoreStuff>())
{
@ -802,36 +802,15 @@ namespace Capnp.Net.Runtime.Tests
{
LaunchCompatTestProcess("server:MoreStuff", stdout =>
{
int retry = 0;
label:
using (var client = new TcpRpcClient("localhost", TcpPort))
{
Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout), "client connect");
//Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout), "client connect");
using (var main = client.GetMain<ITestMoreStuff>())
using (var wrapped = client.GetMain<ITestMoreStuff>())
{
var resolving = main as IResolvingCapability;
bool success;
try
{
success = resolving.WhenResolved.Wait(MediumNonDbgTimeout);
}
catch
{
success = false;
}
if (!success)
{
if (++retry == 5)
{
Assert.Fail("Attempting to obtain bootstrap interface failed. Bailing out.");
}
goto label;
}
var unwrap = wrapped.Unwrap();
Assert.IsTrue(unwrap.Wait(MediumNonDbgTimeout));
var main = unwrap.Result;
var cap = new TestCallOrderImpl();
cap.CountToDispose = 6;
@ -892,7 +871,7 @@ namespace Capnp.Net.Runtime.Tests
label:
using (var client = new TcpRpcClient("localhost", TcpPort))
{
Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout), "client connect");
//Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout), "client connect");
using (var main = client.GetMain<ITestMoreStuff>())
{
@ -986,7 +965,7 @@ namespace Capnp.Net.Runtime.Tests
using (var client = new TcpRpcClient("localhost", TcpPort))
{
Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout));
//Assert.IsTrue(client.WhenConnected.Wait(MediumNonDbgTimeout));
using (var main = client.GetMain<ITestMoreStuff>())
{
@ -1079,7 +1058,7 @@ namespace Capnp.Net.Runtime.Tests
label:
using (var client = new TcpRpcClient("localhost", TcpPort))
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
using (var main = client.GetMain<ITestMoreStuff>())
{
@ -1152,7 +1131,7 @@ namespace Capnp.Net.Runtime.Tests
{
using (var client = new TcpRpcClient("localhost", TcpPort))
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
using (var main = client.GetMain<ITestMoreStuff>())
{

View File

@ -43,7 +43,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
var counters = new Counters();
server.Main = new TestMoreStuffImpl(counters);
@ -151,7 +151,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
server.Main = impl;
for (int i = 0; i < 10; i++)

View File

@ -35,7 +35,7 @@ namespace Capnp.Net.Runtime.Tests
using (server)
using (client)
{
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
var counters = new Counters();
var impl = new TestMoreStuffImpl(counters);
@ -109,7 +109,7 @@ namespace Capnp.Net.Runtime.Tests
server.InjectMidlayer(s => new ScatteringStream(s, 7));
client.InjectMidlayer(s => new ScatteringStream(s, 10));
client.Connect("localhost", TcpPort);
client.WhenConnected.Wait();
//client.WhenConnected.Wait();
var counters = new Counters();
server.Main = new TestInterfaceImpl(counters);

View File

@ -15,7 +15,7 @@ namespace Capnp.Net.Runtime.Tests
{
public interface ITestbed
{
T ConnectMain<T>(object main) where T : class;
T ConnectMain<T>(object main) where T : class, IDisposable;
void MustComplete(params Task[] tasks);
void MustNotComplete(params Task[] tasks);
void FlushCommunication();
@ -241,7 +241,7 @@ namespace Capnp.Net.Runtime.Tests
public void RunTest(Action<ITestbed> action)
{
(_server, _client) = SetupClientServerPair();
_client.WhenConnected.Wait(MediumNonDbgTimeout);
//_client.WhenConnected.Wait(MediumNonDbgTimeout);
Assert.IsTrue(SpinWait.SpinUntil(() => _server.ConnectionCount > 0, MediumNonDbgTimeout));
var conn = _server.Connections[0];

View File

@ -31,7 +31,7 @@ namespace Capnp
var state = _lpd.State;
if (index < 0 || index >= _lpd.Count)
throw new ArgumentOutOfRangeException(nameof(index));
throw new IndexOutOfRangeException();
state.Offset += index;
state.Kind = ObjectKind.Struct;

View File

@ -1,4 +1,5 @@
using System;
using Capnp.Util;
using System;
using System.Threading;
using System.Threading.Tasks;
@ -18,11 +19,11 @@ namespace Capnp.Rpc
}
readonly Task<Proxy>? _proxyTask;
readonly Task<ConsumedCapability> _capTask;
readonly StrictlyOrderedAwaitTask<ConsumedCapability> _capTask;
public LazyCapability(Task<ConsumedCapability> capabilityTask)
{
_capTask = capabilityTask;
_capTask = capabilityTask.EnforceAwaitOrder();
}
public LazyCapability(Task<Proxy> proxyTask)
@ -31,7 +32,7 @@ namespace Capnp.Rpc
async Task<ConsumedCapability> AwaitCap() => (await _proxyTask!).ConsumedCap;
_capTask = AwaitCap();
_capTask = AwaitCap().EnforceAwaitOrder();
}
internal override Action? Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer)
@ -61,11 +62,13 @@ namespace Capnp.Rpc
}
}
public Task WhenResolved => _capTask;
async Task AwaitWhenResolved() => await _capTask;
public Task WhenResolved => AwaitWhenResolved();
public T? GetResolvedCapability<T>() where T: class
{
if (_capTask.IsCompleted)
if (_capTask.WrappedTask.IsCompleted)
{
try
{

View File

@ -25,8 +25,9 @@ namespace Capnp.Rpc
if (skeleton == null)
throw new ArgumentNullException(nameof(skeleton));
skeleton.Claim();
_ifmap.Add(interfaceId, skeleton);
if (_ifmap.Count == 1) // Claiming only the first one is sufficient
skeleton.Claim();
}
internal void AddInterface(Skeleton skeleton)
@ -60,8 +61,6 @@ namespace Capnp.Rpc
{
cap.Relinquish();
}
base.Dispose(disposing);
}
internal override void Bind(object impl)

View File

@ -151,26 +151,22 @@ namespace Capnp.Rpc
/// <typeparam name="TProxy">Bootstrap capability interface</typeparam>
/// <returns>A proxy for the bootstrap capability</returns>
/// <exception cref="InvalidOperationException">Not connected</exception>
public TProxy GetMain<TProxy>() where TProxy: class
public TProxy GetMain<TProxy>() where TProxy: class, IDisposable
{
if (WhenConnected == null)
{
throw new InvalidOperationException("Not connecting");
}
if (!WhenConnected.IsCompleted)
async Task<TProxy> GetMainAsync()
{
throw new InvalidOperationException("Connection not yet established");
}
if (!WhenConnected.ReplacementTaskIsCompletedSuccessfully())
{
throw new InvalidOperationException("Connection not successfully established");
}
await WhenConnected!;
return (CapabilityReflection.CreateProxy<TProxy>(_inboundEndpoint!.QueryMain()) as TProxy)!;
}
return GetMainAsync().Eager(true);
}
/// <summary>
/// Dispose pattern implementation
/// </summary>

View File

@ -0,0 +1,88 @@
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Capnp.Util
{
internal class StrictlyOrderedAwaitTask<T>: INotifyCompletion
{
readonly Task<T> _awaitedTask;
object _lock;
long _inOrder, _outOrder;
public StrictlyOrderedAwaitTask(Task<T> awaitedTask)
{
_awaitedTask = awaitedTask;
_lock = new object();
}
public StrictlyOrderedAwaitTask<T> GetAwaiter()
{
return this;
}
public async void OnCompleted(Action continuation)
{
object safeLock = Volatile.Read(ref _lock);
if (safeLock == null)
{
continuation();
return;
}
long sequence = Interlocked.Increment(ref _inOrder) - 1;
try
{
if (_awaitedTask.IsCompleted)
{
Interlocked.Exchange(ref _lock, null);
}
await _awaitedTask;
}
catch
{
}
finally
{
SpinWait.SpinUntil(() =>
{
lock (safeLock)
{
if (Volatile.Read(ref _outOrder) != sequence)
{
return false;
}
Interlocked.Increment(ref _outOrder);
continuation();
return true;
}
});
}
}
public bool IsCompleted => Volatile.Read(ref _lock) == null;
public T GetResult() => _awaitedTask.GetAwaiter().GetResult();
public T Result => _awaitedTask.Result;
public Task<T> WrappedTask => _awaitedTask;
}
internal static class StrictlyOrderedTaskExtensions
{
public static StrictlyOrderedAwaitTask<T> EnforceAwaitOrder<T>(this Task<T> task)
{
return new StrictlyOrderedAwaitTask<T>(task);
}
}
}