3
3
import logging
4
4
import time
5
5
import uuid
6
- from typing import Optional
6
+ from typing import Optional , Union
7
7
8
8
from .factories import get_broker
9
9
from .producers import BrokeredProducer
10
+ from .protocols import Producer
10
11
from .service .asyncio_tasks import ensure_fatal
11
12
12
13
logger = logging .getLogger ('awx.main.dispatch.control' )
@@ -23,49 +24,57 @@ class ControlCallbacks:
23
24
it exists to interact with producers, using variables relevant to the particular
24
25
control message being sent"""
25
26
26
- def __init__ (self , queuename , send_data , expected_replies ) -> None :
27
+ def __init__ (self , queuename : Optional [ str ] , send_data : dict , expected_replies : int ) -> None :
27
28
self .queuename = queuename
28
29
self .send_data = send_data
29
30
self .expected_replies = expected_replies
30
31
31
32
# received_replies only tracks the reply message, not the channel name
32
33
# because they come via a temporary reply_to channel and that is not user-facing
33
- self .received_replies : list [str ] = []
34
+ self .received_replies : list [Union [ dict , str ] ] = []
34
35
self .events = ControlEvents ()
35
36
self .shutting_down = False
36
37
37
- async def process_message (self , payload , producer = None , channel = None ) -> tuple [Optional [str ], Optional [str ]]:
38
+ async def process_message (
39
+ self , payload : Union [dict , str ], producer : Optional [Producer ] = None , channel : Optional [str ] = None
40
+ ) -> tuple [Optional [str ], Optional [str ]]:
38
41
self .received_replies .append (payload )
39
42
if self .expected_replies and (len (self .received_replies ) >= self .expected_replies ):
40
43
self .events .exit_event .set ()
41
44
return (None , None )
42
45
43
- async def connected_callback (self , producer ) -> None :
46
+ async def connected_callback (self , producer : Producer ) -> None :
44
47
payload = json .dumps (self .send_data )
45
- await producer .notify (channel = self .queuename , message = payload )
48
+ # Ignore the type hint here because we know it is a brokered producer
49
+ await producer .notify (channel = self .queuename , message = payload ) # type: ignore[attr-defined]
46
50
logger .info ('Sent control message, expecting replies soon' )
47
51
52
+ async def main (self ) -> None :
53
+ "Unused"
54
+ pass
55
+
48
56
49
57
class Control (object ):
50
58
def __init__ (self , broker_name : str , broker_config : dict , queue : Optional [str ] = None ) -> None :
51
59
self .queuename = queue
52
60
self .broker_name = broker_name
53
61
self .broker_config = broker_config
54
62
55
- def running (self , * args , ** kwargs ):
56
- return self .control_with_reply ('running' , * args , ** kwargs )
57
-
58
- def cancel (self , task_ids , with_reply = True ):
59
- if with_reply :
60
- return self .control_with_reply ('cancel' , extra_data = {'task_ids' : task_ids })
61
- else :
62
- self .control ({'control' : 'cancel' , 'task_ids' : task_ids , 'reply_to' : None }, extra_data = {'task_ids' : task_ids })
63
-
64
63
@classmethod
65
- def generate_reply_queue_name (cls ):
64
+ def generate_reply_queue_name (cls ) -> str :
66
65
return f"reply_to_{ str (uuid .uuid4 ()).replace ('-' , '_' )} "
67
66
68
- async def acontrol_with_reply_internal (self , producer , send_data , expected_replies , timeout ):
67
+ @staticmethod
68
+ def parse_replies (received_replies : list [Union [str , dict ]]) -> list [dict ]:
69
+ ret = []
70
+ for payload in received_replies :
71
+ if isinstance (payload , dict ):
72
+ ret .append (payload )
73
+ else :
74
+ ret .append (json .loads (payload ))
75
+ return ret
76
+
77
+ async def acontrol_with_reply_internal (self , producer : Producer , send_data : dict , expected_replies : int , timeout : float ) -> list [dict ]:
69
78
control_callbacks = ControlCallbacks (self .queuename , send_data , expected_replies )
70
79
71
80
await producer .start_producing (control_callbacks )
@@ -83,22 +92,22 @@ async def acontrol_with_reply_internal(self, producer, send_data, expected_repli
83
92
control_callbacks .shutting_down = True
84
93
await producer .shutdown ()
85
94
86
- return [ json . loads ( payload ) for payload in control_callbacks .received_replies ]
95
+ return self . parse_replies ( control_callbacks .received_replies )
87
96
88
- def make_producer (self , reply_queue ) :
97
+ def make_producer (self , reply_queue : str ) -> Producer :
89
98
broker = get_broker (self .broker_name , self .broker_config , channels = [reply_queue ])
90
99
return BrokeredProducer (broker , close_on_exit = True )
91
100
92
- async def acontrol_with_reply (self , command , expected_replies = 1 , timeout = 1 , data = None ):
101
+ async def acontrol_with_reply (self , command : str , expected_replies : int = 1 , timeout : int = 1 , data : Optional [ dict ] = None ) -> list [ dict ] :
93
102
reply_queue = Control .generate_reply_queue_name ()
94
- send_data = {'control' : command , 'reply_to' : reply_queue }
103
+ send_data : dict [ str , Union [ dict , str ]] = {'control' : command , 'reply_to' : reply_queue }
95
104
if data :
96
105
send_data ['control_data' ] = data
97
106
98
107
return await self .acontrol_with_reply_internal (self .make_producer (reply_queue ), send_data , expected_replies , timeout )
99
108
100
- async def acontrol (self , command , data = None ):
101
- send_data = {'control' : command }
109
+ async def acontrol (self , command : str , data : Optional [ dict ] = None ) -> None :
110
+ send_data : dict [ str , Union [ dict , str ]] = {'control' : command }
102
111
if data :
103
112
send_data ['control_data' ] = data
104
113
@@ -110,13 +119,13 @@ def control_with_reply(self, command: str, expected_replies: int = 1, timeout: f
110
119
logger .info ('control-and-reply {} to {}' .format (command , self .queuename ))
111
120
start = time .time ()
112
121
reply_queue = Control .generate_reply_queue_name ()
113
- send_data = {'control' : command , 'reply_to' : reply_queue }
122
+ send_data : dict [ str , Union [ dict , str ]] = {'control' : command , 'reply_to' : reply_queue }
114
123
if data :
115
124
send_data ['control_data' ] = data
116
125
117
126
broker = get_broker (self .broker_name , self .broker_config , channels = [reply_queue ])
118
127
119
- def connected_callback ():
128
+ def connected_callback () -> None :
120
129
payload = json .dumps (send_data )
121
130
if self .queuename :
122
131
broker .publish_message (channel = self .queuename , message = payload )
@@ -131,9 +140,9 @@ def connected_callback():
131
140
logger .info (f'control-and-reply message returned in { time .time () - start } seconds' )
132
141
return replies
133
142
134
- def control (self , command , data = None ):
143
+ def control (self , command : str , data : Optional [ dict ] = None ) -> None :
135
144
"Send message in fire-and-forget mode, as synchronous code. Only for no-reply control."
136
- send_data = {'control' : command }
145
+ send_data : dict [ str , Union [ dict , str ]] = {'control' : command }
137
146
if data :
138
147
send_data ['control_data' ] = data
139
148
0 commit comments