@@ -92,25 +92,24 @@ def _get_profiles_producer_from_topic(topic: Topic) -> KafkaProducer:
92
92
logger = logging .getLogger (__name__ )
93
93
94
94
95
- def decode_payload (encoded : str , compressed_profile : bool ) -> dict [str , Any ]:
96
- if compressed_profile :
97
- try :
98
- res = msgpack .unpackb (
99
- zlib .decompress (b64decode (encoded .encode ("utf-8" ))), use_list = False
100
- )
101
- metrics .incr ("profiling.profile_metrics.decompress" , tags = {"status" : "ok" })
102
- return res
103
- except Exception as e :
104
- logger .exception ("Failed to decompress compressed profile" , extra = {"error" : e })
105
- metrics .incr ("profiling.profile_metrics.decompress" , tags = {"status" : "err" })
106
- raise
107
-
108
- # not compressed
109
- return msgpack .unpackb (b64decode (encoded .encode ("utf-8" )), use_list = False )
95
+ def decode_payload (encoded : str ) -> dict [str , Any ]:
96
+ try :
97
+ res = msgpack .unpackb (zlib .decompress (b64decode (encoded .encode ("utf-8" ))), use_list = False )
98
+ metrics .incr ("profiling.profile_metrics.decompress" , tags = {"status" : "ok" })
99
+ return res
100
+ except Exception as e :
101
+ logger .exception ("Failed to decompress compressed profile" , extra = {"error" : e })
102
+ metrics .incr ("profiling.profile_metrics.decompress" , tags = {"status" : "err" })
103
+ raise
110
104
111
105
112
106
def encode_payload (message : dict [str , Any ]) -> str :
113
- return b64encode (msgpack .packb (message )).decode ("utf-8" )
107
+ return b64encode (
108
+ zlib .compress (
109
+ msgpack .packb (message ),
110
+ level = 1 ,
111
+ )
112
+ ).decode ("utf-8" )
114
113
115
114
116
115
@instrumented_task (
@@ -137,14 +136,13 @@ def process_profile_task(
137
136
profile : Profile | None = None ,
138
137
payload : str | None = None ,
139
138
sampled : bool = True ,
140
- compressed_profile : bool = False ,
141
139
** kwargs : Any ,
142
140
) -> None :
143
141
if not sampled and not options .get ("profiling.profile_metrics.unsampled_profiles.enabled" ):
144
142
return
145
143
146
144
if payload :
147
- message_dict = decode_payload (payload , compressed_profile )
145
+ message_dict = decode_payload (payload )
148
146
149
147
profile = json .loads (message_dict ["payload" ], use_rapid_json = True )
150
148
0 commit comments