Revisions for untitled paste

View the changes made to this paste.

unlisted ⁨1⁩ ⁨file⁩ 2021-11-27 18:34:27 UTC

modified_nats_pytest_example.py

@@ -0,0 +1,93 @@

+from starlette.routing import Route
+from starlette.testclient import TestClient
+import pytest
+from starlette.applications import Starlette
+from starlette.responses import PlainTextResponse
+import asyncio
+
+
+from nats.aio.client import Client as NATS
+
+"""
+Test work with starlette 0.14.2
+Error with starlette 0.15.0: RunTimeError: got Future <Future pending> attached to a different loop
+Error with starlette 0.16.0: Nats timeout
+
+The test is that the client code makes a nats request to a mocked nats service over nats itself.
+
+Requirement a running nats server `docker run -d -p 4222:4222 nats:latest`
+"""
+
+HOST_NATS = "localhost:4222"
+
+
+# =======================================================================
+# CODE
+# =======================================================================
+
+
+def create_app():
+    async def index(request):
+        r = await request.app.state.nc.request("subject1", timeout=1, payload=b"PING")
+        return PlainTextResponse(content=r.data.decode())
+
+    async def setup() -> None:
+        app.state.nc = NATS()
+        await app.state.nc.connect(HOST_NATS)
+        print("Connected to nats")
+
+    app = Starlette(debug=True, routes=[Route('/', index)], on_startup=[setup])
+
+    return app
+
+
+app = create_app()
+
+
+# =======================================================================
+# MOCKS & TESTS
+# =======================================================================
+
+
+class NatsServiceMock:
+    def __init__(self) -> None:
+        self.nc: NATS = NATS()
+
+    async def lifespan(self) -> None:
+        await self.nc.connect(HOST_NATS)
+        await self.nc.subscribe("subject1", cb=self.handle)
+
+    async def handle(self, msg):
+        await self.nc.publish(msg.reply, b"PONG")
+
+    def __enter__(self):
+        loop = asyncio.get_event_loop()
+        loop.run_until_complete(self.lifespan())
+        return self
+
+    def __exit__(self, *args) -> None:
+        pass
+
+
[email protected](scope="session")
+def nats_service(test_app) -> NatsServiceMock:
+    with NatsServiceMock() as nc:
+        yield nc
+
+
[email protected](scope="session")
+def test_app() -> TestClient:
+    with TestClient(create_app()) as client:
+        yield client
+
+
+# =======================================================================
+# TESTS
+# =======================================================================
+
+
+def test_index_should_give_a_succesful_response(test_app, nats_service):
+    r = test_app.get("/")
+    assert r.status_code == 200
+    assert r.text == "PONG"
+