Skip to content

[Working POC] Working commit for spin #2335

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open

[Working POC] Working commit for spin #2335

wants to merge 2 commits into from

Conversation

talsperre
Copy link
Collaborator

This is a working version of spin. To use it simply run the following command:

python <my_flow.py> spin step_m --spin-pathspec <Full Task Pathspec>

You might have to run the flow once with this new commit, before being able to spin individual steps.

For instance,

python runtime_dag_flow.py spin step_m --spin-pathspec RuntimeDAGFlow/13/step_m/275233901

try:
spec = importlib.util.spec_from_file_location("artifacts_module", file_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like if there's arbitrary python code in the file, it will be executed here. Do we need to worry about that?

"--spin-pathspec",
default=None,
show_default=True,
help="Task ID to use when spinning up the step. The spun step will use the artifacts"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

task id -> pathspec?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add space so lines don't get muched up.

help="Change namespace from the default (your username) to the specified tag.",
)
@click.option(
"--skip-decorators/--no-skip-decorators",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe expose the decorator whitelist instead?

module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
variables = vars(module)
return variables
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't you check that ARTIFACTS exists here?

else:
# (Current task, "A:10,B:13,C:21") and (Parent task, "A:10,B:13")
# (Current task, "A:10,B:13") and (Parent task, "A:10,B:13")
if parent_step_type == "split-foreach":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move these strings into constants?

# Spin configuration
###
SPIN_ALLOWED_DECORATORS = from_conf(
"SPIN_ALLOWED_DECORATORS", ["conda", "pypi", "environment", "titus"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

titus probably shouldn't be in this repo

@@ -306,6 +385,40 @@ def __enter__(self) -> "Runner":
async def __aenter__(self) -> "Runner":
return self

def __get_executing_task(self, attribute_file_fd, command_obj):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type hints?

Copy link
Contributor

@romain-intel romain-intel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some initial comments.

ctx.obj.spin_metadata = [m for m in METADATA_PROVIDERS if m.TYPE == "local"][0](
ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor
)
# ctx.obj.event_logger = LOGGING_SIDECARS["nullSidecarLogger"](
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this commented out. It does seem like they are set above.

@@ -470,14 +505,14 @@ def start(
ctx.obj.flow,
ctx.obj.graph,
ctx.obj.environment,
ctx.obj.flow_datastore,
ctx.obj.metadata,
ctx.obj.flow_datastore if not ctx.obj.is_spin else ctx.obj.spin_flow_datastore,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are a lot of these changes. I wonder if we could do them once and say something like: ctx.obj.effective_flow_datastore and avoid the if/else all over. It may make it more maintainable going forward as well?

"--spin-pathspec",
default=None,
show_default=True,
help="Task ID to use when spinning up the step. The spun step will use the artifacts"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add space so lines don't get muched up.

show_default=True,
help="Path to a module that contains artifacts to be used in the spun step. The artifacts should "
"be defined as a dictionary called ARTIFACTS with keys as the artifact names and values as the "
"artifact values. The artifact values will overwrite the default values of the artifacts used in "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say "override" and not "overwrite"



@click.command(help="Spins up a task for a given step from a previous run locally.")
@click.option(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would make this an argument. It feels maybe a little more natural to say: myflow.py spin my_step than myflow.py spin --step-name my_step It would also simplify things a bit as these are either/or option with pathspec.

Parent task of the current task

str
Pathspec of the parent task of the current task
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/the/a/

def __setitem__(self, name, value):
self._data[name] = value

def __getattr__(self, name):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we can do SID.bar for reading just like SID["bar"] but we can only set using SID["bar"]?

If the timeout is reached, the run is terminated. If not specified, wait
forever.
The maximum time to wait for the run to finish.
If the timeout is reached, the run is terminated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IT's not always arun right? It could be executingtask?

@@ -173,7 +175,7 @@ async def stream_log(
----------
stream : str
The stream to stream logs from. Can be one of `stdout` or `stderr`.
position : int, optional, default None
position : Optional[int], default None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

syntax is: int, optional, deffault None

"""
Blocking spin execution of the run.
This method will wait until the spun run has completed execution.
Parameters
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Empty line (same below)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants