factored StrictlyOrderedAwaitTask into resolving proxy tasks

This commit is contained in:
Christian Köllner 2020-04-23 22:34:45 +02:00
parent 3924fa0080
commit 0b8e32edf9
14 changed files with 59 additions and 66 deletions

View File

@ -1284,7 +1284,7 @@ namespace Capnp.Net.Runtime.Tests
Assert.AreEqual(Message.WHICH.Finish, _.which);
});
Assert.IsTrue(proxy.WhenResolved.IsCompleted);
Assert.IsTrue(proxy.WhenResolved.IsFaulted);
Assert.IsTrue(proxy.WhenResolved.WrappedTask.IsFaulted);
tester.ExpectAbort();
}

View File

@ -79,7 +79,7 @@ namespace Capnp.Net.Runtime.Tests
server.Main = new ProvidedCapabilityMock();
var main = client.GetMain<BareProxy>();
var resolving = main as IResolvingCapability;
Assert.IsTrue(resolving.WhenResolved.Wait(MediumNonDbgTimeout));
Assert.IsTrue(resolving.WhenResolved.WrappedTask.Wait(MediumNonDbgTimeout));
}
}
@ -97,7 +97,7 @@ namespace Capnp.Net.Runtime.Tests
var main = client.GetMain<BareProxy>();
var resolving = main as IResolvingCapability;
Assert.IsTrue(Assert.ThrowsExceptionAsync<RpcException>(() => resolving.WhenResolved).Wait(MediumNonDbgTimeout));
Assert.IsTrue(Assert.ThrowsExceptionAsync<RpcException>(() => resolving.WhenResolved.WrappedTask).Wait(MediumNonDbgTimeout));
}
}
@ -116,7 +116,7 @@ namespace Capnp.Net.Runtime.Tests
var mock = new ProvidedCapabilityMock();
server.Main = mock;
var main = client.GetMain<BareProxy>();
Assert.IsTrue(main.WhenResolved.Wait(MediumNonDbgTimeout));
Assert.IsTrue(main.WhenResolved.WrappedTask.Wait(MediumNonDbgTimeout));
var args = DynamicSerializerState.CreateForRpc();
args.SetStruct(1, 0);
args.WriteData(0, 123456);
@ -157,7 +157,7 @@ namespace Capnp.Net.Runtime.Tests
var mock = new ProvidedCapabilityMock();
server.Main = mock;
var main = client.GetMain<BareProxy>();
Assert.IsTrue(main.WhenResolved.Wait(MediumNonDbgTimeout));
Assert.IsTrue(main.WhenResolved.WrappedTask.Wait(MediumNonDbgTimeout));
var args = DynamicSerializerState.CreateForRpc();
args.SetStruct(1, 0);
args.WriteData(0, 123456);
@ -197,7 +197,7 @@ namespace Capnp.Net.Runtime.Tests
server.Main = mock;
var main = client.GetMain<BareProxy>();
var resolving = main as IResolvingCapability;
Assert.IsTrue(resolving.WhenResolved.Wait(MediumNonDbgTimeout));
Assert.IsTrue(resolving.WhenResolved.WrappedTask.Wait(MediumNonDbgTimeout));
var args = DynamicSerializerState.CreateForRpc();
args.SetStruct(1, 0);
args.WriteData(0, 123456);
@ -240,7 +240,7 @@ namespace Capnp.Net.Runtime.Tests
var mock = new ProvidedCapabilityMock();
server.Main = mock;
var main = client.GetMain<BareProxy>();
Assert.IsTrue(main.WhenResolved.Wait(MediumNonDbgTimeout));
Assert.IsTrue(main.WhenResolved.WrappedTask.Wait(MediumNonDbgTimeout));
var args = DynamicSerializerState.CreateForRpc();
args.SetStruct(1, 0);
args.WriteData(0, 123456);
@ -300,7 +300,7 @@ namespace Capnp.Net.Runtime.Tests
var mock = new ProvidedCapabilityMock();
server.Main = mock;
var main = client.GetMain<BareProxy>();
Assert.IsTrue(main.WhenResolved.Wait(MediumNonDbgTimeout));
Assert.IsTrue(main.WhenResolved.WrappedTask.Wait(MediumNonDbgTimeout));
var args = DynamicSerializerState.CreateForRpc();
args.SetStruct(1, 0);
args.WriteData(0, 123456);
@ -337,7 +337,7 @@ namespace Capnp.Net.Runtime.Tests
var mock = new ProvidedCapabilityMock();
server.Main = mock;
var main = client.GetMain<BareProxy>();
Assert.IsTrue(main.WhenResolved.Wait(MediumNonDbgTimeout));
Assert.IsTrue(main.WhenResolved.WrappedTask.Wait(MediumNonDbgTimeout));
var args = DynamicSerializerState.CreateForRpc();
args.SetStruct(1, 0);
args.WriteData(0, 123456);
@ -410,7 +410,7 @@ namespace Capnp.Net.Runtime.Tests
var mock = new ProvidedCapabilityMock();
server.Main = mock;
var main = client.GetMain<BareProxy>();
Assert.IsTrue(main.WhenResolved.Wait(MediumNonDbgTimeout));
Assert.IsTrue(main.WhenResolved.WrappedTask.Wait(MediumNonDbgTimeout));
var args = DynamicSerializerState.CreateForRpc();
args.SetStruct(1, 0);
args.WriteData(0, 123456);
@ -486,7 +486,7 @@ namespace Capnp.Net.Runtime.Tests
var mock = new ProvidedCapabilityMock();
server.Main = mock;
var main = client.GetMain<BareProxy>();
Assert.IsTrue(main.WhenResolved.Wait(MediumNonDbgTimeout));
Assert.IsTrue(main.WhenResolved.WrappedTask.Wait(MediumNonDbgTimeout));
var args = DynamicSerializerState.CreateForRpc();
args.SetStruct(1, 0);
args.WriteData(0, 123456);
@ -603,7 +603,7 @@ namespace Capnp.Net.Runtime.Tests
var mock = new ProvidedCapabilityMock();
server.Main = mock;
var main = client.GetMain<BareProxy>();
Assert.IsTrue(main.WhenResolved.Wait(MediumNonDbgTimeout));
Assert.IsTrue(main.WhenResolved.WrappedTask.Wait(MediumNonDbgTimeout));
var args = DynamicSerializerState.CreateForRpc();
args.SetStruct(1, 0);
args.WriteData(0, 123456);
@ -650,7 +650,7 @@ namespace Capnp.Net.Runtime.Tests
var mock = new ProvidedCapabilityMock();
server.Main = mock;
var main = client.GetMain<BareProxy>();
Assert.IsTrue(main.WhenResolved.Wait(MediumNonDbgTimeout));
Assert.IsTrue(main.WhenResolved.WrappedTask.Wait(MediumNonDbgTimeout));
var args = DynamicSerializerState.CreateForRpc();
args.SetStruct(1, 0);
args.WriteData(0, 123456);
@ -746,7 +746,7 @@ namespace Capnp.Net.Runtime.Tests
Assert.AreEqual(c1, server.Connections[0]);
Assert.AreEqual(ConnectionState.Active, c1.State);
var proxy = client1.GetMain<ITestInterface>();
Assert.IsTrue(proxy is IResolvingCapability r && r.WhenResolved.Wait(MediumNonDbgTimeout));
Assert.IsTrue(proxy is IResolvingCapability r && r.WhenResolved.WrappedTask.Wait(MediumNonDbgTimeout));
Assert.IsTrue(c1.RecvCount > 0);
Assert.IsTrue(c1.SendCount > 0);

View File

@ -116,7 +116,7 @@ namespace Capnp.Net.Runtime.Tests
try
{
Assert.IsTrue(((IResolvingCapability)main).WhenResolved.Wait(MediumNonDbgTimeout));
Assert.IsTrue(((IResolvingCapability)main).WhenResolved.WrappedTask.Wait(MediumNonDbgTimeout));
}
catch (AggregateException)
{

View File

@ -310,7 +310,7 @@ namespace Capnp.Net.Runtime.Tests
using (var main = client.GetMain<ITestMoreStuff>())
{
((Proxy)main).WhenResolved.Wait(MediumNonDbgTimeout);
((Proxy)main).WhenResolved.WrappedTask.Wait(MediumNonDbgTimeout);
async Task VerifyOutput()
{
@ -896,7 +896,7 @@ namespace Capnp.Net.Runtime.Tests
try
{
success = resolving.WhenResolved.Wait(MediumNonDbgTimeout);
success = resolving.WhenResolved.WrappedTask.Wait(MediumNonDbgTimeout);
}
catch
{
@ -986,7 +986,7 @@ namespace Capnp.Net.Runtime.Tests
using (var main = client.GetMain<ITestMoreStuff>())
{
var resolving = main as IResolvingCapability;
Assert.IsTrue(resolving.WhenResolved.Wait(MediumNonDbgTimeout));
Assert.IsTrue(resolving.WhenResolved.WrappedTask.Wait(MediumNonDbgTimeout));
var cap = new TaskCompletionSource<ITestCallOrder>();
@ -1088,7 +1088,7 @@ namespace Capnp.Net.Runtime.Tests
try
{
success = resolving.WhenResolved.Wait(MediumNonDbgTimeout);
success = resolving.WhenResolved.WrappedTask.Wait(MediumNonDbgTimeout);
}
catch
{
@ -1158,7 +1158,7 @@ namespace Capnp.Net.Runtime.Tests
using (var main = client.GetMain<ITestMoreStuff>())
{
var resolving = main as IResolvingCapability;
Assert.IsTrue(resolving.WhenResolved.Wait(MediumNonDbgTimeout));
Assert.IsTrue(resolving.WhenResolved.WrappedTask.Wait(MediumNonDbgTimeout));
var tcs = new TaskCompletionSource<ITestInterface>();

View File

@ -49,7 +49,7 @@ namespace Capnp.Net.Runtime.Tests
server.Main = new TestMoreStuffImpl(counters);
using (var main = client.GetMain<ITestMoreStuff>())
{
((Proxy)main).WhenResolved.Wait(MediumNonDbgTimeout);
((Proxy)main).WhenResolved.WrappedTask.Wait(MediumNonDbgTimeout);
// Since we have a threaded model, there is no way to deterministically provoke the situation
// where Cancel and Finish message cross paths. Instead, we'll do a lot of such requests and
@ -158,7 +158,7 @@ namespace Capnp.Net.Runtime.Tests
{
using (var main = client.GetMain<ITestMoreStuff>())
{
((Proxy)main).WhenResolved.Wait(MediumNonDbgTimeout);
((Proxy)main).WhenResolved.WrappedTask.Wait(MediumNonDbgTimeout);
}
Assert.IsFalse(impl.IsDisposed);
}

View File

@ -43,7 +43,7 @@ namespace Capnp.Net.Runtime.Tests
using (var main = client.GetMain<ITestMoreStuff>())
{
var resolving = main as IResolvingCapability;
Assert.IsTrue(resolving.WhenResolved.Wait(MediumNonDbgTimeout));
Assert.IsTrue(resolving.WhenResolved.WrappedTask.Wait(MediumNonDbgTimeout));
}
}
});

View File

@ -65,7 +65,7 @@ namespace Capnp.Net.Runtime.Tests
using (var main = testbed.ConnectMain<ITestMoreStuff>(impl))
{
if (main is IResolvingCapability resolving)
testbed.MustComplete(resolving.WhenResolved);
testbed.MustComplete(resolving.WhenResolved.WrappedTask);
var cap = new TestCallOrderImpl();
cap.CountToDispose = 6;
@ -92,20 +92,12 @@ namespace Capnp.Net.Runtime.Tests
var call4 = pipeline.GetCallSequence(4, default);
var call5 = pipeline.GetCallSequence(5, default);
try
{
testbed.MustComplete(call0);
testbed.MustComplete(call1);
testbed.MustComplete(call2);
testbed.MustComplete(call3);
testbed.MustComplete(call4);
testbed.MustComplete(call5);
}
catch (System.Exception)
{
cap.CountToDispose = null;
throw;
}
testbed.MustComplete(call0);
testbed.MustComplete(call1);
testbed.MustComplete(call2);
testbed.MustComplete(call3);
testbed.MustComplete(call4);
testbed.MustComplete(call5);
Assert.AreEqual(0u, call0.Result);
Assert.AreEqual(1u, call1.Result);
@ -113,6 +105,7 @@ namespace Capnp.Net.Runtime.Tests
Assert.AreEqual(3u, call3.Result);
Assert.AreEqual(4u, call4.Result);
Assert.AreEqual(5u, call5.Result);
Assert.AreEqual(cap.Count, cap.CountToDispose, "counter must have reached number of calls");
}
}
}
@ -182,7 +175,7 @@ namespace Capnp.Net.Runtime.Tests
using (var main = testbed.ConnectMain<ITestMoreStuff>(impl))
{
if (main is IResolvingCapability resolving)
testbed.MustComplete(resolving.WhenResolved);
testbed.MustComplete(resolving.WhenResolved.WrappedTask);
var cap = new TaskCompletionSource<ITestCallOrder>();
@ -223,7 +216,7 @@ namespace Capnp.Net.Runtime.Tests
using (var main = testbed.ConnectMain<ITestMoreStuff>(impl))
{
if (main is IResolvingCapability resolving)
testbed.MustComplete(resolving.WhenResolved);
testbed.MustComplete(resolving.WhenResolved.WrappedTask);
var promise = main.GetNull(default);
@ -250,7 +243,7 @@ namespace Capnp.Net.Runtime.Tests
using (var main = testbed.ConnectMain<ITestMoreStuff>(impl))
{
if (main is IResolvingCapability resolving)
testbed.MustComplete(resolving.WhenResolved);
testbed.MustComplete(resolving.WhenResolved.WrappedTask);
var tcs = new TaskCompletionSource<ITestInterface>();
@ -770,7 +763,7 @@ namespace Capnp.Net.Runtime.Tests
peer.EnableEcho();
testbed.MustComplete(r.WhenResolved);
testbed.MustComplete(r.WhenResolved.WrappedTask);
heldTask.Result.Dispose();
}

View File

@ -1,4 +1,5 @@
using System.Threading.Tasks;
using Capnp.Util;
using System.Threading.Tasks;
namespace Capnp.Rpc
{
@ -10,7 +11,7 @@ namespace Capnp.Rpc
/// <summary>
/// Completes when the capability gets resolved.
/// </summary>
Task WhenResolved { get; }
StrictlyOrderedAwaitTask WhenResolved { get; }
/// <summary>
/// Returns the resolved capability

View File

@ -18,7 +18,7 @@ namespace Capnp.Rpc
return new LazyCapability(Task.FromCanceled<ConsumedCapability>(token));
}
readonly Task<Proxy>? _proxyTask;
readonly StrictlyOrderedAwaitTask<Proxy>? _proxyTask;
readonly StrictlyOrderedAwaitTask<ConsumedCapability> _capTask;
public LazyCapability(Task<ConsumedCapability> capabilityTask)
@ -28,7 +28,7 @@ namespace Capnp.Rpc
public LazyCapability(Task<Proxy> proxyTask)
{
_proxyTask = proxyTask;
_proxyTask = proxyTask.EnforceAwaitOrder();
async Task<ConsumedCapability> AwaitCap() => (await _proxyTask!).ConsumedCap;
@ -37,7 +37,7 @@ namespace Capnp.Rpc
internal override Action? Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer)
{
if (WhenResolved.ReplacementTaskIsCompletedSuccessfully())
if (WhenResolved.IsCompleted && WhenResolved.WrappedTask.ReplacementTaskIsCompletedSuccessfully())
{
using var proxy = GetResolvedCapability<BareProxy>()!;
return proxy.Export(endpoint, writer);
@ -62,9 +62,7 @@ namespace Capnp.Rpc
}
}
async Task AwaitWhenResolved() => await _capTask;
public Task WhenResolved => AwaitWhenResolved();
public StrictlyOrderedAwaitTask WhenResolved => _capTask;
public T? GetResolvedCapability<T>() where T: class
{

View File

@ -17,11 +17,11 @@ namespace Capnp.Rpc
return proxy;
}
readonly Task<Proxy> _whenResolvedProxy;
readonly StrictlyOrderedAwaitTask<Proxy> _whenResolvedProxy;
public LocalAnswerCapability(Task<Proxy> proxyTask)
{
_whenResolvedProxy = proxyTask;
_whenResolvedProxy = proxyTask.EnforceAwaitOrder();
}
public LocalAnswerCapability(StrictlyOrderedAwaitTask<DeserializerState> answer, MemberAccessPath access):
@ -30,9 +30,9 @@ namespace Capnp.Rpc
}
public Task WhenResolved => _whenResolvedProxy;
public StrictlyOrderedAwaitTask WhenResolved => _whenResolvedProxy;
public T? GetResolvedCapability<T>() where T : class => _whenResolvedProxy.GetResolvedCapability<T>();
public T? GetResolvedCapability<T>() where T : class => _whenResolvedProxy.WrappedTask.GetResolvedCapability<T>();
internal override Action? Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer)
{

View File

@ -10,7 +10,7 @@ namespace Capnp.Rpc
readonly uint _remoteId;
readonly object _reentrancyBlocker = new object();
readonly TaskCompletionSource<ConsumedCapability> _resolvedCap = new TaskCompletionSource<ConsumedCapability>();
readonly Task<Proxy> _whenResolvedProxy;
readonly StrictlyOrderedAwaitTask<Proxy> _whenResolvedProxy;
bool _released;
public PromisedCapability(IRpcEndpoint ep, uint remoteId): base(ep)
@ -18,11 +18,11 @@ namespace Capnp.Rpc
_remoteId = remoteId;
async Task<Proxy> AwaitProxy() => new Proxy(await _resolvedCap.Task);
_whenResolvedProxy = AwaitProxy();
_whenResolvedProxy = AwaitProxy().EnforceAwaitOrder();
}
public override Task WhenResolved => _resolvedCap.Task;
public override T? GetResolvedCapability<T>() where T: class => _whenResolvedProxy.GetResolvedCapability<T>();
public override StrictlyOrderedAwaitTask WhenResolved => _whenResolvedProxy;
public override T? GetResolvedCapability<T>() where T: class => _whenResolvedProxy.WrappedTask.GetResolvedCapability<T>();
internal override Action? Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer)
{

View File

@ -1,4 +1,5 @@
using Microsoft.Extensions.Logging;
using Capnp.Util;
using Microsoft.Extensions.Logging;
using System;
using System.Diagnostics;
using System.Threading;
@ -31,12 +32,12 @@ namespace Capnp.Rpc
/// <summary>
/// Completes when the capability gets resolved.
/// </summary>
public Task WhenResolved
public StrictlyOrderedAwaitTask WhenResolved
{
get
{
return ConsumedCap is IResolvingCapability resolving ?
resolving.WhenResolved : Task.CompletedTask;
resolving.WhenResolved : Task.CompletedTask.EnforceAwaitOrder();
}
}

View File

@ -17,13 +17,13 @@ namespace Capnp.Rpc
readonly PendingQuestion _question;
readonly MemberAccessPath _access;
readonly Task<Proxy> _whenResolvedProxy;
readonly StrictlyOrderedAwaitTask<Proxy> _whenResolvedProxy;
public RemoteAnswerCapability(PendingQuestion question, MemberAccessPath access, Task<Proxy> proxyTask) : base(question.RpcEndpoint)
{
_question = question ?? throw new ArgumentNullException(nameof(question));
_access = access ?? throw new ArgumentNullException(nameof(access));
_whenResolvedProxy = proxyTask ?? throw new ArgumentNullException(nameof(proxyTask));
_whenResolvedProxy = (proxyTask ?? throw new ArgumentNullException(nameof(proxyTask))).EnforceAwaitOrder();
}
static async Task<Proxy> TransferOwnershipToDummyProxy(PendingQuestion question, MemberAccessPath access)
@ -85,9 +85,9 @@ namespace Capnp.Rpc
}
}
public override Task WhenResolved => _whenResolvedProxy;
public override StrictlyOrderedAwaitTask WhenResolved => _whenResolvedProxy;
public override T? GetResolvedCapability<T>() where T: class => _whenResolvedProxy.GetResolvedCapability<T>();
public override T? GetResolvedCapability<T>() where T: class => _whenResolvedProxy.WrappedTask.GetResolvedCapability<T>();
protected override void GetMessageTarget(MessageTarget.WRITER wr)
{

View File

@ -17,7 +17,7 @@ namespace Capnp.Rpc
ILogger Logger { get; } = Logging.CreateLogger<RemoteResolvingCapability>();
#endif
public abstract Task WhenResolved { get; }
public abstract StrictlyOrderedAwaitTask WhenResolved { get; }
public abstract T? GetResolvedCapability<T>() where T : class;
protected RemoteResolvingCapability(IRpcEndpoint ep) : base(ep)