20
20
if sys .version_info >= (3 , 11 ):
21
21
from asyncio import timeout
22
22
else :
23
- from async_timeout import timeout
23
+ from async_timeout import timeout # noqa: F401
24
24
25
25
from confluent_kafka .cimpl import Consumer
26
26
from confluent_kafka .error import ConsumeError , KeyDeserializationError , ValueDeserializationError
@@ -48,9 +48,9 @@ def __aiter__(self):
48
48
async def __anext__ (self ):
49
49
return await self .poll (None )
50
50
51
- async def poll (self , timeout : int = - 1 ):
52
- timeout = None if timeout == - 1 else timeout
53
- async with asyncio .timeout (timeout ):
51
+ async def poll (self , poll_timeout : int = - 1 ):
52
+ poll_timeout = None if poll_timeout == - 1 else poll_timeout
53
+ async with asyncio .timeout (poll_timeout ):
54
54
while True :
55
55
# Zero timeout here is what makes it non-blocking
56
56
msg = super ().poll (0 )
@@ -67,8 +67,8 @@ def __init__(self, conf):
67
67
self ._value_deserializer = conf_copy .pop ('value.deserializer' , None )
68
68
super ().__init__ (conf_copy )
69
69
70
- async def poll (self , timeout = - 1 ):
71
- msg = await super ().poll (timeout )
70
+ async def poll (self , poll_timeout = - 1 ):
71
+ msg = await super ().poll (poll_timeout )
72
72
73
73
if msg is None :
74
74
return None
@@ -96,7 +96,7 @@ async def poll(self, timeout=-1):
96
96
msg .set_value (value )
97
97
return msg
98
98
99
- def consume (self , num_messages = 1 , timeout = - 1 ):
99
+ def consume (self , num_messages = 1 , consume_timeout = - 1 ):
100
100
"""
101
101
:py:func:`Consumer.consume` not implemented, use
102
102
:py:func:`DeserializingConsumer.poll` instead
0 commit comments