Skip to content

Commit e98fb23

Browse files
committed
Add generic-source and generic-sink kinds (#59)
1 parent ae1dc8c commit e98fb23

File tree

1 file changed

+60
-40
lines changed

1 file changed

+60
-40
lines changed

rayvens/core/catalog.py

Lines changed: 60 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,36 @@
1515
#
1616

1717

18-
# construct a camel source specification from a rayvens source config
19-
def construct_source(config, endpoint, inverted=False):
20-
if 'kind' not in config:
21-
raise TypeError('A Camel source needs a kind.')
22-
if config['kind'] not in ['http-source']:
23-
raise TypeError('Unsupported Camel source.')
18+
def http_source(config):
2419
if 'url' not in config:
2520
raise TypeError('Kind http-source requires a url.')
2621
url = config['url']
2722
period = config.get('period', 1000)
23+
return {'uri': f'timer:tick?period={period}', 'steps': [{'to': url}]}
24+
25+
26+
def generic_source(config):
27+
if 'spec' not in config:
28+
raise TypeError('Kind generic-source requires a spec.')
29+
return config['spec']
30+
31+
32+
sources = {'http-source': http_source, 'generic-source': generic_source}
33+
2834

35+
# construct a camel source specification from a rayvens source config
36+
def construct_source(config, endpoint, inverted=False):
37+
if 'kind' not in config:
38+
raise TypeError('A Camel source needs a kind.')
39+
kind = config['kind']
40+
f = sources.get(kind)
41+
if f is None:
42+
raise TypeError(f'Unsupported Camel source: {kind}.')
43+
spec = f(config)
2944
if inverted:
30-
return [{
31-
'from': {
32-
'uri': f'timer:tick?period={period}',
33-
'steps': [{
34-
'to': url
35-
}, {
36-
'bean': 'addToQueue'
37-
}]
38-
},
45+
spec['steps'].append({'bean': 'addToQueue'})
46+
spec = [{
47+
'from': spec
3948
}, {
4049
'from': {
4150
'uri': endpoint,
@@ -44,37 +53,48 @@ def construct_source(config, endpoint, inverted=False):
4453
}]
4554
}
4655
}]
47-
48-
return [{
49-
'from': {
50-
'uri': f'timer:tick?period={period}',
51-
'steps': [{
52-
'to': url
53-
}, {
54-
'to': endpoint
55-
}]
56-
}
57-
}]
56+
else:
57+
spec['steps'].append({'to': endpoint})
58+
spec = [{'from': spec}]
59+
print(spec)
60+
return spec
5861

5962

60-
# construct a camel sink specification from a rayvens sink config
61-
def construct_sink(config, endpoint):
62-
if 'kind' not in config:
63-
raise TypeError('A Camel sink needs a kind.')
64-
if config['kind'] not in ['slack-sink']:
65-
raise TypeError('Unsupported Camel sink.')
63+
def slack_sink(config):
6664
if 'channel' not in config:
6765
raise TypeError('Kind slack-sink requires a channel.')
6866
if 'webhookUrl' not in config:
6967
raise TypeError('Kind slack-sink requires a webhookUrl.')
7068
channel = config['channel']
7169
webhookUrl = config['webhookUrl']
7270

73-
return [{
74-
'from': {
75-
'uri': endpoint,
76-
'steps': [{
77-
'to': f'slack:{channel}?webhookUrl={webhookUrl}',
78-
}]
79-
}
80-
}]
71+
return {
72+
'steps': [{
73+
'to': f'slack:{channel}?webhookUrl={webhookUrl}',
74+
}]
75+
}
76+
77+
78+
def generic_sink(config):
79+
if 'spec' not in config:
80+
raise TypeError('Kind generic-sink requires a spec.')
81+
return config['spec']
82+
83+
84+
sinks = {'slack-sink': slack_sink, 'generic-sink': generic_sink}
85+
86+
87+
# construct a camel sink specification from a rayvens sink config
88+
def construct_sink(config, endpoint):
89+
if 'kind' not in config:
90+
raise TypeError('A Camel sink needs a kind.')
91+
kind = config['kind']
92+
f = sinks.get(kind)
93+
if f is None:
94+
raise TypeError(f'Unsupported Camel sink: {kind}.')
95+
96+
spec = f(config)
97+
spec['uri'] = endpoint
98+
spec = [{'from': spec}]
99+
print(spec)
100+
return spec

0 commit comments

Comments
 (0)