Skip to content

Commit 833dd62

Browse files
authored
Add support for header passing to sinks (#60)
* Add support for header passing to sinks. * Use json file for custom file. * Fix call to producer helper. * Update comment.
1 parent 25d55b0 commit 833dd62

File tree

4 files changed

+57
-23
lines changed

4 files changed

+57
-23
lines changed

examples/cloud_object_storage/cos_sink.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import rayvens
1919
import sys
2020
import time
21+
import json
2122

2223
# This example demonstrates how to send objects to the AWS S3 or
2324
# IBM Cloud Object Storage.
@@ -37,6 +38,11 @@
3738

3839
# Initialize Ray and Rayvens
3940
ray.init()
41+
42+
# TODO: make header setting work for Kafka transport. Currently the
43+
# Camel-K component for Kafka does not propagate message headers. This
44+
# will be fixed by Camel-K 1.8.0 release.
45+
# rayvens.init(transport="kafka")
4046
rayvens.init()
4147

4248
# Create an object stream
@@ -47,8 +53,7 @@
4753
bucket_name=bucket,
4854
access_key_id=access_key_id,
4955
secret_access_key=secret_access_key,
50-
endpoint=endpoint,
51-
file_name="test-file-name.txt")
56+
endpoint=endpoint)
5257

5358
if region is not None:
5459
sink_config['region'] = region
@@ -57,7 +62,10 @@
5762
sink = stream.add_sink(sink_config)
5863

5964
# Send file contents to Cloud Object Storage:
60-
stream << "File contents sample!"
65+
json_content = ['test', {'json': ('content', None, 1.0, 2)}]
66+
event = rayvens.OutputEvent(json.dumps(json_content),
67+
{"CamelAwsS3Key": "custom_file.json"})
68+
stream << event
6169

6270
# Run for a while
6371
time.sleep(10)

rayvens/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,6 @@
1515
#
1616

1717
from rayvens.api import init, Stream
18+
from rayvens.core.common import OutputEvent
1819

19-
__all__ = ['init', 'Stream']
20+
__all__ = ['init', 'Stream', 'OutputEvent']

rayvens/core/catalog.py

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,7 @@ def cos_sink(config):
455455
secret_access_key = config['secret_access_key']
456456
endpoint = config['endpoint']
457457

458-
file_name = None
458+
file_name = "default.txt"
459459
if 'file_name' in config:
460460
file_name = config['file_name']
461461

@@ -578,18 +578,19 @@ def cos_sink(config):
578578

579579
# This is the default behavior when user application data is
580580
# written into a COS file directly.
581-
if 'file_name' not in config:
582-
raise TypeError('Created cloud object name is required.')
583-
regular_spec = {
584-
'steps': [{
585-
'set-header': {
586-
'name': 'CamelAwsS3Key',
587-
'simple': f"{file_name}"
588-
}
589-
}, {
590-
'to': uri
591-
}]
592-
}
581+
if 'file_name' in config:
582+
regular_spec = {
583+
'steps': [{
584+
'set-header': {
585+
'name': 'CamelAwsS3Key',
586+
'simple': f"{file_name}"
587+
}
588+
}, {
589+
'to': uri
590+
}]
591+
}
592+
else:
593+
regular_spec = {'steps': [{'to': uri}]}
593594
spec_list.append((regular_spec, None))
594595
return spec_list
595596

rayvens/core/common.py

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@
2828
from ray.remote_function import RemoteFunction
2929

3030

31+
class OutputEvent:
32+
def __init__(self, data, headers={}):
33+
self.data = data
34+
self.headers = headers
35+
36+
3137
def get_run_mode(camel_mode, check_port):
3238
if camel_mode == 'auto' or camel_mode == 'local':
3339
mode.run_mode = RayvensMode.LOCAL
@@ -116,11 +122,22 @@ def __init__(self, url):
116122
self.url = url
117123

118124
def append(self, data):
125+
event_data = data
126+
event_headers = {}
127+
if isinstance(data, OutputEvent):
128+
event_data = data.data
129+
event_headers = data.headers
119130
try:
120131
if isinstance(data, Path):
121-
requests.post(self.url, str(data), timeout=(5, None))
132+
requests.post(self.url,
133+
str(event_data),
134+
timeout=(5, None),
135+
headers=event_headers)
122136
else:
123-
requests.post(self.url, data, timeout=(5, None))
137+
requests.post(self.url,
138+
event_data,
139+
timeout=(5, None),
140+
headers=event_headers)
124141
except requests.exceptions.ConnectionError:
125142
pass
126143

@@ -170,20 +187,27 @@ def __init__(self, name):
170187
self.producer = ProducerHelper(name)
171188

172189
def append(self, data):
190+
event_data = data
191+
event_headers = {}
192+
if isinstance(data, OutputEvent):
193+
event_data = data.data
194+
event_headers = data.headers
173195
if isinstance(data, Path):
174-
self.producer.produce(str(data))
196+
self.producer.produce(str(event_data), headers=event_headers)
175197
else:
176-
self.producer.produce(data)
198+
self.producer.produce(event_data, headers=event_headers)
177199

178200

179201
class ProducerHelper:
180202
def __init__(self, name):
181203
self.name = name
182204
self.producer = Producer({'bootstrap.servers': brokers()})
183205

184-
def produce(self, data):
206+
def produce(self, data, headers={}):
185207
if data is not None:
186-
self.producer.produce(self.name, data.encode('utf-8'))
208+
self.producer.produce(self.name,
209+
data.encode('utf-8'),
210+
headers=headers)
187211

188212

189213
@ray.remote(num_cpus=0.05)

0 commit comments

Comments
 (0)