
LOGMLE - Fix argo metaflow retry integration #1

Open · wants to merge 31 commits into base: master
Conversation

dhpikolo
Owner

@dhpikolo dhpikolo commented Feb 11, 2025

Problem

https://github.com/deliveryhero/logistics-ds-metaflow-ext/issues/108#issue-2832910761

Solution

This PR modifies the step CLI command to infer retry_count from the flow_datastore class. This class holds all the information about the underlying datastore and run artifacts.

  • Adds a method that infers the latest done attempt of a task
  • Removes MF_ATTEMPT from the mflog.save_logs python script
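As a rough sketch of the first bullet (the helper name follows this PR, but the datastore calls are simplified and hypothetical):

```python
def get_latest_done_attempt(flow_datastore, run_id, step_name, task_id):
    """Return the highest completed attempt number for a task, or None
    when the task has no completed attempts yet.

    Hypothetical sketch of the helper this PR adds; the real method
    lives on the flow_datastore class.
    """
    task_datastores = flow_datastore.get_task_datastores(
        pathspecs=[f"{run_id}/{step_name}/{task_id}"],
        include_prior=True,  # look at earlier attempts, not just the latest
    )
    return max((t.attempt for t in task_datastores), default=None)
```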

New Behaviour

Previously, argo retry --node <> would overwrite previously run attempts. Now it adds artifacts as if this were a new attempt.

Fixes: https://github.com/deliveryhero/logistics-ds-metaflow-ext/issues/108

Slack Discussion: https://outerbounds-community.slack.com/archives/C020U025QJK/p1737483912784969

Testing and Limitations

Use this branch on a dummy model to test this:

Validation & QA

Documentation:

@dhpikolo dhpikolo marked this pull request as draft February 11, 2025 18:36
Comment on lines 171 to 172
echo_always(f"{latest_done_attempt=}")
echo_always(f"{retry_count=}")
Owner Author


will remove it after testing complete

Comment on lines 39 to 42
# Use inferred attempt - to save task_stdout.log and task_stderr.log
latest_done_attempt = flow_datastore.get_latest_done_attempt(run_id=run_id, step_name=step_name, task_id=task_id)
task_datastore = flow_datastore.get_task_datastore(
-    run_id, step_name, task_id, int(attempt), mode="w"
+    run_id, step_name, task_id, int(latest_done_attempt), mode="w"
Owner Author

@dhpikolo dhpikolo Feb 14, 2025


This is needed because both the kubernetes and argo options run mflog.save_logs after the task is completed, which uploads the task_stdout and task_stderr logs to S3 so they show up in the Metaflow UI.

An example run without this change has no logs for argo-retried attempts, i.e. attempts 3, 4 & 5.
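In pseudocode, the save_logs side of this might look like the following (hypothetical names; the real change lives in mflog.save_logs):

```python
def open_log_datastore(flow_datastore, run_id, step_name, task_id):
    """Hypothetical sketch: pick the attempt from the datastore instead
    of the MF_ATTEMPT env var, which argo retry does not advance."""
    attempt = flow_datastore.get_latest_done_attempt(
        run_id=run_id, step_name=step_name, task_id=task_id
    )
    # attempt may be None on a first run; per the thread below,
    # get_task_datastore(..., attempt=None) handles that case.
    return flow_datastore.get_task_datastore(
        run_id, step_name, task_id, attempt, mode="w"
    )
```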


Why are we explicitly casting this to an int? Why don't we do this in the function?

Owner Author


Yeah, you are right, I don't think we need to cast it anymore; the function returns a count.

I had it this way to keep the changes minimal.

Owner Author


The function returning None integrates well here, since get_task_datastore(..., attempt=None) will not break.

https://github.com/Netflix/metaflow/blob/5c960eaff1ae486f503b37177f03cc1419b5571d/metaflow/datastore/flow_datastore.py#L206

@dhpikolo dhpikolo self-assigned this Feb 17, 2025
@dhpikolo dhpikolo marked this pull request as ready for review February 17, 2025 11:03
@dhpikolo dhpikolo changed the title LOGMLE - Inspect step cli command LOGMLE - Fix argo metaflow retry integration Feb 17, 2025
@@ -21,7 +21,6 @@ def _read_file(path):

# these env vars are set by mflog.mflog_env
pathspec = os.environ["MF_PATHSPEC"]
attempt = os.environ["MF_ATTEMPT"]


Where does this come from? I am wondering if we can leave all the code in this file alone and try to set the MF_ATTEMPT with the value that we want.

Owner Author


MF_ATTEMPT comes from the argo workflow template container command.

Screenshot 2025-02-17 at 14 42 28

@dhpollack

Ok, one last "idea" because I feel like I'm being negative, but don't want to be.

What if instead of creating more retries, we simply set the attempt number to the last available attempt and overwrite all the data on this attempt?

Pros:

  • existing code should work as expected
  • won't increase the max attempt number

Cons:

  • will overwrite the data of the last attempt (which presumably failed so you don't need it anyway)
  • there will only be one attempted retry because the next attempt would hit the maximum.

We might even be able to do this in the operator by rewriting the command when we see the annotation that the workflow is a retry.

Even if we try this, we can still have this as a stop gap solution.

@dhpikolo
Owner Author

dhpikolo commented Feb 17, 2025

I kind of looked into this idea already.

One of the difficulties was:

> we see the annotation that the workflow is a retry

There is no annotation to spot whether a workflow was retried.

@dhpikolo
Owner Author

dhpikolo commented Feb 17, 2025

but you just gave me a hint 😄

If we can change {{retries}} --> {{retries}} + {{limit}}, we could actually make the argo retry button work for once.

Screenshot 2025-02-17 at 14 57 44

EDIT: aah no sorry. i was wrong

@dhpikolo
Owner Author

#1 (comment)

{{retries}} --> {{retries}} + if this_is_retried_flow {{limit}} else 0

@colebaileygit

> will overwrite the data of the last attempt (which presumably failed so you don't need it anyway)

I think we do want to keep the data so that we can debug why it failed, but if it failed multiple times, overwriting just one attempt isn't the end of the world.

One of the metaflow maintainers' main concerns is breaking "immutability", so I think overwriting data would stop this from getting merged upstream. The other concern is making changes to the core task logic, which risks breaking other systems.

pathspecs=[f"{run_id}/{step_name}/{task_id}"],
include_prior=True
)
return max([t.attempt for t in t_datastores], default=0) # returns default, if this was a first attempt.


Shouldn't the default be -1 or None?

Owner Author

@dhpikolo dhpikolo Feb 17, 2025


I will make the default None, since it makes more sense when no done attempts were found.

Also: #1 (comment)
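Why None beats 0 as the default: with default=0, a task that never completed an attempt is indistinguishable from a task whose only done attempt is attempt 0.

```python
no_attempts = []
only_attempt_zero_done = [0]

# With default=0 the two cases collapse to the same value:
assert max(no_attempts, default=0) == 0
assert max(only_attempt_zero_done, default=0) == 0

# With default=None they stay distinguishable:
assert max(no_attempts, default=None) is None
assert max(only_attempt_zero_done, default=None) == 0
```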

@dhpikolo
Owner Author

dhpikolo commented Feb 18, 2025

Tracking discussion with metaflow maintainers in a separate PR

# Not sure what are the side effects to this.
if retry_count >= max_user_code_retries:
max_user_code_retries = retry_count

Owner Author

@dhpikolo dhpikolo Feb 18, 2025


FYI, @colebaileygit

MAX_ATTEMPTS vs max_user_code_retries

I think max_user_code_retries is the number of times a task can be retried. This value is the same as the value that the retry decorator contains.

sample workflow-template

Screenshot 2025-02-18 at 15 19 09

Screenshot 2025-02-18 at 15 19 26

Whereas MAX_ATTEMPTS is the total number of attempts that a task can be run [here], which is basically the number of retries + the initial run.
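Assuming that reading is right, the relationship reduces to (illustrative only, not code from this PR):

```python
def total_attempts(max_user_code_retries: int) -> int:
    # MAX_ATTEMPTS-style bound: the initial run plus each allowed retry.
    return max_user_code_retries + 1

# e.g. @retry(times=3) allows 3 retries, so up to 4 attempts in total
assert total_attempts(3) == 4
```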

@dhpikolo
Owner Author

dhpikolo commented Feb 18, 2025

Testing

Hitting the argo retry button on the failed argo-workflow:

In [1]: from metaflow import Step, current, namespace, Task

In [2]: namespace(None)

In [3]: t = Task("DummyProjectFlow/argo-dummyproject.user.j.kollipara.dummyprojectflow-z6mfl/train_model/t-8586c205")

In [4]: t.data
Out[4]: <MetaflowData: country_code, test_data, train_data, countries, region, country_codes, execution_date, regions, rev_map_regions, skip_create_table, table_run_id, auto_deploy, buffer_ratio, build_static_image, ccs, code_version_override, components, dump_model, ed, merge_strategy, name, register_model, stamps, tri>

In [5]: t.data.test_data
Out[5]: 
   country_code        date  orders
53           at  2025-02-10   27967
54           at  2025-02-11   28230
55           at  2025-02-12   26754
56           at  2025-02-13   30494
57           at  2025-02-14   36382
58           at  2025-02-15   30812
59           at  2025-02-16   38858

In [6]: t.data.country_code
Out[6]: 'at'

In [7]: t.current_attempt
Out[7]: 5

Comment on lines 161 to 163
if latest_done_attempt:
retry_count = latest_done_attempt + 1
# Not sure what are the side effects to this.
Owner Author


Infer the retry count only if previously done attempts were detected.
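Put together, the inference could be sketched as a standalone function (hypothetical version of the snippet above; note it checks `is not None`, since a done attempt 0 is falsy but still counts):

```python
def infer_retry_count(latest_done_attempt, retry_count, max_user_code_retries):
    """Sketch: override the CLI-supplied retry_count only when a
    previously done attempt was detected."""
    if latest_done_attempt is not None:
        retry_count = latest_done_attempt + 1
        # Widen the retry budget so the inferred attempt is not rejected.
        if retry_count >= max_user_code_retries:
            max_user_code_retries = retry_count
    return retry_count, max_user_code_retries
```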
