@@ -173,25 +173,9 @@ def create_with_partitions(
         partitions: Mapping[Partition, int],
     ) -> ProcessingStrategy[KafkaPayload]:
         """Create the processing strategy chain."""
-        logger.info("KAFKASETUP: Creating processing strategy chain")
-        logger.info(f"KAFKASETUP: Partitions: {dict(partitions)}")
-        logger.info(f"KAFKASETUP: Commit strategy: {type(commit)}")
-
-        # Start with the commit strategy (always last in chain)
         next_step: ProcessingStrategy[Any] = CommitOffsets(commit)
-        logger.info("KAFKASETUP: Base strategy: CommitOffsets")
-
-        logger.info("KAFKASETUP: Checking healthcheck configuration...")
-        logger.info(f"KAFKASETUP: Healthcheck file configured: {self.healthcheck_file}")
-        logger.info("KAFKASETUP: Adding Healthcheck strategy to processing chain")
         assert self.healthcheck_file
         next_step = Healthcheck(self.healthcheck_file, next_step)
-        logger.info("KAFKASETUP: Healthcheck strategy added successfully")
-
-        # Use RunTaskInThreads for concurrent processing
-        logger.info("KAFKASETUP: Setting up concurrent message processing")
-        logger.info(f"KAFKASETUP: Concurrency level: {self.concurrency}")
-        logger.info(f"KAFKASETUP: Max pending futures: {self.max_pending_futures}")

         def process_message(msg: Message[KafkaPayload]) -> Any:
             try:
@@ -208,8 +192,6 @@ def process_message(msg: Message[KafkaPayload]) -> Any:
                 next_step=next_step,
             )

-        logger.info("KAFKASETUP: Processing strategy chain creation complete")
-        logger.info(f"KAFKASETUP: Final strategy type: {type(strategy)}")
         return strategy
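For reference, a minimal sketch of the chain this factory builds after the logging cleanup (not code from this commit; it assumes Arroyo's CommitOffsets, Healthcheck, and RunTaskInThreads strategies and the factory attributes healthcheck_file, concurrency, and max_pending_futures seen above): CommitOffsets sits at the tail so offsets are committed last, Healthcheck wraps it to keep the liveness file updated, and RunTaskInThreads wraps both to run process_message in a thread pool.

# Sketch only -- assumes the Arroyo strategies and factory attributes noted above.
next_step = CommitOffsets(commit)                          # tail of the chain: commit offsets
next_step = Healthcheck(self.healthcheck_file, next_step)  # periodically touch the healthcheck file
strategy = RunTaskInThreads(
    processing_function=process_message,                   # per-message work, run in worker threads
    concurrency=self.concurrency,                          # number of worker threads
    max_pending_futures=self.max_pending_futures,          # backpressure limit on queued work
    next_step=next_step,
)
return strategy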