Per PEP-492 I am trying to implement an asynchronous iterator, such that I can do e.g.
async for foo in bar:
...
Here is a trivial example, similar to the one in the docs, with a very basic test of instantiation and async iteration:
import pytest
class TestImplementation:
def __aiter__(self):
return self
async def __anext__(self):
raise StopAsyncIteration
@pytest.mark.asyncio # note use of pytest-asyncio marker
async def test_async_for():
async for _ in TestImplementation():
pass
However, when I execute my test suite, I see:
=================================== FAILURES ===================================
________________________________ test_async_for ________________________________
@pytest.mark.asyncio
async def test_async_for():
> async for _ in TestImplementation():
E TypeError: 'async for' received an invalid object from __aiter__: TestImplementation
...: TypeError
===================== 1 failed, ... passed in 2.89 seconds ======================
Why does my TestImplementation
appear to be invalid? As far as I can tell it meets the protocol:
- An object must implement an
__aiter__
method ... returning an asynchronous iterator object.- An asynchronous iterator object must implement an
__anext__
method ... returning an awaitable.- To stop iteration
__anext__
must raise aStopAsyncIteration
exception.
Aucun commentaire:
Enregistrer un commentaire