# modified_nats_pytest_example.py
import asyncio
import threading

import pytest
from nats.aio.client import Client as NATS
from starlette.applications import Starlette
from starlette.responses import PlainTextResponse
from starlette.routing import Route
from starlette.testclient import TestClient
"""
This test works with starlette 0.14.2.
Error with starlette 0.15.0: RuntimeError: got Future <Future pending> attached to a different loop
Error with starlette 0.16.0: NATS timeout
The test checks that the client code can make a NATS request to a mocked NATS service, with the mock itself served over NATS.
Requirement: a running NATS server, e.g. `docker run -d -p 4222:4222 nats:latest`
"""
# Address (host:port) passed to every NATS ``connect`` call in this file,
# by both the application and the mocked service.
HOST_NATS = "localhost:4222"
# =======================================================================
# CODE
# =======================================================================
def create_app():
    """Build the Starlette application under test.

    The application exposes a single ``/`` route. On startup it connects a
    NATS client to ``HOST_NATS`` and stores it on ``app.state.nc``; on
    shutdown that connection is closed again so it does not outlive the app.

    Returns:
        The configured ``Starlette`` application.
    """

    async def index(request):
        # Round-trip over NATS: send PING on "subject1" and return the reply
        # payload as the HTTP response body.
        r = await request.app.state.nc.request("subject1", timeout=1, payload=b"PING")
        return PlainTextResponse(content=r.data.decode())

    async def setup() -> None:
        app.state.nc = NATS()
        await app.state.nc.connect(HOST_NATS)
        print("Connected to nats")

    async def teardown() -> None:
        # Release the connection opened in setup(); shutdown handlers only
        # run after a successful startup, so the client is present here.
        await app.state.nc.close()

    app = Starlette(
        debug=True,
        routes=[Route('/', index)],
        on_startup=[setup],
        on_shutdown=[teardown],
    )
    return app
# Module-level application instance built at import time.
app = create_app()
# =======================================================================
# MOCKS & FIXTURES
# =======================================================================
class NatsServiceMock:
    """Mocked NATS service that answers every request on ``subject1`` with ``PONG``.

    Use it as a context manager. Entering starts a private event loop on a
    daemon thread, connects to ``HOST_NATS`` and subscribes; leaving closes
    the connection and shuts the loop down.

    The loop must keep running for the whole ``with`` body: the subscription
    callback is only invoked while the loop owning the connection is running.
    A one-off ``run_until_complete`` on the caller's loop leaves the
    subscription idle afterwards, so requests are only answered if some other
    code happens to run that very same loop.
    """

    def __init__(self) -> None:
        self.nc: NATS = NATS()
        # Background loop and the thread driving it; set while entered.
        self._loop = None
        self._thread = None

    async def lifespan(self) -> None:
        """Connect to the NATS server and subscribe ``handle`` to ``subject1``."""
        await self.nc.connect(HOST_NATS)
        await self.nc.subscribe("subject1", cb=self.handle)

    async def handle(self, msg):
        """Publish ``PONG`` to the reply subject of the received message."""
        await self.nc.publish(msg.reply, b"PONG")

    def _run(self, coro):
        """Run ``coro`` on the background loop and block until it completes."""
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

    def _stop_loop(self) -> None:
        """Stop the background loop, wait for its thread, and release both."""
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()
        self._loop = None
        self._thread = None

    def __enter__(self):
        # A dedicated loop avoids relying on (or interfering with) whatever
        # loop the caller's thread may or may not have.
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(
            target=self._loop.run_forever,
            name="nats-service-mock",
            daemon=True,
        )
        self._thread.start()
        try:
            self._run(self.lifespan())
        except BaseException:
            # Do not leave a stray thread behind if connecting fails.
            self._stop_loop()
            raise
        return self

    def __exit__(self, *args) -> None:
        try:
            self._run(self.nc.close())
        finally:
            self._stop_loop()
@pytest.fixture(scope="session")
def nats_service(test_app) -> NatsServiceMock:
    """Session-scoped fixture yielding an entered ``NatsServiceMock``.

    ``test_app`` is not used in the body; requesting it makes pytest set up
    the application client before the mock is entered.
    """
    with NatsServiceMock() as service_mock:
        yield service_mock
@pytest.fixture(scope="session")
def test_app() -> TestClient:
    """Session-scoped fixture yielding a ``TestClient`` for a fresh app.

    The client is used as a context manager and stays entered until the
    session-scoped fixture is torn down.
    """
    application = create_app()
    with TestClient(application) as http_client:
        yield http_client
# =======================================================================
# TESTS
# =======================================================================
def test_index_should_give_a_succesful_response(test_app, nats_service):
    """GET ``/`` must answer 200 with ``PONG`` as the body.

    ``nats_service`` is not referenced in the body; it is requested so the
    mocked service is set up before the request is issued.
    """
    response = test_app.get("/")
    assert response.status_code == 200
    assert response.text == "PONG"