modified_nats_pytest_example.py
@@ -0,0 +1,93 @@
+from starlette.routing import Route
+from starlette.testclient import TestClient
+import pytest
+from starlette.applications import Starlette
+from starlette.responses import PlainTextResponse
+import asyncio
+
+
+from nats.aio.client import Client as NATS
+
+"""
+Test work with starlette 0.14.2
+Error with starlette 0.15.0: RunTimeError: got Future <Future pending> attached to a different loop
+Error with starlette 0.16.0: Nats timeout
+
+The test is that the client code makes a nats request to a mocked nats service over nats itself.
+
+Requirement a running nats server `docker run -d -p 4222:4222 nats:latest`
+"""
+
+HOST_NATS = "localhost:4222"
+
+
+# =======================================================================
+# CODE
+# =======================================================================
+
+
+def create_app():
+ async def index(request):
+ r = await request.app.state.nc.request("subject1", timeout=1, payload=b"PING")
+ return PlainTextResponse(content=r.data.decode())
+
+ async def setup() -> None:
+ app.state.nc = NATS()
+ await app.state.nc.connect(HOST_NATS)
+ print("Connected to nats")
+
+ app = Starlette(debug=True, routes=[Route('/', index)], on_startup=[setup])
+
+ return app
+
+
+app = create_app()
+
+
+# =======================================================================
+# MOCKS & TESTS
+# =======================================================================
+
+
+class NatsServiceMock:
+ def __init__(self) -> None:
+ self.nc: NATS = NATS()
+
+ async def lifespan(self) -> None:
+ await self.nc.connect(HOST_NATS)
+ await self.nc.subscribe("subject1", cb=self.handle)
+
+ async def handle(self, msg):
+ await self.nc.publish(msg.reply, b"PONG")
+
+ def __enter__(self):
+ loop = asyncio.get_event_loop()
+ loop.run_until_complete(self.lifespan())
+ return self
+
+ def __exit__(self, *args) -> None:
+ pass
+
+
[email protected](scope="session")
+def nats_service(test_app) -> NatsServiceMock:
+ with NatsServiceMock() as nc:
+ yield nc
+
+
[email protected](scope="session")
+def test_app() -> TestClient:
+ with TestClient(create_app()) as client:
+ yield client
+
+
+# =======================================================================
+# TESTS
+# =======================================================================
+
+
+def test_index_should_give_a_succesful_response(test_app, nats_service):
+ r = test_app.get("/")
+ assert r.status_code == 200
+ assert r.text == "PONG"
+