5
5
import anyio
6
6
from faststream .app import FastStream
7
7
from faststream .types import SendableMessage
8
- from taskiq import AsyncBroker , BrokerMessage
8
+ from taskiq import AsyncBroker
9
9
from taskiq .acks import AckableMessage
10
10
from taskiq .decor import AsyncTaskiqDecoratedTask
11
11
from typing_extensions import TypeAlias , override
12
12
13
- from taskiq_faststream .formatter import PatchedFormatter
14
- from taskiq_faststream .serializer import PatchedSerializer
13
+ from taskiq_faststream .formatter import PatchedFormatter , PathcedMessage
15
14
from taskiq_faststream .types import ScheduledTask
16
15
from taskiq_faststream .utils import resolve_msg
17
16
@@ -34,8 +33,7 @@ class BrokerWrapper(AsyncBroker):
34
33
35
34
def __init__ (self , broker : Any ) -> None :
36
35
super ().__init__ ()
37
- self .serializer = PatchedSerializer ()
38
- self .formatter = PatchedFormatter (self )
36
+ self .formatter = PatchedFormatter ()
39
37
self .broker = broker
40
38
41
39
async def startup (self ) -> None :
@@ -48,7 +46,7 @@ async def shutdown(self) -> None:
48
46
await self .broker .close ()
49
47
await super ().shutdown ()
50
48
51
- async def kick (self , message : BrokerMessage ) -> None :
49
+ async def kick (self , message : PathcedMessage ) -> None : # type: ignore[override]
52
50
"""Call wrapped FastStream broker `publish` method."""
53
51
await _broker_publish (self .broker , message )
54
52
@@ -111,7 +109,7 @@ class AppWrapper(BrokerWrapper):
111
109
112
110
def __init__ (self , app : FastStream ) -> None :
113
111
super (BrokerWrapper , self ).__init__ ()
114
- self .serializer = PatchedSerializer ()
112
+ self .formatter = PatchedFormatter ()
115
113
self .app = app
116
114
117
115
async def startup (self ) -> None :
@@ -124,7 +122,7 @@ async def shutdown(self) -> None:
124
122
await self .app ._shutdown () # noqa: SLF001
125
123
await super (BrokerWrapper , self ).shutdown ()
126
124
127
- async def kick (self , message : BrokerMessage ) -> None :
125
+ async def kick (self , message : PathcedMessage ) -> None : # type: ignore[override]
128
126
"""Call wrapped FastStream broker `publish` method."""
129
127
assert ( # noqa: S101
130
128
self .app .broker
@@ -134,11 +132,7 @@ async def kick(self, message: BrokerMessage) -> None:
134
132
135
133
async def _broker_publish (
136
134
broker : Any ,
137
- message : BrokerMessage ,
135
+ message : PathcedMessage ,
138
136
) -> None :
139
- labels = message .labels
140
- labels .pop ("schedule" , None )
141
- async for msg in resolve_msg (
142
- msg = labels .pop ("message" , message .message ),
143
- ):
144
- await broker .publish (msg , ** labels )
137
+ async for msg in resolve_msg (message .body ):
138
+ await broker .publish (msg , ** message .labels )
0 commit comments