Mocking dependencies that return AsyncIterable

Mocking dependencies that return a list populated by an async for-loop. Bonus: RuntimeError: Event loop is closed

There have been times when I've wanted to mock a return value that is an AsyncIterable, or a list built from an async for-loop comprehension. Clients from the Python Azure SDK (blob and keyvault) return an AsyncItemPaged, which inherits from AsyncIterable. The code below mocks an asynchronous iterator that a function consumes to build and return a list. The same approach applies to any code under test whose dependencies are clients that return an AsyncIterable.
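
As a minimal sketch of the pattern being mocked, the snippet below defines an asynchronous generator and drains it into a list with an async comprehension. The names fake_paged_results and collect are hypothetical stand-ins for illustration; they are not part of the Azure SDK.

```python
import asyncio
from typing import AsyncIterable, List


async def fake_paged_results() -> AsyncIterable[str]:
    """Hypothetical stand-in for a client call that returns an AsyncIterable."""
    for item in ("blob-a", "blob-b"):
        await asyncio.sleep(0)  # yield control, as a real network call would
        yield item


async def collect(source: AsyncIterable[str]) -> List[str]:
    # An async comprehension drains the asynchronous iterator into a plain list.
    return [item async for item in source]


result = asyncio.run(collect(fake_paged_results()))
print(result)
```

This is the shape of code that the tests later in this post replace with mocks.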

HttpClient

This class is wrapped by wrapper.py

    async def get_luke_skywalker_film_appearance(self) -> List[str]:
        films: List[str] = []

        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.base_uri}/people/1/") as response:
                response_dict = await response.json()
                # 'films' holds the list of film URLs for this character.
                films = response_dict['films']
                print(films)

        # A plain list cannot be consumed with `async for`, so return it as-is.
        return films

Wrapper

It wraps the HttpClient calls, returns their results, and prints the time each call takes.

from datetime import datetime
from typing import AsyncIterable, List
from common.httpclient import HttpClient

class Wrapper:
    def __init__(self, httpClient: HttpClient):
        self.httpclient = httpClient

    async def wrap_async(self) -> List[str]:
        start = datetime.now()
        res = await self.httpclient.get_luke_skywalker_film_appearance()
        print(f'Time: {datetime.now() - start}')
        return res

    async def wrap_async_async(self) -> List[str]:
        start = datetime.now()
        res = await self.httpclient.get_luke_skywalker_film_appearance()
        print(f'Time: {datetime.now() - start}')
        return [r async for r in res]

The wrap_async_async function mimics consuming a dependency that returns an AsyncIterable: it builds its list with an async for-loop comprehension.

Unit Tests

Wrap_Async

    BaseUri: str = 'https://swapi.dev/api/'

    @pytest.mark.asyncio
    async def test_wrap_async(self):
        """ Test where the dependency under test will return a list."""
        # Arrange
        mock_httpclient = AsyncMock(name='HttpClient')

        # Create mock object with create_date property.
        create_date: datetime = datetime.now(timezone.utc)
        rtn_mock = MagicMock(create_date = create_date)
        rtn_mock.name = "TEST"

        mock_httpclient.get_luke_skywalker_film_appearance.return_value = [rtn_mock]
        wrapper = Wrapper(mock_httpclient)

        # Act
        films = await wrapper.wrap_async()

        # Assert
        assert films is not None
        mock_httpclient.get_luke_skywalker_film_appearance.assert_called_once()
        assert films[0].name == rtn_mock.name

The return value is a MagicMock with name and create_date attributes. To constrain the mock to a real class, pass the spec argument: MagicMock(spec=SomeClass) returns a mock that only exposes the attributes of the passed-in class. Since wrap_async returns the list unchanged, we just set mock_httpclient.get_luke_skywalker_film_appearance.return_value to a list containing rtn_mock.
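
To illustrate the spec argument, here is a small sketch. The Film class is hypothetical and exists only for this example; the point is that a spec'd mock rejects attributes the class does not define.

```python
from unittest.mock import MagicMock


class Film:
    """Hypothetical class, used only to illustrate spec."""
    name: str = ""
    episode: int = 0


film_mock = MagicMock(spec=Film)
film_mock.name = "TEST"  # allowed: Film defines name

try:
    film_mock.director  # not defined on Film, so the mock refuses it
    missing_attribute_raised = False
except AttributeError:
    missing_attribute_raised = True

print(isinstance(film_mock, Film), missing_attribute_raised)
```

A spec'd mock also passes isinstance checks against the class, which helps when the code under test inspects types.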

Wrap_Async_Async

wrap_async_async does not return the dependency's result directly; it consumes it with a list comprehension that contains an async for-loop. Setting mock_httpclient.get_luke_skywalker_film_appearance.return_value to a plain list is therefore not enough, because async for requires an object that implements __aiter__.

This test sets mock_httpclient.get_luke_skywalker_film_appearance.return_value to a mock whose __aiter__.return_value is [rtn_mock], so that evaluating [r async for r in res] yields a list containing the expected rtn_mock.

    @pytest.mark.asyncio
    async def test_wrap_async_async(self):
        """ Test where the dependency under test will return a list from async for-loop list comprehension. """

        # Arrange
        mock_httpclient = AsyncMock(name='HttpClient')

        # Create mock object with create_date property.
        create_date: datetime = datetime.now(timezone.utc)
        rtn_mock = MagicMock(create_date = create_date)
        rtn_mock.name = "TEST"

        mock_iterator = MagicMock()

        # Set the return value of what is returned from asynchronous iterator.
        mock_iterator.__aiter__.return_value = [rtn_mock]

        mock_httpclient.get_luke_skywalker_film_appearance.return_value = mock_iterator
        wrapper = Wrapper(mock_httpclient)

        # Act
        films = await wrapper.wrap_async_async()

        # Assert
        assert films is not None
        mock_httpclient.get_luke_skywalker_film_appearance.assert_called_once()
        assert films[0].name == rtn_mock.name
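
A quick way to see why the mock needs __aiter__ is to run the same async comprehension against a plain list and against a MagicMock. This is a standalone sketch; drain is a hypothetical helper, and the __aiter__ support relies on unittest.mock from Python 3.8 or later.

```python
import asyncio
from unittest.mock import MagicMock


async def drain(source):
    # Same shape as the comprehension in wrap_async_async.
    return [item async for item in source]


async def main():
    # A plain list does not implement __aiter__, so async for rejects it.
    try:
        await drain(["A New Hope"])
        plain_list_failed = False
    except TypeError:
        plain_list_failed = True

    # MagicMock supports __aiter__; the iterable set as return_value is iterated.
    mock_iterator = MagicMock()
    mock_iterator.__aiter__.return_value = ["A New Hope"]
    drained = await drain(mock_iterator)
    return plain_list_failed, drained


plain_list_failed, drained = asyncio.run(main())
print(plain_list_failed, drained)
```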

Note: do not do this:

mock_iterator.__aiter__.return_value = [MagicMock(name = 'TEST')]

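name is a special argument to the MagicMock constructor: it names the mock itself (the name shown in its repr) rather than creating a name attribute.

By contrast, create_date can be passed to the constructor: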

mock_iterator.__aiter__.return_value = [MagicMock(create_date = create_date)]

Explicitly setting the attribute works:

rtn_mock = MagicMock(create_date = create_date)
rtn_mock.name = "TEST"
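
The difference can be checked directly. In this small sketch (variable names are illustrative), passing name to the constructor leaves .name as an auto-created child mock, while assigning it afterwards stores the string.

```python
from unittest.mock import MagicMock

named_in_constructor = MagicMock(name="TEST")
# The constructor consumed name, so .name is a child mock, not the string.
constructor_name_is_str = isinstance(named_in_constructor.name, str)

assigned_afterwards = MagicMock()
assigned_afterwards.name = "TEST"
# Plain attribute assignment stores the string as expected.
assigned_name_is_str = isinstance(assigned_afterwards.name, str)

print(constructor_name_is_str, assigned_name_is_str)
```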

Event loop

I originally wanted to document only the mocking of an async function's dependencies, but along the way I ran into other issues, specifically on Windows, and had to work around them.

Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x000001855E9BEA60>
Traceback (most recent call last):
  File "C:\Users\jtong\Anaconda3\envs\taxi\lib\asyncio\proactor_events.py", line 116, in __del__
    self.close()
  File "C:\Users\jtong\Anaconda3\envs\taxi\lib\asyncio\proactor_events.py", line 108, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "C:\Users\jtong\Anaconda3\envs\taxi\lib\asyncio\base_events.py", line 751, in call_soon
    self._check_closed()
  File "C:\Users\jtong\Anaconda3\envs\taxi\lib\asyncio\base_events.py", line 515, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

To work around this, sleep for 0.5 seconds after the call; even an asynchronous sleep prevents this RuntimeError from occurring.

async def get_luke_skywalker_film_appearance_eventloop_error(self) -> List[str]:
    films: List[str] = []
    async def call():
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.base_uri}/people/1/") as response:
                response_dict = await response.json()
                print(response_dict)
                return response_dict['films']


    #asyncio.run(call())
    #asyncio.get_event_loop().run_until_complete(call())
    films = await call()

    # Workaround 'RuntimeError: Event loop is closed' on Windows.
    # https://github.com/encode/httpx/issues/914
    if platform.system() == 'Windows':
        #time.sleep(0.0)
        # asyncio.sleep returns a coroutine; it only sleeps when awaited.
        await asyncio.sleep(0.5)

    return films

This method does not reproduce the error:

async def get_luke_skywalker_film_appearance(self) -> List[str]:
    films: List[str] = []

    async with aiohttp.ClientSession() as session:
        async with session.get(f"{self.base_uri}/people/1/") as response:
            response_dict = await response.json()
            films = response_dict['films']
            print(films)

    return films

Note that importing nest_asyncio and calling nest_asyncio.apply() is needed to resolve the error "Cannot run the event loop while another loop is running".
