Skip to content

Commit b384b20

Browse files
authored
Enable Kafka transport tests (#55)
* Add new kafka transport tests. * Wait for events flag when using verify logs. * Wait for at least one event to be propagated. * Add tests to Travis. * Fix tests.
1 parent ea038d6 commit b384b20

10 files changed

+167
-21
lines changed

.travis.yml

+6
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ script:
3636
- RAYVENS_TEST_MODE=local RAYVENS_TRANSPORT=kafka python ./tests/source.py
3737
# - ray submit ./scripts/cluster.yaml ./tests/stream.py
3838
- ray submit ./scripts/cluster.yaml ./tests/source.py
39+
- RAYVENS_TEST_MODE=local python ./tests/kafka_transport.py
40+
- ray submit ./scripts/cluster.yaml ./tests/kafka_transport.py local
41+
- RAYVENS_TEST_MODE=local python ./tests/kafka_scaling_transport.py
42+
- ray submit ./scripts/cluster.yaml ./tests/kafka_scaling_transport.py local
3943

4044
# Test operator mode
4145
# - ray submit ./scripts/cluster.yaml ./tests/source_operator.py
@@ -47,6 +51,8 @@ script:
4751
# - ray submit ./scripts/cluster.yaml ./tests/generic_sink.py
4852
# - RAYVENS_TEST_MODE=local python ./tests/generic_source.py
4953
# - ray submit ./scripts/cluster.yaml ./tests/generic_source.py
54+
- ray submit ./scripts/cluster.yaml ./tests/kafka_transport.py operator
55+
- ray submit ./scripts/cluster.yaml ./tests/kafka_scaling_transport.py operator
5056

5157
# Test mixed mode
5258
- RAYVENS_TEST_MODE=mixed python ./tests/sink.py

examples/cloud_object_storage/cos_sink_from_directory.py

+1-4
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import ray
1818
import rayvens
1919
import sys
20-
import time
2120

2221
# This example demonstrates how to send objects to the AWS S3 or
2322
# IBM Cloud Object Storage using multi-part uploads.
@@ -69,7 +68,5 @@
6968
# the monitored file system directory.
7069
dir_to_sink = stream.add_sink(dir_to_sink_config)
7170

72-
# stream._meta('verify_log', dir_to_sink, "BLA")
73-
7471
# Run for a while to give a chance for files to be dropped inside the directory
75-
time.sleep(100)
72+
stream.disconnect_all(after_idle_for=20)

examples/cloud_object_storage/cos_sink_from_file.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import ray
1818
import rayvens
1919
import sys
20-
import time
2120
from pathlib import Path
2221

2322
# This example demonstrates how to send objects to the AWS S3 or
@@ -103,4 +102,4 @@
103102
stream << "Some other input which is invalid."
104103

105104
# Run for a while
106-
time.sleep(30)
105+
stream.disconnect_all(after_idle_for=5)

examples/cloud_object_storage/cos_sink_multi_part.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import ray
1818
import rayvens
1919
import sys
20-
import time
2120
from pathlib import Path
2221

2322
# This example demonstrates how to send objects to the AWS S3 or
@@ -73,4 +72,4 @@
7372
stream << Path("test_files/test.txt")
7473

7574
# Run for a while
76-
time.sleep(20)
75+
stream.disconnect_all(after_idle_for=5)

rayvens/api.py

+8
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ def disconnect_all(self,
8080
self._wait_for_timeout(after_idle_for, after)
8181
return ray.get(self.actor.disconnect_all.remote(stream_drain_timeout))
8282

83+
def event_count(self):
84+
return ray.get(self.actor.event_count.remote())
85+
8386
def _meta(self, action, *args, **kwargs):
8487
return ray.get(self.actor._meta.remote(action, *args, **kwargs))
8588

@@ -119,6 +122,7 @@ def __init__(self, name, operator=None):
119122
self._sinks = {}
120123
self._latest_sent_event_timestamp = None
121124
self._limit_subscribers = False
125+
self._event_counter = 0
122126

123127
def send_to(self, subscriber, name=None):
124128
if self._limit_subscribers:
@@ -142,6 +146,7 @@ def append(self, data):
142146
continue
143147
_eval(subscriber, data)
144148
self._latest_sent_event_timestamp = time.time()
149+
self._event_counter += 1
145150

146151
def add_operator(self, operator):
147152
self._operator = operator
@@ -195,6 +200,9 @@ def disconnect_all(self, stream_drain_timeout):
195200
for sink_name in dict(self._sinks):
196201
self.disconnect_sink(sink_name)
197202

203+
def event_count(self):
204+
return self.event_count
205+
198206
def _meta(self, action, *args, **kwargs):
199207
return verify_do(self, _global_camel, action, *args, **kwargs)
200208

rayvens/core/verify.py

+22-3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
# limitations under the License.
1515
#
1616

17+
import time
1718
from rayvens.core import kamel
1819

1920

@@ -23,7 +24,11 @@ def verify_do(stream, _global_camel, action, *args, **kwargs):
2324
raise RuntimeError('invalid meta action')
2425

2526

26-
def _verify_log(stream, _global_camel, sink_source_name, message):
27+
def _verify_log(stream,
28+
_global_camel,
29+
sink_source_name,
30+
message,
31+
wait_for_events=False):
2732
# Get integration:
2833
integration = None
2934
if sink_source_name in stream._sinks:
@@ -34,10 +39,25 @@ def _verify_log(stream, _global_camel, sink_source_name, message):
3439
raise RuntimeError(
3540
f'{sink_source_name} not found on stream {stream.name}')
3641

42+
log = "FAIL"
43+
44+
# Wait for at least one event to happen.
45+
if wait_for_events:
46+
event_count = 0
47+
countdown = 20
48+
while event_count == 0:
49+
event_count = stream.event_count()
50+
time.sleep(1)
51+
countdown -= 1
52+
if countdown == 0:
53+
break
54+
if event_count == 0:
55+
print("[LOG CHECK]:", log)
56+
return False
57+
3758
if _global_camel.mode.is_local():
3859
# In the local case the integration run is ongoing and we can
3960
# access the logs directly.
40-
# TODO: make this work for local implementation.
4161
outcome = integration.invocation.invoke(message)
4262
else:
4363
# When running using the operator then the integration run command
@@ -48,7 +68,6 @@ def _verify_log(stream, _global_camel, sink_source_name, message):
4868
outcome = invocation is not None
4969
invocation.kill()
5070

51-
log = "FAIL"
5271
if outcome:
5372
log = "SUCCESS"
5473
print("[LOG CHECK]:", log)

tests/generic_sink.py

+2-5
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
import ray
1818
import rayvens
19-
import time
2019
import os
2120

2221
# Initialize ray based on where ray will run inside the cluster using the
@@ -59,10 +58,8 @@
5958
output_message = f'Sending message to Slack sink in run mode {run_mode}.'
6059
stream << output_message
6160

62-
time.sleep(10)
63-
6461
# Verify outcome.
65-
stream._meta('verify_log', sink, output_message)
62+
stream._meta('verify_log', sink, output_message, wait_for_events=True)
6663

6764
# Delete all integrations from stream.
68-
stream.disconnect_all()
65+
stream.disconnect_all(after_idle_for=5)

tests/kafka_scaling_transport.py

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#
2+
# Copyright IBM Corporation 2021
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
import os
18+
import sys
19+
import ray
20+
import rayvens
21+
22+
# Initialize run mode.
23+
if len(sys.argv) < 2:
24+
run_mode = 'local'
25+
else:
26+
run_mode = sys.argv[1]
27+
28+
if os.getenv('RAYVENS_TEST_MODE') == 'local':
29+
ray.init(object_store_memory=78643200)
30+
else:
31+
ray.init(address='auto')
32+
33+
# The Kafka topic used for communication.
34+
topic = "testTopic"
35+
36+
rayvens.init(mode=run_mode, transport='kafka')
37+
38+
# Create source stream and configuration.
39+
source_stream = rayvens.Stream('kafka-source-stream')
40+
41+
# Event sink config.
42+
test_sink_config = dict(kind='test-sink')
43+
44+
# Add sink to stream.
45+
test_sink = source_stream.add_sink(test_sink_config)
46+
47+
source_config = dict(
48+
kind='http-source',
49+
url='https://query1.finance.yahoo.com/v7/finance/quote?symbols=AAPL',
50+
route='/from-http',
51+
period=2000,
52+
kafka_transport_topic=topic,
53+
kafka_transport_partitions=3)
54+
source = source_stream.add_source(source_config)
55+
56+
# Verify outcome.
57+
source_stream._meta('verify_log',
58+
test_sink,
59+
"quoteResponse",
60+
wait_for_events=True)
61+
62+
# Disconnect source and sink.
63+
source_stream.disconnect_all(after=10)

tests/kafka_transport.py

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#
2+
# Copyright IBM Corporation 2021
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
import os
18+
import sys
19+
import ray
20+
import rayvens
21+
22+
# Initialize run mode.
23+
if len(sys.argv) < 2:
24+
run_mode = 'local'
25+
else:
26+
run_mode = sys.argv[1]
27+
28+
if os.getenv('RAYVENS_TEST_MODE') == 'local':
29+
ray.init(object_store_memory=78643200)
30+
else:
31+
ray.init(address='auto')
32+
33+
# The Kafka topic used for communication.
34+
topic = "testTopic"
35+
36+
rayvens.init(mode=run_mode, transport='kafka')
37+
38+
# Create source stream and configuration.
39+
source_stream = rayvens.Stream('kafka-source-stream')
40+
41+
# Event sink config.
42+
test_sink_config = dict(kind='test-sink')
43+
44+
# Add sink to stream.
45+
test_sink = source_stream.add_sink(test_sink_config)
46+
47+
source_config = dict(
48+
kind='http-source',
49+
url='https://query1.finance.yahoo.com/v7/finance/quote?symbols=AAPL',
50+
route='/from-http',
51+
period=2000)
52+
source = source_stream.add_source(source_config)
53+
54+
# Verify outcome.
55+
source_stream._meta('verify_log',
56+
test_sink,
57+
"quoteResponse",
58+
wait_for_events=True)
59+
60+
# Disconnect source and sink.
61+
source_stream.disconnect_all(after=5)

tests/sink.py

+2-5
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
import ray
1818
import rayvens
19-
import time
2019
import os
2120

2221
# Initialize ray based on where ray will run inside the cluster using the
@@ -52,10 +51,8 @@
5251
output_message = f'Sending message to Slack sink in run mode {run_mode}.'
5352
stream << output_message
5453

55-
time.sleep(10)
56-
5754
# Verify outcome.
58-
stream._meta('verify_log', sink, output_message)
55+
stream._meta('verify_log', sink, output_message, wait_for_events=True)
5956

6057
# Delete all integrations from stream.
61-
stream.disconnect_all()
58+
stream.disconnect_all(after_idle_for=5)

0 commit comments

Comments
 (0)