bug fixes, stability improvements

This commit is contained in:
Christian Köllner 2020-03-22 00:12:50 +01:00
parent b6c84c3c6c
commit ec0df4872f
23 changed files with 402 additions and 496 deletions

View File

@ -767,5 +767,29 @@ namespace Capnp.Net.Runtime.Tests
CollectionAssert.AreEqual(expected[i], voids[i].ToArray());
}
}
[TestMethod]
public void ListOfEmpty()
{
var expected = new TestEnum[] { TestEnum.bar, TestEnum.baz, TestEnum.corge };
var b = MessageBuilder.Create();
var loes = b.CreateObject<ListOfEmptySerializer>();
loes.Init(12345678);
DeserializerState d = loes;
var ld = d.RequireList();
Assert.AreEqual(ListKind.ListOfEmpty, ld.Kind);
if (!(ld is ListOfEmptyDeserializer loed))
{
Assert.Fail("List did not deserialize back to ListOfEmptyDeserializer");
return;
}
Assert.AreEqual(12345678, loed.Count);
Assert.ThrowsException<IndexOutOfRangeException>(() => { var _ = loed[-1]; });
Assert.ThrowsException<IndexOutOfRangeException>(() => { var _ = loed[12345678]; });
_ = loed[12345677];
var kind = loed.Cast(_ => _.Kind).Take(1).Single();
Assert.AreEqual(ObjectKind.Nil, kind);
}
}
}

View File

@ -92,5 +92,11 @@ namespace Capnp.Net.Runtime.Tests
{
NewDtbdctTestbed().RunTest(Testsuite.Basic);
}
[TestMethod]
public void BootstrapReuse()
{
NewDtbdctTestbed().RunTest(Testsuite.BootstrapReuse);
}
}
}

View File

@ -12,7 +12,7 @@ using System.Threading.Tasks;
namespace Capnp.Net.Runtime.Tests
{
[TestClass]
public class LocalRpc
public class LocalRpc: TestBase
{
[TestMethod]
public void DeferredLocalAnswer()
@ -29,5 +29,89 @@ namespace Capnp.Net.Runtime.Tests
Assert.AreEqual("bar", foo.Result);
}
}
[TestMethod]
public void Embargo()
{
NewLocalTestbed().RunTest(Testsuite.Embargo);
}
[TestMethod]
public void EmbargoError()
{
NewLocalTestbed().RunTest(Testsuite.EmbargoError);
}
[TestMethod]
public void EmbargoNull()
{
NewLocalTestbed().RunTest(Testsuite.EmbargoNull);
}
[TestMethod]
public void CallBrokenPromise()
{
NewLocalTestbed().RunTest(Testsuite.CallBrokenPromise);
}
[TestMethod]
public void TailCall()
{
NewLocalTestbed().RunTest(Testsuite.TailCall);
}
[TestMethod]
public void SendTwice()
{
NewLocalTestbed().RunTest(Testsuite.SendTwice);
}
[TestMethod]
public void Cancel()
{
NewLocalTestbed().RunTest(Testsuite.Cancel);
}
[TestMethod]
public void RetainAndRelease()
{
NewLocalTestbed().RunTest(Testsuite.RetainAndRelease);
}
[TestMethod]
public void PromiseResolve()
{
NewLocalTestbed().RunTest(Testsuite.PromiseResolve);
}
[TestMethod]
public void Cancelation()
{
NewLocalTestbed().RunTest(Testsuite.Cancelation);
}
[TestMethod]
public void ReleaseOnCancel()
{
NewLocalTestbed().RunTest(Testsuite.ReleaseOnCancel);
}
[TestMethod]
public void Release()
{
NewLocalTestbed().RunTest(Testsuite.Release);
}
[TestMethod]
public void Pipeline()
{
NewLocalTestbed().RunTest(Testsuite.Pipeline);
}
[TestMethod]
public void Basic()
{
NewLocalTestbed().RunTest(Testsuite.Basic);
}
}
}

View File

@ -420,9 +420,12 @@ namespace Capnp.Net.Runtime.Tests.GenImpls
public void Dispose()
{
_tcs?.SetResult(0);
_tcs?.TrySetResult(0);
IsDisposed = true;
}
public bool IsDisposed { get; private set; }
public virtual Task<string> Foo(uint i, bool j, CancellationToken cancellationToken)
{
Interlocked.Increment(ref _counters.CallCount);
@ -512,20 +515,26 @@ namespace Capnp.Net.Runtime.Tests.GenImpls
public async Task<(string, TestPipeline.AnyBox)> GetAnyCap(uint n, BareProxy inCap, CancellationToken cancellationToken_)
{
Interlocked.Increment(ref _counters.CallCount);
Assert.AreEqual(234u, n);
var s = await inCap.Cast<ITestInterface>(true).Foo(123, true, cancellationToken_);
Assert.AreEqual("foo", s);
return ("bar", new TestPipeline.AnyBox() { Cap = BareProxy.FromImpl(new TestExtendsImpl(_counters)) });
using (inCap)
{
Interlocked.Increment(ref _counters.CallCount);
Assert.AreEqual(234u, n);
var s = await inCap.Cast<ITestInterface>(true).Foo(123, true, cancellationToken_);
Assert.AreEqual("foo", s);
return ("bar", new TestPipeline.AnyBox() { Cap = BareProxy.FromImpl(new TestExtendsImpl(_counters)) });
}
}
public async Task<(string, TestPipeline.Box)> GetCap(uint n, ITestInterface inCap, CancellationToken cancellationToken_)
{
Interlocked.Increment(ref _counters.CallCount);
Assert.AreEqual(234u, n);
var s = await inCap.Foo(123, true, cancellationToken_);
Assert.AreEqual("foo", s);
return ("bar", new TestPipeline.Box() { Cap = new TestExtendsImpl(_counters) });
using (inCap)
{
Interlocked.Increment(ref _counters.CallCount);
Assert.AreEqual(234u, n);
var s = await inCap.Foo(123, true, cancellationToken_);
Assert.AreEqual("foo", s);
return ("bar", new TestPipeline.Box() { Cap = new TestExtendsImpl(_counters) });
}
}
public Task TestPointers(ITestInterface cap, object obj, IReadOnlyList<ITestInterface> list, CancellationToken cancellationToken_)
@ -558,8 +567,11 @@ namespace Capnp.Net.Runtime.Tests.GenImpls
public async Task<(string, TestPipeline.Box)> GetCap(uint n, ITestInterface inCap, CancellationToken cancellationToken_ = default)
{
await _deblock;
return ("hello", new TestPipeline.Box() { Cap = _timpl2 });
using (inCap)
{
await _deblock;
return ("hello", new TestPipeline.Box() { Cap = _timpl2 });
}
}
public Task TestPointers(ITestInterface cap, object obj, IReadOnlyList<ITestInterface> list, CancellationToken cancellationToken_ = default)
@ -661,10 +673,15 @@ namespace Capnp.Net.Runtime.Tests.GenImpls
public void Dispose()
{
IsDisposed = true;
}
public bool IsDisposed { get; private set; }
public Task<TestTailCallee.TailResult> Foo(int i, string t, CancellationToken cancellationToken_)
{
Assert.IsFalse(IsDisposed);
Interlocked.Increment(ref _counters.CallCount);
var result = new TestTailCallee.TailResult()
@ -710,9 +727,12 @@ namespace Capnp.Net.Runtime.Tests.GenImpls
public async Task<string> CallFooWhenResolved(ITestInterface cap, CancellationToken cancellationToken_)
{
Interlocked.Increment(ref _counters.CallCount);
await ((Proxy)cap).WhenResolved;
string s = await cap.Foo(123, true, cancellationToken_);
Assert.AreEqual("foo", s);
using (cap)
{
await ((Proxy)cap).WhenResolved;
string s = await cap.Foo(123, true, cancellationToken_);
Assert.AreEqual("foo", s);
}
return "bar";
}

View File

@ -17501,21 +17501,21 @@ namespace Capnproto_test.Capnp.Test
public static Capnproto_test.Capnp.Test.ITestInterface OutBox_Cap(this Task<(string, Capnproto_test.Capnp.Test.TestPipeline.Box)> task)
{
async Task<IDisposable> AwaitProxy() => (await task).Item2?.Cap;
return (Capnproto_test.Capnp.Test.ITestInterface)CapabilityReflection.CreateProxy<Capnproto_test.Capnp.Test.ITestInterface>(Impatient.GetAnswer(task).Access(Path_capnproto_test_capnp_test_TestPipeline_getCap_OutBox_Cap, AwaitProxy()));
return (Capnproto_test.Capnp.Test.ITestInterface)CapabilityReflection.CreateProxy<Capnproto_test.Capnp.Test.ITestInterface>(Impatient.Access(task, Path_capnproto_test_capnp_test_TestPipeline_getCap_OutBox_Cap, AwaitProxy()));
}
static readonly MemberAccessPath Path_capnproto_test_capnp_test_TestPipeline_getAnyCap_OutBox_Cap = new MemberAccessPath(1U, 0U);
public static BareProxy OutBox_Cap(this Task<(string, Capnproto_test.Capnp.Test.TestPipeline.AnyBox)> task)
{
async Task<IDisposable> AwaitProxy() => (await task).Item2?.Cap;
return (BareProxy)CapabilityReflection.CreateProxy<BareProxy>(Impatient.GetAnswer(task).Access(Path_capnproto_test_capnp_test_TestPipeline_getAnyCap_OutBox_Cap, AwaitProxy()));
return (BareProxy)CapabilityReflection.CreateProxy<BareProxy>(Impatient.Access(task, Path_capnproto_test_capnp_test_TestPipeline_getAnyCap_OutBox_Cap, AwaitProxy()));
}
static readonly MemberAccessPath Path_capnproto_test_capnp_test_TestTailCallee_foo_C = new MemberAccessPath(1U);
public static Capnproto_test.Capnp.Test.ITestCallOrder C(this Task<Capnproto_test.Capnp.Test.TestTailCallee.TailResult> task)
{
async Task<IDisposable> AwaitProxy() => (await task).C;
return (Capnproto_test.Capnp.Test.ITestCallOrder)CapabilityReflection.CreateProxy<Capnproto_test.Capnp.Test.ITestCallOrder>(Impatient.GetAnswer(task).Access(Path_capnproto_test_capnp_test_TestTailCallee_foo_C, AwaitProxy()));
return (Capnproto_test.Capnp.Test.ITestCallOrder)CapabilityReflection.CreateProxy<Capnproto_test.Capnp.Test.ITestCallOrder>(Impatient.Access(task, Path_capnproto_test_capnp_test_TestTailCallee_foo_C, AwaitProxy()));
}
}
}

View File

@ -2,6 +2,7 @@
using Capnp.Net.Runtime.Tests.GenImpls;
using Capnp.Rpc;
using Capnproto_test.Capnp.Test;
using Microsoft.Extensions.Logging;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Collections.Generic;
@ -63,21 +64,25 @@ namespace Capnp.Net.Runtime.Tests
try
{
_currentProcess.StandardError.ReadToEndAsync().ContinueWith(t => Console.Error.WriteLine(t.Result));
var firstLine = _currentProcess.StandardOutput.ReadLineAsync();
Assert.IsTrue(firstLine.Wait(MediumNonDbgTimeout), "Problem after launching test process");
Assert.IsNotNull(firstLine.Result, "Problem after launching test process");
Assert.IsTrue(firstLine.Result.StartsWith("Listening") || firstLine.Result.StartsWith("Connecting"),
"Problem after launching test process");
try
{
_currentProcess.StandardError.ReadToEndAsync().ContinueWith(t => Console.Error.WriteLine(t.Result));
var firstLine = _currentProcess.StandardOutput.ReadLineAsync();
Assert.IsTrue(firstLine.Wait(MediumNonDbgTimeout), "Problem after launching test process");
Assert.IsNotNull(firstLine.Result, "Problem after launching test process");
Assert.IsTrue(firstLine.Result.StartsWith("Listening") || firstLine.Result.StartsWith("Connecting"),
"Problem after launching test process");
}
catch (AssertFailedException exception)
{
Logger.LogError(exception.Message);
return false;
}
test(_currentProcess.StandardOutput);
return true;
}
catch (AssertFailedException)
{
return false;
}
finally
{
try
@ -220,6 +225,7 @@ namespace Capnp.Net.Runtime.Tests
{
AssertOutput(stdout, "Pipelining test start");
AssertOutput(stdout, "foo 123 1");
AssertOutput(stdout, "~");
AssertOutput(stdout, "Pipelining test end");
Assert.AreEqual(3, counters.CallCount);
});
@ -342,17 +348,19 @@ namespace Capnp.Net.Runtime.Tests
for (int i = 0; i < iterationCount; i++)
{
var task = main.GetHandle(default);
taskList.Add(task.ContinueWith(t =>
async Task TerminateAnswer()
{
try
{
t.Result.Dispose();
using (var proxy = await task)
{
}
}
catch (AggregateException ex) when (ex.InnerException is TaskCanceledException)
catch (TaskCanceledException)
{
}
}));
Impatient.GetAnswer(task).Dispose();
}
taskList.Add(TerminateAnswer());
}
// Ensure that all answers return (probably in canceled state)
@ -558,6 +566,7 @@ namespace Capnp.Net.Runtime.Tests
AssertOutput(stdout, "PromiseResolve test start");
AssertOutput(stdout, "foo 123 1");
AssertOutput(stdout, "foo 123 1");
AssertOutput(stdout, "~");
AssertOutput(stdout, "PromiseResolve test end");
Assert.AreEqual(3, counters.CallCount);
});

View File

@ -139,5 +139,32 @@ namespace Capnp.Net.Runtime.Tests
{
NewLocalhostTcpTestbed().RunTest(Testsuite.CallBrokenPromise);
}
[TestMethod]
public void BootstrapReuse()
{
(var server, var client) = SetupClientServerPair();
var counters = new Counters();
var impl = new TestInterfaceImpl(counters);
using (server)
using (client)
{
client.WhenConnected.Wait();
server.Main = impl;
for (int i = 0; i < 10; i++)
{
using (var main = client.GetMain<ITestMoreStuff>())
{
((Proxy)main).WhenResolved.Wait(MediumNonDbgTimeout);
}
Assert.IsFalse(impl.IsDisposed);
}
}
Assert.IsTrue(impl.IsDisposed);
}
}
}

View File

@ -60,8 +60,8 @@ namespace Capnp.Net.Runtime.Tests
using (var main = testbed.ConnectMain<ITestMoreStuff>(impl))
{
var resolving = main as IResolvingCapability;
testbed.MustComplete(resolving.WhenResolved);
if (main is IResolvingCapability resolving)
testbed.MustComplete(resolving.WhenResolved);
var cap = new TestCallOrderImpl();
cap.CountToDispose = 6;
@ -70,7 +70,7 @@ namespace Capnp.Net.Runtime.Tests
var echo = main.Echo(cap, default);
testbed.MustComplete(Task.CompletedTask);
using (var pipeline = echo.Eager())
using (var pipeline = echo.Eager(true))
{
var call0 = pipeline.GetCallSequence(0, default);
var call1 = pipeline.GetCallSequence(1, default);
@ -120,15 +120,15 @@ namespace Capnp.Net.Runtime.Tests
var impl = new TestMoreStuffImpl(counters);
using (var main = testbed.ConnectMain<ITestMoreStuff>(impl))
{
var resolving = main as IResolvingCapability;
testbed.MustComplete(resolving.WhenResolved);
if (main is IResolvingCapability resolving)
testbed.MustComplete(resolving.WhenResolved);
var cap = new TaskCompletionSource<ITestCallOrder>();
var earlyCall = main.GetCallSequence(0, default);
var echo = main.Echo(cap.Task.Eager(true), default);
using (var pipeline = echo.Eager())
using (var pipeline = echo.Eager(true))
{
var call0 = pipeline.GetCallSequence(0, default);
var call1 = pipeline.GetCallSequence(1, default);
@ -161,20 +161,21 @@ namespace Capnp.Net.Runtime.Tests
var impl = new TestMoreStuffImpl(counters);
using (var main = testbed.ConnectMain<ITestMoreStuff>(impl))
{
var resolving = main as IResolvingCapability;
testbed.MustComplete(resolving.WhenResolved);
if (main is IResolvingCapability resolving)
testbed.MustComplete(resolving.WhenResolved);
var promise = main.GetNull(default);
var cap = promise.Eager();
using (var cap = promise.Eager(true))
{
var call0 = cap.GetCallSequence(0, default);
var call0 = cap.GetCallSequence(0, default);
testbed.MustComplete(promise);
testbed.MustComplete(promise);
var call1 = cap.GetCallSequence(1, default);
var call1 = cap.GetCallSequence(1, default);
testbed.ExpectPromiseThrows(call0, call1);
testbed.ExpectPromiseThrows(call0, call1);
}
// Verify that we're still connected (there were no protocol errors).
testbed.MustComplete(main.GetCallSequence(1, default));
@ -184,11 +185,11 @@ namespace Capnp.Net.Runtime.Tests
public static void CallBrokenPromise(ITestbed testbed)
{
var counters = new Counters();
var impl = new TestMoreStuffImpl(counters);
using (var impl = new TestMoreStuffImpl(counters))
using (var main = testbed.ConnectMain<ITestMoreStuff>(impl))
{
var resolving = main as IResolvingCapability;
testbed.MustComplete(resolving.WhenResolved);
if (main is IResolvingCapability resolving)
testbed.MustComplete(resolving.WhenResolved);
var tcs = new TaskCompletionSource<ITestInterface>();
@ -231,6 +232,7 @@ namespace Capnp.Net.Runtime.Tests
testbed.MustComplete(dependentCall0, dependentCall1, dependentCall2);
Assert.IsTrue(callee.IsDisposed);
Assert.AreEqual(1, counters.CallCount);
Assert.AreEqual(1, calleeCallCount.CallCount);
}
@ -313,7 +315,7 @@ namespace Capnp.Net.Runtime.Tests
var destructionTask = destructionPromise.Task;
var counters = new Counters();
var impl = new TestMoreStuffImpl(counters);
using (var impl = new TestMoreStuffImpl(counters))
using (var main = testbed.ConnectMain<ITestMoreStuff>(impl))
{
var holdTask = main.Hold(new TestInterfaceImpl(new Counters(), destructionPromise), default);
@ -366,12 +368,6 @@ namespace Capnp.Net.Runtime.Tests
// Can't be destroyed, we haven't released it.
Assert.IsFalse(destructionTask.IsCompleted);
}
// In deviation from original test, we have null the held capability on the main interface.
// This is because the main interface is the bootstrap capability and, as such, won't be disposed
// after disconnect.
var holdNullTask = main.Hold(null, default);
testbed.MustComplete(holdNullTask);
}
testbed.MustComplete(destructionTask);
@ -384,26 +380,27 @@ namespace Capnp.Net.Runtime.Tests
using (var main = testbed.ConnectMain<ITestMoreStuff>(impl))
{
var tcs = new TaskCompletionSource<ITestInterface>();
var eager = tcs.Task.Eager(true);
using (var eager = tcs.Task.Eager(true))
{
var request = main.CallFoo(Proxy.Share(eager), default);
var request2 = main.CallFooWhenResolved(eager, default);
var request = main.CallFoo(eager, default);
var request2 = main.CallFooWhenResolved(eager, default);
var gcs = main.GetCallSequence(0, default);
testbed.MustComplete(gcs);
Assert.AreEqual(2u, gcs.Result);
Assert.AreEqual(3, counters.CallCount);
var gcs = main.GetCallSequence(0, default);
testbed.MustComplete(gcs);
Assert.AreEqual(2u, gcs.Result);
Assert.AreEqual(3, counters.CallCount);
var chainedCallCount = new Counters();
var tiimpl = new TestInterfaceImpl(chainedCallCount);
tcs.SetResult(tiimpl);
var chainedCallCount = new Counters();
var tiimpl = new TestInterfaceImpl(chainedCallCount);
tcs.SetResult(tiimpl);
testbed.MustComplete(request, request2);
testbed.MustComplete(request, request2);
Assert.AreEqual("bar", request.Result);
Assert.AreEqual("bar", request2.Result);
Assert.AreEqual(3, counters.CallCount);
Assert.AreEqual(2, chainedCallCount.CallCount);
Assert.AreEqual("bar", request.Result);
Assert.AreEqual("bar", request2.Result);
Assert.AreEqual(3, counters.CallCount);
Assert.AreEqual(2, chainedCallCount.CallCount);
}
}
}
@ -488,15 +485,18 @@ namespace Capnp.Net.Runtime.Tests
using (var outBox = request.OutBox_Cap())
{
var pipelineRequest = outBox.Foo(321, false, default);
var pipelineRequest2 = ((Proxy)outBox).Cast<ITestExtends>(false).Grault(default);
using (var testx = ((Proxy)outBox).Cast<ITestExtends>(false))
{
var pipelineRequest2 = testx.Grault(default);
testbed.MustComplete(pipelineRequest, pipelineRequest2);
testbed.MustComplete(pipelineRequest, pipelineRequest2);
Assert.AreEqual("bar", pipelineRequest.Result);
Common.CheckTestMessage(pipelineRequest2.Result);
Assert.AreEqual("bar", pipelineRequest.Result);
Common.CheckTestMessage(pipelineRequest2.Result);
Assert.AreEqual(3, counters.CallCount);
Assert.AreEqual(1, chainedCallCount.CallCount);
Assert.AreEqual(3, counters.CallCount);
Assert.AreEqual(1, chainedCallCount.CallCount);
}
}
}
}
@ -519,5 +519,18 @@ namespace Capnp.Net.Runtime.Tests
Assert.AreEqual(2, counters.CallCount);
}
}
public static void BootstrapReuse(ITestbed testbed)
{
var counters = new Counters();
var impl = new TestInterfaceImpl(counters);
for (int i = 0; i < 10; i++)
{
using (var main = testbed.ConnectMain<ITestInterface>(impl))
{
}
Assert.IsFalse(impl.IsDisposed);
}
}
}
}

View File

@ -147,6 +147,35 @@ namespace Capnp.Net.Runtime.Tests
public int Channel2SendCount => _channel2.FrameCounter;
}
protected class LocalTestbed : ITestbed, ITestController
{
long ITestbed.ClientSendCount => 0;
public void RunTest(Action<ITestbed> action)
{
action(this);
}
T ITestbed.ConnectMain<T>(T main)
{
return main;
}
void ITestbed.FlushCommunication()
{
}
void ITestbed.MustComplete(params Task[] tasks)
{
Assert.IsTrue(Task.WhenAll(tasks).IsCompleted);
}
void ITestbed.MustNotComplete(params Task[] tasks)
{
Assert.IsFalse(Task.WhenAny(tasks).IsCompleted);
}
}
protected class DtbdctTestbed : ITestbed, ITestController
{
readonly DecisionTree _decisionTree = new DecisionTree();
@ -154,7 +183,20 @@ namespace Capnp.Net.Runtime.Tests
public void RunTest(Action<ITestbed> action)
{
_decisionTree.Iterate(() => action(this));
_decisionTree.Iterate(() => {
action(this);
_enginePair.FlushChannels(() => false);
Assert.AreEqual(0, _enginePair.Endpoint1.ExportedCapabilityCount);
Assert.AreEqual(0, _enginePair.Endpoint1.ImportedCapabilityCount);
Assert.AreEqual(0, _enginePair.Endpoint2.ExportedCapabilityCount);
Assert.AreEqual(0, _enginePair.Endpoint2.ImportedCapabilityCount);
GC.Collect();
GC.WaitForPendingFinalizers();
GC.Collect();
});
}
T ITestbed.ConnectMain<T>(T main)
@ -290,6 +332,8 @@ namespace Capnp.Net.Runtime.Tests
protected static DtbdctTestbed NewDtbdctTestbed() => new DtbdctTestbed();
protected static LocalhostTcpTestbed NewLocalhostTcpTestbed() => new LocalhostTcpTestbed();
protected static LocalTestbed NewLocalTestbed() => new LocalTestbed();
[TestInitialize]
public void InitConsoleLogging()
{

View File

@ -25,5 +25,10 @@ namespace Capnp.Rpc
[System.Runtime.CompilerServices.CallerMemberName] string methodName = "",
[System.Runtime.CompilerServices.CallerFilePath] string filePath = "",
[System.Runtime.CompilerServices.CallerLineNumber] int lineNumber = 0);
#if DebugFinalizers
internal Proxy? OwningProxy { get; set; }
internal ConsumedCapability? ResolvingCap { get; set; }
#endif
}
}

View File

@ -92,35 +92,11 @@ namespace Capnp.Rpc
return answer;
}
static async Task<Proxy> AwaitProxy<T>(Task<T> task) where T: class
public static ConsumedCapability? Access(Task task, MemberAccessPath access, Task<IDisposable?> proxyTask)
{
T item;
try
{
item = await task;
}
catch (TaskCanceledException exception)
{
return new Proxy(LazyCapability.CreateCanceledCap(exception.CancellationToken));
}
catch (System.Exception exception)
{
return new Proxy(LazyCapability.CreateBrokenCap(exception.Message));
}
switch (item)
{
case Proxy proxy:
return proxy;
case null:
return CapabilityReflection.CreateProxy<T>(null);
}
var skel = Skeleton.GetOrCreateSkeleton(item!, false);
var localCap = LocalCapability.Create(skel);
return CapabilityReflection.CreateProxy<T>(localCap);
var answer = TryGetAnswer(task);
if (answer != null) return answer.Access(access, proxyTask);
return new LazyCapability(proxyTask.AsProxyTask());
}
/// <summary>
@ -138,9 +114,9 @@ namespace Capnp.Rpc
/// quality as capability interface.</exception>
[Obsolete("Call Eager<TInterface>(task, true) instead")]
public static TInterface PseudoEager<TInterface>(this Task<TInterface> task)
where TInterface : class
where TInterface : class, IDisposable
{
var lazyCap = new LazyCapability(AwaitProxy(task));
var lazyCap = new LazyCapability(task.AsProxyTask());
return (CapabilityReflection.CreateProxy<TInterface>(lazyCap) as TInterface)!;
}
@ -177,7 +153,7 @@ namespace Capnp.Rpc
throw new ArgumentException("The task was not returned from a remote method invocation. See documentation for details.");
}
var lazyCap = new LazyCapability(AwaitProxy(task));
var lazyCap = new LazyCapability(task.AsProxyTask());
return (CapabilityReflection.CreateProxy<TInterface>(lazyCap) as TInterface)!;
}
else

View File

@ -27,7 +27,7 @@ namespace Capnp.Rpc
public ConsumedCapability Access(MemberAccessPath access)
{
return new LocalAnswerCapabilityDeprecated(WhenReturned, access);
return new LocalAnswerCapability(WhenReturned, access);
}
public ConsumedCapability Access(MemberAccessPath _, Task<IDisposable?> task)

View File

@ -7,6 +7,15 @@ namespace Capnp.Rpc
class LocalAnswerCapability : RefCountingCapability, IResolvingCapability
{
static async Task<Proxy> TransferOwnershipToDummyProxy(Task<DeserializerState> answer, MemberAccessPath access)
{
var result = await answer;
var cap = access.Eval(result);
var proxy = new Proxy(cap);
cap?.Release(false);
return proxy;
}
readonly Task<Proxy> _whenResolvedProxy;
public LocalAnswerCapability(Task<Proxy> proxyTask)
@ -17,6 +26,12 @@ namespace Capnp.Rpc
WhenResolved = AwaitResolved();
}
public LocalAnswerCapability(Task<DeserializerState> answer, MemberAccessPath access):
this(TransferOwnershipToDummyProxy(answer, access))
{
}
internal override void Freeze(out IRpcEndpoint? boundEndpoint)
{
boundEndpoint = null;

View File

@ -1,84 +0,0 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Capnp.Rpc
{
class LocalAnswerCapabilityDeprecated : RefCountingCapability, IResolvingCapability
{
readonly Task<DeserializerState> _answer;
readonly MemberAccessPath _access;
public LocalAnswerCapabilityDeprecated(Task<DeserializerState> answer, MemberAccessPath access)
{
_answer = answer;
_access = access;
async Task<ConsumedCapability?> AwaitResolved() => access.Eval(await _answer);
WhenResolved = AwaitResolved();
}
internal override void Freeze(out IRpcEndpoint? boundEndpoint)
{
boundEndpoint = null;
}
internal override void Unfreeze()
{
}
public Task<ConsumedCapability?> WhenResolved { get; private set; }
internal override Action? Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer)
{
if (_answer.IsCompleted)
{
DeserializerState result;
try
{
result = _answer.Result;
}
catch (AggregateException exception)
{
throw exception.InnerException!;
}
using var proxy = new Proxy(_access.Eval(result));
proxy.Export(endpoint, writer);
return null;
}
else
{
return this.ExportAsSenderPromise(endpoint, writer);
}
}
async Task<DeserializerState> CallImpl(ulong interfaceId, ushort methodId, DynamicSerializerState args, CancellationToken cancellationToken)
{
var cap = await WhenResolved;
cancellationToken.ThrowIfCancellationRequested();
if (cap == null)
throw new RpcException("Broken capability");
using var proxy = new Proxy(cap);
var call = proxy.Call(interfaceId, methodId, args, default);
var whenReturned = call.WhenReturned;
using var registration = cancellationToken.Register(() => call.Dispose());
return await whenReturned;
}
internal override IPromisedAnswer DoCall(ulong interfaceId, ushort methodId, DynamicSerializerState args)
{
var cts = new CancellationTokenSource();
return new LocalAnswer(cts, CallImpl(interfaceId, methodId, args, cts.Token));
}
protected override void ReleaseRemotely()
{
}
}
}

View File

@ -126,17 +126,17 @@ namespace Capnp.Rpc
lock (ReentrancyBlocker)
{
SetReturned();
}
if (StateFlags.HasFlag(State.TailCall))
if (StateFlags.HasFlag(State.TailCall))
{
_tcs.TrySetException(new RpcException(ReturnDespiteTailCallMessage));
}
else
{
if (!_tcs.TrySetResult(results))
{
_tcs.TrySetException(new RpcException(ReturnDespiteTailCallMessage));
}
else
{
if (!_tcs.TrySetResult(results))
{
ReleaseOutCaps(results);
}
ReleaseOutCaps(results);
}
}
}
@ -146,15 +146,15 @@ namespace Capnp.Rpc
lock (ReentrancyBlocker)
{
SetReturned();
}
if (!StateFlags.HasFlag(State.TailCall))
{
_tcs.TrySetException(new RpcException("Peer sent the results of this questions somewhere else. This was not expected and is a protocol error."));
}
else
{
_tcs.TrySetResult(default);
}
if (!StateFlags.HasFlag(State.TailCall))
{
_tcs.TrySetException(new RpcException("Peer sent the results of this questions somewhere else. This was not expected and is a protocol error."));
}
else
{
_tcs.TrySetResult(default);
}
}
@ -163,9 +163,9 @@ namespace Capnp.Rpc
lock (ReentrancyBlocker)
{
SetReturned();
_tcs.TrySetException(new RpcException(exception.Reason));
}
_tcs.TrySetException(new RpcException(exception.Reason));
}
internal void OnException(System.Exception exception)
@ -173,9 +173,9 @@ namespace Capnp.Rpc
lock (ReentrancyBlocker)
{
SetReturned();
_tcs.TrySetException(exception);
}
_tcs.TrySetException(exception);
}
internal void OnCanceled()
@ -183,9 +183,9 @@ namespace Capnp.Rpc
lock (ReentrancyBlocker)
{
SetReturned();
_tcs.TrySetCanceled();
}
_tcs.TrySetCanceled();
}
void DeleteMyQuestion()
@ -234,7 +234,6 @@ namespace Capnp.Rpc
/// <param name="access">Access path</param>
/// <returns>Low-level capability</returns>
/// <exception cref="DeserializationException">The referenced member does not exist or does not resolve to a capability pointer.</exception>
[Obsolete("Please re-generate. Replaced by Access(MemberAccessPath access, Task<IDisposable> task)")]
public ConsumedCapability? Access(MemberAccessPath access)
{
lock (ReentrancyBlocker)

View File

@ -203,6 +203,9 @@ namespace Capnp.Rpc
lock (_reentrancyBlocker)
{
#if DebugFinalizers
resolvedCap.ResolvingCap = this;
#endif
_resolvedCap.SetResult(resolvedCap);
if (_pendingCallsOnPromise == 0)
@ -247,11 +250,10 @@ namespace Capnp.Rpc
{
if (!_released)
{
_released = true;
_ep.ReleaseImport(_remoteId);
}
_ep.ReleaseImport(_remoteId);
try { using var _ = await _whenResolvedProxy; }
catch { }
}

View File

@ -120,6 +120,11 @@ namespace Capnp.Rpc
ConsumedCap = cap;
cap.AddRef();
#if DebugFinalizers
if (ConsumedCap != null)
ConsumedCap.OwningProxy = this;
#endif
}
internal Skeleton? GetProvider()

View File

@ -1,253 +0,0 @@
using System;
using System.Threading.Tasks;
namespace Capnp.Rpc
{
#if false
class RemoteAnswerCapabilityDeprecated : RemoteResolvingCapability
{
// Set DebugEmbargos to true to get logging output for calls. RPC calls are expected to
// be on the critical path, hence very relevant for performance. We just can't afford
// additional stuff on this path. Even if the logger filters the outputs away, there is
// overhead for creating the Logger object, calling the Logger methods and deciding to
// filter the output. This justifies the precompiler switch.
#if DebugEmbargos
ILogger Logger { get; } = Logging.CreateLogger<RemoteAnswerCapability>();
#endif
readonly PendingQuestion _question;
readonly MemberAccessPath _access;
ConsumedCapability? _resolvedCap;
public RemoteAnswerCapabilityDeprecated(PendingQuestion question, MemberAccessPath access): base(question.RpcEndpoint)
{
_question = question ?? throw new ArgumentNullException(nameof(question));
_access = access ?? throw new ArgumentNullException(nameof(access));
async Task<ConsumedCapability?> AwaitWhenResolved()
{
await _question.WhenReturned;
if (_question.IsTailCall)
throw new InvalidOperationException("Question is a tail call, so won't resolve back.");
return ResolvedCap!;
}
WhenResolved = AwaitWhenResolved();
}
async void ReAllowFinishWhenDone(Task task)
{
try
{
++_pendingCallsOnPromise;
await task;
}
catch
{
}
finally
{
lock (_question.ReentrancyBlocker)
{
--_pendingCallsOnPromise;
_question.AllowFinish();
}
}
}
protected override ConsumedCapability? ResolvedCap
{
get
{
lock (_question.ReentrancyBlocker)
{
if (_resolvedCap == null && !_question.IsTailCall && _question.IsReturned)
{
DeserializerState result;
try
{
result = _question.WhenReturned.Result;
}
catch (AggregateException exception)
{
throw exception.InnerException!;
}
_resolvedCap = _access.Eval(result);
}
return _resolvedCap;
}
}
}
public override Task<ConsumedCapability?> WhenResolved { get; }
protected override void GetMessageTarget(MessageTarget.WRITER wr)
{
wr.which = MessageTarget.WHICH.PromisedAnswer;
wr.PromisedAnswer.QuestionId = _question.QuestionId;
_access.Serialize(wr.PromisedAnswer);
}
internal override IPromisedAnswer DoCall(ulong interfaceId, ushort methodId, DynamicSerializerState args)
{
lock (_question.ReentrancyBlocker)
{
if (_question.StateFlags.HasFlag(PendingQuestion.State.Returned) &&
!_question.StateFlags.HasFlag(PendingQuestion.State.TailCall))
{
if (ResolvedCap == null)
{
throw new RpcException("Answer did not resolve to expected capability");
}
return CallOnResolution(interfaceId, methodId, args);
}
else
{
#if DebugEmbargos
Logger.LogDebug("Call by proxy");
#endif
if (_question.StateFlags.HasFlag(PendingQuestion.State.Disposed))
{
throw new ObjectDisposedException(nameof(PendingQuestion));
}
if (_question.StateFlags.HasFlag(PendingQuestion.State.FinishRequested))
{
throw new InvalidOperationException("Finish request was already sent");
}
_question.DisallowFinish();
++_pendingCallsOnPromise;
var promisedAnswer = base.DoCall(interfaceId, methodId, args);
ReAllowFinishWhenDone(promisedAnswer.WhenReturned);
async void DecrementPendingCallsOnPromiseWhenReturned()
{
try
{
await promisedAnswer.WhenReturned;
}
catch
{
}
finally
{
lock (_question.ReentrancyBlocker)
{
--_pendingCallsOnPromise;
}
}
}
DecrementPendingCallsOnPromiseWhenReturned();
return promisedAnswer;
}
}
}
protected override Call.WRITER SetupMessage(DynamicSerializerState args, ulong interfaceId, ushort methodId)
{
var call = base.SetupMessage(args, interfaceId, methodId);
call.Target.which = MessageTarget.WHICH.PromisedAnswer;
call.Target.PromisedAnswer.QuestionId = _question.QuestionId;
_access.Serialize(call.Target.PromisedAnswer);
return call;
}
internal override void Freeze(out IRpcEndpoint? boundEndpoint)
{
lock (_question.ReentrancyBlocker)
{
if (_question.StateFlags.HasFlag(PendingQuestion.State.Returned) &&
_pendingCallsOnPromise == 0)
{
if (ResolvedCap == null)
{
throw new RpcException("Answer did not resolve to expected capability");
}
ResolvedCap.Freeze(out boundEndpoint);
}
else
{
++_pendingCallsOnPromise;
_question.DisallowFinish();
boundEndpoint = _ep;
}
}
}
internal override void Unfreeze()
{
lock (_question.ReentrancyBlocker)
{
if (_pendingCallsOnPromise > 0)
{
--_pendingCallsOnPromise;
_question.AllowFinish();
}
else
{
ResolvedCap?.Unfreeze();
}
}
}
internal override void Export(IRpcEndpoint endpoint, CapDescriptor.WRITER writer)
{
lock (_question.ReentrancyBlocker)
{
if (_question.StateFlags.HasFlag(PendingQuestion.State.Disposed))
throw new ObjectDisposedException(nameof(PendingQuestion));
if (_question.StateFlags.HasFlag(PendingQuestion.State.Returned))
{
ResolvedCap?.Export(endpoint, writer);
}
else
{
if (_question.StateFlags.HasFlag(PendingQuestion.State.FinishRequested))
throw new InvalidOperationException("Finish request was already sent");
if (endpoint == _ep)
{
writer.which = CapDescriptor.WHICH.ReceiverAnswer;
_access.Serialize(writer.ReceiverAnswer);
writer.ReceiverAnswer.QuestionId = _question.QuestionId;
}
else if (_question.IsTailCall)
{
// FIXME: Resource management! We should prevent finishing this
// cap as long as it is exported. Unfortunately, we cannot determine
// when it gets removed from the export table.
var vine = Vine.Create(this);
uint id = endpoint.AllocateExport(vine, out bool first);
writer.which = CapDescriptor.WHICH.SenderHosted;
writer.SenderHosted = id;
}
else
{
this.ExportAsSenderPromise(endpoint, writer);
}
}
}
}
protected async override void ReleaseRemotely()
{
try { (await WhenResolved)?.Release(false); }
catch { }
}
}
#endif
}

View File

@ -34,9 +34,23 @@ namespace Capnp.Rpc
return null;
}
public static async Task<Proxy> AsProxyTask(this Task<IDisposable?> task)
public static async Task<Proxy> AsProxyTask<T>(this Task<T> task)
where T: IDisposable?
{
var obj = await task;
IDisposable? obj;
try
{
obj = await task;
}
catch (TaskCanceledException exception)
{
return new Proxy(LazyCapability.CreateCanceledCap(exception.CancellationToken));
}
catch (System.Exception exception)
{
return new Proxy(LazyCapability.CreateBrokenCap(exception.Message));
}
switch (obj)
{
case Proxy proxy: return proxy;

View File

@ -132,6 +132,9 @@ namespace Capnp.Rpc
public long SendCount => Interlocked.Read(ref _sendCount);
public long RecvCount => Interlocked.Read(ref _recvCount);
public int ImportedCapabilityCount => _importTable.Count;
public int ExportedCapabilityCount => _exportTable.Count;
void Tx(WireFrame frame)
{
try

View File

@ -263,6 +263,8 @@ namespace Capnp.Rpc
SafeJoin(connection.PumpRunner);
}
_rpcEngine.BootstrapCap = null;
GC.SuppressFinalize(this);
}

View File

@ -16717,21 +16717,21 @@ namespace Capnproto_test.Capnp.Test
public static Capnproto_test.Capnp.Test.ITestInterface OutBox_Cap(this Task<(string, Capnproto_test.Capnp.Test.TestPipeline.Box)> task)
{
async Task<IDisposable> AwaitProxy() => (await task).Item2?.Cap;
return (Capnproto_test.Capnp.Test.ITestInterface)CapabilityReflection.CreateProxy<Capnproto_test.Capnp.Test.ITestInterface>(Impatient.GetAnswer(task).Access(Path_capnproto_test_capnp_test_TestPipeline_getCap_OutBox_Cap, AwaitProxy()));
return (Capnproto_test.Capnp.Test.ITestInterface)CapabilityReflection.CreateProxy<Capnproto_test.Capnp.Test.ITestInterface>(Impatient.Access(task, Path_capnproto_test_capnp_test_TestPipeline_getCap_OutBox_Cap, AwaitProxy()));
}
static readonly MemberAccessPath Path_capnproto_test_capnp_test_TestPipeline_getAnyCap_OutBox_Cap = new MemberAccessPath(1U, 0U);
public static BareProxy OutBox_Cap(this Task<(string, Capnproto_test.Capnp.Test.TestPipeline.AnyBox)> task)
{
async Task<IDisposable> AwaitProxy() => (await task).Item2?.Cap;
return (BareProxy)CapabilityReflection.CreateProxy<BareProxy>(Impatient.GetAnswer(task).Access(Path_capnproto_test_capnp_test_TestPipeline_getAnyCap_OutBox_Cap, AwaitProxy()));
return (BareProxy)CapabilityReflection.CreateProxy<BareProxy>(Impatient.Access(task, Path_capnproto_test_capnp_test_TestPipeline_getAnyCap_OutBox_Cap, AwaitProxy()));
}
static readonly MemberAccessPath Path_capnproto_test_capnp_test_TestTailCallee_foo_C = new MemberAccessPath(1U);
public static Capnproto_test.Capnp.Test.ITestCallOrder C(this Task<Capnproto_test.Capnp.Test.TestTailCallee.TailResult> task)
{
async Task<IDisposable> AwaitProxy() => (await task).C;
return (Capnproto_test.Capnp.Test.ITestCallOrder)CapabilityReflection.CreateProxy<Capnproto_test.Capnp.Test.ITestCallOrder>(Impatient.GetAnswer(task).Access(Path_capnproto_test_capnp_test_TestTailCallee_foo_C, AwaitProxy()));
return (Capnproto_test.Capnp.Test.ITestCallOrder)CapabilityReflection.CreateProxy<Capnproto_test.Capnp.Test.ITestCallOrder>(Impatient.Access(task, Path_capnproto_test_capnp_test_TestTailCallee_foo_C, AwaitProxy()));
}
}
}

View File

@ -931,21 +931,16 @@ namespace CapnpC.CSharp.Generator.CodeGen
InvocationExpression(
MemberAccessExpression(
SyntaxKind.SimpleMemberAccessExpression,
IdentifierName(nameof(Capnp.Rpc.Impatient)),
IdentifierName(nameof(Capnp.Rpc.Impatient.Access))))
.AddArgumentListArguments(
Argument(
_names.TaskParameter.IdentifierName),
Argument(
accessPath.IdentifierName),
Argument(
InvocationExpression(
MemberAccessExpression(
SyntaxKind.SimpleMemberAccessExpression,
IdentifierName(nameof(Capnp.Rpc.Impatient)),
IdentifierName(nameof(Capnp.Rpc.Impatient.GetAnswer))))
.AddArgumentListArguments(
Argument(
_names.TaskParameter.IdentifierName)),
IdentifierName(nameof(Capnp.Rpc.IPromisedAnswer.Access))))
.AddArgumentListArguments(
Argument(
accessPath.IdentifierName),
Argument(
InvocationExpression(
_names.AwaitProxy.IdentifierName))))))));
_names.AwaitProxy.IdentifierName))))))));
yield return pathDecl;
yield return methodDecl;