complex_dataworkers.py
# NOTE: import paths are assumed for illustration; the snippet does not
# show the file's real imports. TraceBuffer is sketched at the bottom
# of this file.
from pipeline import CompositeStep, Pipeline, Reduce, Map
from processors import ClickhouseProcessor


class AggregateChunks(CompositeStep):
    processor = ClickhouseProcessor

    def __init__(self, ctx, **kw):
        pipeline = Pipeline(name=kw["name"])
        # Input: one span per row, e.g.
        #   {"org_id": 1, "span_id": 1, "trace_id": "abc"}
        #   {"org_id": 1, "span_id": 2, "trace_id": "def"}
        #   {"org_id": 1, "span_id": 3, "trace_id": "abc"}
        # Output: one TraceBuffer per (org_id, trace_id) pair, e.g.
        #   TraceBuffer([1, 3])  # spans 1 and 3 share trace "abc"
        #   TraceBuffer([2])     # span 2 is alone in trace "def"
        reduce = Reduce(
            storage_type="redis",
            # group_by defaults to whatever partitioning the runtime
            # chooses; pin it to trace_id so spans from the same trace
            # land in the same accumulator.
            group_by="trace_id",
            inputs=kw["inputs"],
            create_accumulator=TraceBuffer,
            # Keyword name is assumed; the original passed this fold
            # function positionally, which is a syntax error after
            # keyword arguments.
            accumulator_fn=lambda acc, row: acc.add_message(row),
            # max_time=100,
            # max_size=100,
            should_flush=lambda acc: acc.should_flush(),
            ctx=pipeline,
            name="reduce",
        )
        Map(
            concurrency=1,
            inputs=[reduce],
            # Keyword name is assumed, as above. `client` is presumed to
            # be defined elsewhere in the file; the flushed buffer is
            # passed through so the Map step actually processes it.
            function=lambda buffer: client.process_messages(buffer),
            ctx=pipeline,
            name="map",
        )
        super().__init__(ctx, pipeline, **kw)
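

# The pipeline above only implies TraceBuffer's interface: Reduce
# constructs one per group, folds rows in via add_message, and polls
# should_flush to decide when to emit. A minimal sketch of that
# contract; the span_ids field and the size-based flush threshold are
# assumptions for illustration, not the real implementation.
class TraceBuffer:
    FLUSH_THRESHOLD = 100  # hypothetical cap on buffered spans

    def __init__(self, span_ids=None):
        self.span_ids = list(span_ids or [])

    def add_message(self, row):
        # Called once per row routed to this group (same trace_id).
        self.span_ids.append(row["span_id"])
        return self

    def should_flush(self):
        # Reduce emits the buffer downstream once this returns True.
        return len(self.span_ids) >= self.FLUSH_THRESHOLD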