Skip to content

Commit 429cf98

Browse files
authored
Use readiness probe in local mode. (#50)
1 parent 3a9f395 commit 429cf98

File tree

2 files changed

+19
-16
lines changed

2 files changed

+19
-16
lines changed

rayvens/core/common.py

+14-15
Original file line numberDiff line numberDiff line change
@@ -83,31 +83,30 @@ def _wait_for_ready_integration(mode, integration):
8383
# Wait for an integration to reach its running state and not only that but
8484
# also be in a state where it can immediately execute incoming requests.
8585
def await_start(mode, integration):
86-
# Only needed when operator is used.
87-
if mode.is_local():
88-
return True
89-
9086
# Check logs of the integration to make sure it was installed properly.
91-
invocation = kamel.log(mode, integration.integration_name,
92-
"Installed features:")
93-
integration_is_running = invocation is not None
94-
if integration_is_running:
95-
print(f'Integration {integration.integration_name} is running.')
96-
else:
97-
print('Integration did not start correctly.')
87+
if not mode.is_local():
88+
invocation = kamel.log(mode, integration.integration_name,
89+
"Installed features:")
90+
integration_is_running = invocation is not None
91+
if integration_is_running:
92+
print(f'Integration {integration.integration_name} is running.')
93+
else:
94+
print('Integration did not start correctly.')
9895

9996
# For kafka transport the health check cannot be performed.
10097
if mode.transport == 'kafka':
10198
return True
10299

103-
# Perform health check and wait for integration to be ready.
100+
# Perform readiness check and wait for integration to be ready.
104101
healthy_integration = _wait_for_ready_integration(mode, integration)
105102

106103
if healthy_integration:
107-
print(f'Integration {integration.integration_name} is healthy.')
108-
return integration_is_running
104+
print(f'Integration {integration.integration_name} is ready.')
105+
if not mode.is_local():
106+
return integration_is_running
107+
return True
109108

110-
print(f'Integration {integration.integration_name} is not healthy.')
109+
print(f'Integration {integration.integration_name} cannot be ready.')
111110
return False
112111

113112

rayvens/core/local.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import rayvens.core.catalog as catalog
1818
from rayvens.core.integration import Integration
19-
from rayvens.core.common import get_run_mode, send_to, recv_from
19+
from rayvens.core.common import get_run_mode, send_to, recv_from, await_start
2020

2121

2222
def start(camel_mode, check_port):
@@ -37,6 +37,8 @@ def add_source(self, stream, config, source_name):
3737
integration.prepare_environment(self.mode)
3838
integration.invoke_local_run(self.mode, spec)
3939
send_to(stream.actor, self.mode.server_address(integration), route)
40+
if not await_start(self.mode, integration):
41+
raise RuntimeError('Could not start source')
4042
return integration
4143

4244
def add_sink(self, stream, config, sink_name):
@@ -47,6 +49,8 @@ def add_sink(self, stream, config, sink_name):
4749
integration.invoke_local_run(self.mode, spec)
4850
recv_from(stream.actor, sink_name,
4951
self.mode.server_address(integration), route)
52+
if not await_start(self.mode, integration):
53+
raise RuntimeError('Could not start source')
5054
return integration
5155

5256
def disconnect(self, integration):

0 commit comments

Comments
 (0)