TLS-enabled test code

This commit is contained in:
Nadja Reitzenstein 2021-12-09 21:54:22 +01:00
parent a4e73c65b7
commit aa378eec2c
3 changed files with 58 additions and 17 deletions

View File

@ -1,24 +1,57 @@
import capnp import asyncio
capnp.remove_import_hook() import socket
import ssl
import capnp
connection_capnp = capnp.load('schema/connection.capnp') connection_capnp = capnp.load('schema/connection.capnp')
authenticationsystem_capnp = capnp.load('schema/authenticationsystem.capnp') authenticationsystem_capnp = capnp.load('schema/authenticationsystem.capnp')
def connect(host, user, pw): async def myreader(client, reader):
"""TODO: Docstring for connect. while True:
:returns: TODO data = await reader.read(4096)
client.write(data)
"""
client = capnp.TwoPartyClient(host) async def mywriter(client, writer):
boot = client.bootstrap().cast_as(connection_capnp.Bootstrap) while True:
auth = boot.authenticationSystem().authenticationSystem data = await client.read(4096)
writer.write(data.tobytes())
await writer.drain()
async def connect(host, port, user, pw):
# Setup SSL context
ctx = ssl.create_default_context(
ssl.Purpose.SERVER_AUTH
)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
# Handle both IPv4 and IPv6 cases
try:
reader, writer = await asyncio.open_connection(
host, port, ssl=ctx, family=socket.AF_INET6
)
except Exception:
reader, writer = await asyncio.open_connection(
host, port, ssl=ctx, family=socket.AF_INET
)
# Start TwoPartyClient using TwoWayPipe (takes no arguments in this mode)
client = capnp.TwoPartyClient()
cap = client.bootstrap().cast_as(connection_capnp.Bootstrap)
# Assemble reader and writer tasks, run in the background
coroutines = [myreader(client, reader), mywriter(client, writer)]
asyncio.gather(*coroutines, return_exceptions=True)
auth = cap.authenticationSystem().authenticationSystem
req = auth.start_request() req = auth.start_request()
req.request.mechanism = "PLAIN" req.request.mechanism = "PLAIN"
req.request.initialResponse.initial = "\0" + user + "\0" + pw req.request.initialResponse.initial = "\0" + user + "\0" + pw
rep_prom = req.send() rep_prom = req.send()
response = rep_prom.wait() response = await rep_prom.a_wait()
if response.response.outcome.result == "successful": if response.response.outcome.result == "successful":
return boot return cap
else: else:
print("Auth failed") print("Auth failed")
return None return None

2
schema

@ -1 +1 @@
Subproject commit 743c18393c6e01edeec1e04ab8998176d78f9a04 Subproject commit c855646a90958ae575d58be074d187acb9f8f4fa

18
test.py
View File

@ -1,7 +1,15 @@
import asyncio
import fabapi import fabapi
boot = fabapi.connect("localhost:59661", "Testuser", "secret")
ms = boot.machineSystem().wait().machineSystem async def main():
info = ms.info().wait().info boot = await fabapi.connect("localhost", 59661, "Testuser", "secret")
ma = info.getMachine("Another").wait() if boot:
my = info.getMachine("Yetmore").wait() ms = await boot.machineSystem().a_wait()
info = await ms.machineSystem.info().a_wait()
ma = await info.info.getMachineURN("urn:fabaccess:resource:Another").a_wait()
print(ma.machine)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())