untitled paste

Modified pytest program for starlette error (modification from @MatthewScholefield)

unlisted ⁨1⁩ ⁨file⁩ 2021-11-27 18:34:27 UTC

modified_nats_pytest_example.py

Raw
from starlette.routing import Route
from starlette.testclient import TestClient
import pytest
from starlette.applications import Starlette
from starlette.responses import PlainTextResponse
import asyncio


from nats.aio.client import Client as NATS

"""
Test work with starlette 0.14.2
Error with starlette 0.15.0: RunTimeError: got Future <Future pending> attached to a different loop
Error with starlette 0.16.0: Nats timeout

The test is that the client code makes a nats request to a mocked nats service over nats itself.

Requirement a running nats server `docker run -d -p 4222:4222 nats:latest`
"""

HOST_NATS = "localhost:4222"


# =======================================================================
# CODE
# =======================================================================


def create_app():
    async def index(request):
        r = await request.app.state.nc.request("subject1", timeout=1, payload=b"PING")
        return PlainTextResponse(content=r.data.decode())

    async def setup() -> None:
        app.state.nc = NATS()
        await app.state.nc.connect(HOST_NATS)
        print("Connected to nats")

    app = Starlette(debug=True, routes=[Route('/', index)], on_startup=[setup])

    return app


app = create_app()


# =======================================================================
# MOCKS & TESTS
# =======================================================================


class NatsServiceMock:
    def __init__(self) -> None:
        self.nc: NATS = NATS()

    async def lifespan(self) -> None:
        await self.nc.connect(HOST_NATS)
        await self.nc.subscribe("subject1", cb=self.handle)

    async def handle(self, msg):
        await self.nc.publish(msg.reply, b"PONG")

    def __enter__(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.lifespan())
        return self

    def __exit__(self, *args) -> None:
        pass


@pytest.fixture(scope="session")
def nats_service(test_app) -> NatsServiceMock:
    with NatsServiceMock() as nc:
        yield nc


@pytest.fixture(scope="session")
def test_app() -> TestClient:
    with TestClient(create_app()) as client:
        yield client


# =======================================================================
# TESTS
# =======================================================================


def test_index_should_give_a_succesful_response(test_app, nats_service):
    r = test_app.get("/")
    assert r.status_code == 200
    assert r.text == "PONG"