LOGMLE - Fix argo metaflow retry integration #1

Status: Open. Wants to merge 31 commits into base: master.

31 commits (changes shown from 25 commits):

ccdd76b - add custom log (dhpikolo, Feb 11, 2025)
0427c73 - use echo_always (dhpikolo, Feb 11, 2025)
9ab0b1c - debug log to spot attempt number (dhpikolo, Feb 12, 2025)
8617d44 - pass kwargs get_task_datastore() (dhpikolo, Feb 12, 2025)
cfc0624 - use try-excpet to mitigate datastore error (dhpikolo, Feb 12, 2025)
47b8dd3 - enable allow_not_done (dhpikolo, Feb 12, 2025)
f2126ec - remove try/except block (dhpikolo, Feb 12, 2025)
966ac66 - print datastore metadata (dhpikolo, Feb 12, 2025)
3576162 - use flow_datastore intead (dhpikolo, Feb 12, 2025)
326d847 - fix attrs (dhpikolo, Feb 12, 2025)
f4d0acd - Update metaflow/cli_components/step_cmd.py (dhpikolo, Feb 12, 2025)
8f5044f - Update metaflow/cli_components/step_cmd.py (dhpikolo, Feb 12, 2025)
f1aa3a6 - add try except block (dhpikolo, Feb 13, 2025)
cc2a622 - correct math (dhpikolo, Feb 13, 2025)
bba6504 - do not use ca_store attrs (dhpikolo, Feb 13, 2025)
5603de0 - improve logs (dhpikolo, Feb 13, 2025)
e605958 - use getattr (dhpikolo, Feb 13, 2025)
f2829d2 - set allow_not_done = True (dhpikolo, Feb 13, 2025)
5f7d50d - use taskdatastores instead (dhpikolo, Feb 14, 2025)
8fc9b37 - add logs (dhpikolo, Feb 14, 2025)
ca8736f - Update metaflow/cli_components/step_cmd.py (dhpikolo, Feb 14, 2025)
98b7579 - save task std logs (dhpikolo, Feb 14, 2025)
250b1ef - Merge branch 'logmle-debug-step-cli' of https://github.com/dhpikolo/m… (dhpikolo, Feb 14, 2025)
7f89598 - Revert "save task std logs" (dhpikolo, Feb 14, 2025)
44e05cc - refactor + infer attempt in mflog.save_logs (dhpikolo, Feb 14, 2025)
b5bb050 - change default to 0 (dhpikolo, Feb 17, 2025)
a405118 - Merge remote-tracking branch 'origin' into logmle-debug-step-cli (dhpikolo, Feb 17, 2025)
b69f81d - remove casting to int (dhpikolo, Feb 17, 2025)
10ec80f - set default to None (dhpikolo, Feb 17, 2025)
8959ae5 - move get latest done attempts before if-else (dhpikolo, Feb 18, 2025)
1277051 - revert max_user_code_retries setting - since tests fail (dhpikolo, Feb 24, 2025)
11 changes: 11 additions & 0 deletions metaflow/cli_components/step_cmd.py
@@ -162,6 +162,17 @@ def step(
             retry_count,
         )
     else:
+        latest_done_attempt = task.flow_datastore.get_latest_done_attempt(
+            run_id=run_id,
+            step_name=step_name,
+            task_id=task_id
+        )
+        retry_count = latest_done_attempt + 1
+        echo_always(f"{latest_done_attempt=}")
+        echo_always(f"{retry_count=}")

[Comment from dhpikolo (Owner, Author)]: will remove this after testing is complete.

+        # Not sure what are the side effects to this.
+        if retry_count >= max_user_code_retries:
+            max_user_code_retries = retry_count
         task.run_step(
             step_name,
             run_id,
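For readers skimming the hunk above, the retry-count inference it adds reduces to the following standalone sketch. The function and its inputs are illustrative names, not part of the diff; it only mirrors the arithmetic: take the highest attempt that already finished, run the next index, and widen max_user_code_retries so the late attempt is not rejected.

```python
def infer_retry_count(done_attempts, max_user_code_retries):
    # done_attempts: attempt indices that already completed, e.g. [0, 1] after
    # two finished attempts (illustrative stand-in for the datastore lookup).
    latest_done_attempt = max(done_attempts, default=-1)  # -1 when nothing has finished yet
    retry_count = latest_done_attempt + 1                 # index of the attempt to run next
    # Mirror of the hunk's guard: grow the retry budget for late, Argo-driven attempts.
    if retry_count >= max_user_code_retries:
        max_user_code_retries = retry_count
    return retry_count, max_user_code_retries

assert infer_retry_count([], 3) == (0, 3)         # first attempt of a task
assert infer_retry_count([0, 1, 2], 3) == (3, 3)  # Argo retry after three finished attempts
```
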
7 changes: 7 additions & 0 deletions metaflow/datastore/flow_datastore.py
@@ -67,6 +67,13 @@ def __init__(
     def datastore_root(self):
         return self._storage_impl.datastore_root
 
+    def get_latest_done_attempt(self, run_id, step_name, task_id) -> int:
+        t_datastores = self.get_task_datastores(
+            pathspecs=[f"{run_id}/{step_name}/{task_id}"],
+            include_prior=True
+        )
+        return max([t.attempt for t in t_datastores], default=-1)  # default=-1, when no successful done_attempts found.
+
     def get_task_datastores(
         self,
         run_id=None,
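A hedged usage sketch of the new helper follows; describe_retry_state and its arguments are illustrative, and flow_datastore stands for an already-constructed FlowDataStore such as the one built in mflog/save_logs.py.

```python
def describe_retry_state(flow_datastore, run_id, step_name, task_id):
    # flow_datastore is an existing metaflow FlowDataStore instance; the ids
    # identify whichever task is being inspected.
    latest = flow_datastore.get_latest_done_attempt(
        run_id=run_id, step_name=step_name, task_id=task_id
    )
    if latest == -1:
        return "no completed attempts yet; the next attempt index is 0"
    return f"latest completed attempt is {latest}; the next attempt index is {latest + 1}"
```
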
5 changes: 3 additions & 2 deletions metaflow/mflog/save_logs.py
@@ -21,7 +21,6 @@ def _read_file(path):

     # these env vars are set by mflog.mflog_env
     pathspec = os.environ["MF_PATHSPEC"]
-    attempt = os.environ["MF_ATTEMPT"]

[Review comment]: Where does this come from? I am wondering if we can leave all the code in this file alone and try to set MF_ATTEMPT to the value that we want.

[Reply from dhpikolo (Owner, Author)]: MF_ATTEMPT comes from the Argo workflow template container command.
[Screenshot 2025-02-17 at 14 42 28]

     ds_type = os.environ["MF_DATASTORE"]
     ds_root = os.environ.get("MF_DATASTORE_ROOT")
     paths = (os.environ["MFLOG_STDOUT"], os.environ["MFLOG_STDERR"])
@@ -37,8 +36,10 @@ def print_clean(line, **kwargs):
     flow_datastore = FlowDataStore(
         flow_name, None, storage_impl=storage_impl, ds_root=ds_root
     )
+    # Use inferred attempt - to save task_stdout.log and task_stderr.log
+    latest_done_attempt = flow_datastore.get_latest_done_attempt(run_id=run_id, step_name=step_name, task_id=task_id)
     task_datastore = flow_datastore.get_task_datastore(
-        run_id, step_name, task_id, int(attempt), mode="w"
+        run_id, step_name, task_id, int(latest_done_attempt), mode="w"

[Comment from dhpikolo (Owner, Author), Feb 14, 2025]: This is needed because both the Kubernetes and Argo options run mflog.save_logs after the task completes, which uploads the task_stdout and task_stderr logs to S3 so that they show up in the Metaflow UI. An example run without this change has no logs for the Argo-retried attempts, i.e. attempts 3, 4 and 5.

[Review comment]: Why are we explicitly casting this to an int? Why don't we do this in the function?

[Reply from dhpikolo (Owner, Author)]: Yeah, you are right, I don't think we need to cast it anymore; the function returns a count. I had it this way to keep the changes minimal.

[Comment from dhpikolo (Owner, Author)]: The function returning None also integrates well here, since get_task_datastore(..., attempt=None) will not break:
https://github.com/Netflix/metaflow/blob/5c960eaff1ae486f503b37177f03cc1419b5571d/metaflow/datastore/flow_datastore.py#L206

     )
 
     try:
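To summarise the save_logs.py change for anyone who has not read the whole thread: the attempt that the saved task_stdout/task_stderr logs are attached to used to come from the MF_ATTEMPT environment variable set by the scheduler template, and now comes from the datastore. A hedged before/after sketch, with illustrative function names rather than Metaflow APIs:

```python
import os


def resolve_attempt_from_env() -> int:
    # Pre-PR behaviour: trust MF_ATTEMPT as injected by the Argo/Kubernetes
    # template. Per the discussion above, this did not track Argo-driven
    # retries, so those attempts ended up without logs in the UI.
    return int(os.environ["MF_ATTEMPT"])


def resolve_attempt_from_datastore(flow_datastore, run_id, step_name, task_id) -> int:
    # Post-PR behaviour: ask the datastore which attempt completed last and
    # save the logs under that attempt instead.
    return flow_datastore.get_latest_done_attempt(
        run_id=run_id, step_name=step_name, task_id=task_id
    )
```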