Skip to content

Commit 6d68b65

Browse files
authored
Fix tests (#63)
* Fix tests. * Revert parameter.
1 parent 8d9151f commit 6d68b65

File tree

7 files changed

+62
-42
lines changed

7 files changed

+62
-42
lines changed

rayvens/__init__.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
# limitations under the License.
1515
#
1616

17-
from rayvens.api import init, Stream
17+
from rayvens.api import init, meta, Stream
1818
from rayvens.core.common import OutputEvent
1919

20-
__all__ = ['init', 'Stream', 'OutputEvent']
20+
__all__ = ['init', 'meta', 'Stream', 'OutputEvent']

rayvens/api.py

+20-2
Original file line numberDiff line numberDiff line change
@@ -268,8 +268,22 @@ def flush_batch(self):
268268
self.context.latest_sent_event_timestamp = time.time()
269269
self.context.event_counter += 1
270270

271-
def _meta(self, action, *args, **kwargs):
272-
return verify_do(self, _global_camel, action, *args, **kwargs)
271+
def _get_integration(self, source_sink_name):
272+
if source_sink_name in self._sinks:
273+
integration = self._sinks[source_sink_name]
274+
if source_sink_name in self._sources:
275+
integration = self._sources[source_sink_name]
276+
if integration is None:
277+
raise RuntimeError(
278+
f'{source_sink_name} not found on stream {self.name}')
279+
return integration
280+
281+
def _integration_invoke(self, source_sink_name, message):
282+
integration = self._get_integration(source_sink_name)
283+
return integration.invocation.invoke(message)
284+
285+
def _get_integration_name(self, source_sink_name):
286+
return self._get_integration(source_sink_name).integration_name
273287

274288
def _get_latest_timestamp(self):
275289
return self.context.latest_sent_event_timestamp
@@ -341,3 +355,7 @@ def init(mode=os.getenv('RAYVENS_MODE', 'auto'),
341355
f'{transport} transport unsupported for mode {mode}.')
342356
else:
343357
raise RuntimeError(f'Unsupported mode {mode}.')
358+
359+
360+
def meta(stream, action, *args, **kwargs):
361+
return verify_do(stream.actor, _global_camel, action, *args, **kwargs)

rayvens/core/verify.py

+25-26
Original file line numberDiff line numberDiff line change
@@ -14,57 +14,56 @@
1414
# limitations under the License.
1515
#
1616

17+
import ray
1718
import time
1819
from rayvens.core import kamel
1920

2021

21-
def verify_do(stream, _global_camel, action, *args, **kwargs):
22+
def verify_do(handle, _global_camel, action, *args, **kwargs):
2223
if action == 'verify_log':
23-
return _verify_log(stream, _global_camel, *args, **kwargs)
24+
return _verify_log(handle, _global_camel, *args, **kwargs)
2425
raise RuntimeError('invalid meta action')
2526

2627

27-
def _verify_log(stream,
28+
def wait_for_event(handle):
29+
event_count = 0
30+
countdown = 20
31+
while event_count == 0:
32+
event_count = ray.get(handle.event_count.remote())
33+
time.sleep(1)
34+
countdown -= 1
35+
if countdown == 0:
36+
break
37+
if event_count == 0:
38+
return False
39+
return True
40+
41+
42+
def _verify_log(handle,
2843
_global_camel,
2944
sink_source_name,
3045
message,
3146
wait_for_events=False):
32-
# Get integration:
33-
integration = None
34-
if sink_source_name in stream._sinks:
35-
integration = stream._sinks[sink_source_name]
36-
if sink_source_name in stream._sources:
37-
integration = stream._sources[sink_source_name]
38-
if integration is None:
39-
raise RuntimeError(
40-
f'{sink_source_name} not found on stream {stream.name}')
41-
4247
log = "FAIL"
4348

4449
# Wait for at least one event to happen.
4550
if wait_for_events:
46-
event_count = 0
47-
countdown = 20
48-
while event_count == 0:
49-
event_count = stream.event_count()
50-
time.sleep(1)
51-
countdown -= 1
52-
if countdown == 0:
53-
break
54-
if event_count == 0:
55-
print("[LOG CHECK]:", log)
51+
if not wait_for_event(handle):
52+
print("[LOG CHECK]:", "NO EVENTS RECEIVED")
5653
return False
5754

5855
if _global_camel.mode.is_local():
5956
# In the local case the integration run is ongoing and we can
6057
# access the logs directly.
61-
outcome = integration.invocation.invoke(message)
58+
outcome = ray.get(
59+
handle._integration_invoke.remote(sink_source_name, message))
6260
else:
6361
# When running using the operator then the integration run command
6462
# is non-blocking and returns immediately. The logs can be queried
6563
# using the kamel log command.
66-
invocation = kamel.log(_global_camel.mode,
67-
integration.integration_name, message)
64+
integration_name = ray.get(
65+
handle._get_integration_name.remote(sink_source_name))
66+
invocation = kamel.log(_global_camel.mode, integration_name, message)
6867
outcome = invocation is not None
6968
invocation.kill()
7069

tests/generic_sink.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
stream << output_message
6060

6161
# Verify outcome.
62-
stream._meta('verify_log', sink, output_message, wait_for_events=True)
62+
rayvens.meta(stream, 'verify_log', sink, output_message, wait_for_events=True)
6363

6464
# Delete all integrations from stream.
6565
stream.disconnect_all(after_idle_for=5)

tests/kafka_scaling_transport.py

+8-6
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
ray.init(address='auto')
3232

3333
# The Kafka topic used for communication.
34-
topic = "testTopic"
34+
topic = "testTopicPartitioned"
3535

3636
rayvens.init(mode=run_mode, transport='kafka')
3737

@@ -53,11 +53,13 @@
5353
kafka_transport_partitions=3)
5454
source = source_stream.add_source(source_config)
5555

56-
# Verify outcome.
57-
source_stream._meta('verify_log',
58-
test_sink,
59-
"quoteResponse",
60-
wait_for_events=True)
56+
# Verify outcome. Since events are going through the Kafka infrastructure
57+
# we need to disable checks based on event counts.
58+
rayvens.meta(source_stream,
59+
'verify_log',
60+
test_sink,
61+
"quoteResponse",
62+
wait_for_events=False)
6163

6264
# Disconnect source and sink.
6365
source_stream.disconnect_all(after=10)

tests/kafka_transport.py

+5-4
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,11 @@
5252
source = source_stream.add_source(source_config)
5353

5454
# Verify outcome.
55-
source_stream._meta('verify_log',
56-
test_sink,
57-
"quoteResponse",
58-
wait_for_events=True)
55+
rayvens.meta(source_stream,
56+
'verify_log',
57+
test_sink,
58+
"quoteResponse",
59+
wait_for_events=True)
5960

6061
# Disconnect source and sink.
6162
source_stream.disconnect_all(after=5)

tests/sink.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
stream << output_message
5353

5454
# Verify outcome.
55-
stream._meta('verify_log', sink, output_message, wait_for_events=True)
55+
rayvens.meta(stream, 'verify_log', sink, output_message, wait_for_events=True)
5656

5757
# Delete all integrations from stream.
5858
stream.disconnect_all(after_idle_for=5)

0 commit comments

Comments
 (0)