Skip to content

Commit 4123178

Browse files
committed
- a catalog dir instead of catalog files for artifacts
- a better solution for subpath slices Signed-off-by: zjgemi <[email protected]>
1 parent 44f3ac5 commit 4123178

File tree

10 files changed

+116
-253
lines changed

10 files changed

+116
-253
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
# CHANGELOG
22

3+
## 1.1.15
4+
5+
- a catalog dir instead of catalog files for artifacts
6+
- a better solution for subpath slices
7+
38
## 1.1.14
49

510
- add save_path_as_artifact to handle large path lists

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.1.14
1+
1.1.15

examples/test_subpath_slices.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
import time
22
from typing import List
33

4-
from dflow import Step, Workflow, config # , upload_artifact
4+
from dflow import Step, Workflow # , upload_artifact
55
from dflow.python import (OP, OPIO, Artifact, OPIOSign, PythonOPTemplate,
66
Slices, upload_packages)
77

8-
config["save_path_as_parameter"] = True
98
if "__file__" in locals():
109
upload_packages.append(__file__)
1110

src/dflow/common.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ def __init__(
2525
config = Configuration()
2626
config.client_side_validation = False
2727
super().__init__(local_vars_configuration=config, *args, **kwargs)
28-
self._sub_path = None
2928
if path_list is None:
3029
path_list = []
3130
self.path_list = path_list
@@ -35,5 +34,7 @@ def sub_path(
3534
path: str,
3635
) -> Any:
3736
artifact = deepcopy(self)
38-
artifact._sub_path = path
37+
if artifact.key[-1:] != "/":
38+
artifact.key += "/"
39+
artifact.key += path
3940
return artifact

src/dflow/config.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
"k8s_api_server": None,
66
"private_key_host_path": "/home/docker/.ssh",
77
"save_path_as_parameter": False,
8-
"save_path_as_artifact": False,
9-
"catalog_file_name": ".dflow",
10-
"archive_mode": "tar"
8+
"catalog_dir_name": ".dflow",
9+
"archive_mode": "tar",
1110
}

src/dflow/io.py

Lines changed: 29 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -54,37 +54,22 @@ class InputArtifacts(AutonamedDict):
5454
def __setitem__(self, key, value):
5555
assert isinstance(value, InputArtifact)
5656
super().__setitem__(key, value)
57-
if (config["save_path_as_parameter"] or
58-
config["save_path_as_artifact"]) and self.template is not None:
57+
if config["save_path_as_parameter"] and self.template is not None:
5958
if isinstance(value.source, S3Artifact):
60-
self.template.inputs.parameters[
61-
"dflow_%s_path_list" % key] = InputParameter(
62-
value=value.source.path_list,
63-
save_as_artifact=config["save_path_as_artifact"],
64-
optional=True, save_type=False)
65-
elif config["save_path_as_artifact"]:
66-
self.template.inputs.parameters[
67-
"dflow_%s_path_list" % key] = InputParameter(
68-
optional=True, save_as_artifact=True)
59+
self.template.inputs.parameters["dflow_%s_path_list" % key] = \
60+
InputParameter(value=value.source.path_list)
6961
else:
70-
self.template.inputs.parameters[
71-
"dflow_%s_path_list" % key] = InputParameter(
72-
value=[])
62+
self.template.inputs.parameters["dflow_%s_path_list" % key] = \
63+
InputParameter(value=[])
7364

7465
def set_template(self, template):
7566
super().set_template(template)
76-
if config["save_path_as_parameter"] or config["save_path_as_artifact"]:
67+
if config["save_path_as_parameter"]:
7768
for name, art in self.items():
7869
if isinstance(art.source, S3Artifact):
7970
self.template.inputs.parameters["dflow_%s_path_list"
8071
% name] = InputParameter(
81-
value=art.source.path_list,
82-
save_as_artifact=config["save_path_as_artifact"],
83-
optional=True, save_type=False)
84-
elif config["save_path_as_artifact"]:
85-
self.template.inputs.parameters["dflow_%s_path_list"
86-
% name] = InputParameter(
87-
optional=True, save_as_artifact=True)
72+
value=art.source.path_list)
8873
else:
8974
self.template.inputs.parameters["dflow_%s_path_list"
9075
% name] = InputParameter(
@@ -101,22 +86,17 @@ class OutputArtifacts(AutonamedDict):
10186
def __setitem__(self, key, value):
10287
assert isinstance(value, OutputArtifact)
10388
super().__setitem__(key, value)
104-
if (config["save_path_as_parameter"] or
105-
config["save_path_as_artifact"]) and self.template is not None:
89+
if config["save_path_as_parameter"] and self.template is not None:
10690
self.template.outputs.parameters["dflow_%s_path_list" % key] = \
107-
OutputParameter(
108-
value=[], optional=True,
109-
save_as_artifact=config["save_path_as_artifact"])
91+
OutputParameter(value=[])
11092
value.handle_path_list()
11193

11294
def set_template(self, template):
11395
super().set_template(template)
114-
if config["save_path_as_parameter"] or config["save_path_as_artifact"]:
96+
if config["save_path_as_parameter"]:
11597
for name, art in self.items():
11698
self.template.outputs.parameters["dflow_%s_path_list" % name]\
117-
= OutputParameter(
118-
value=[], optional=True,
119-
save_as_artifact=config["save_path_as_artifact"])
99+
= OutputParameter(value=[])
120100
art.handle_path_list()
121101

122102

@@ -246,8 +226,6 @@ def __init__(
246226
path: str = None,
247227
source: Union["InputArtifact",
248228
"OutputArtifact", S3Artifact] = None,
249-
optional: bool = None,
250-
save_type: bool = True,
251229
**kwargs,
252230
) -> None:
253231
self.name = name
@@ -259,8 +237,6 @@ def __init__(
259237
self.save_as_artifact = save_as_artifact
260238
self.path = path
261239
self.source = source
262-
self.optional = optional
263-
self.save_type = save_type
264240

265241
def __getattr__(self, key):
266242
if key == "expr":
@@ -323,8 +299,7 @@ def convert_to_argo(self):
323299
InputArtifact, OutputArtifact)):
324300
return V1alpha1Artifact(name="dflow_bigpar_" + self.name,
325301
path=self.path,
326-
_from=str(self.value),
327-
optional=self.optional)
302+
_from=str(self.value))
328303
else:
329304
with tempfile.TemporaryDirectory() as tmpdir:
330305
content = {}
@@ -336,23 +311,18 @@ def convert_to_argo(self):
336311
content["type"] = str(self.type)
337312
path = tmpdir + "/" + self.name
338313
with open(path, "w") as f:
339-
if self.save_type:
340-
f.write(jsonpickle.dumps(content))
341-
else:
342-
f.write(content["value"])
314+
f.write(jsonpickle.dumps(content))
343315
key = upload_s3(path)
344316
s3 = S3Artifact(key=key)
345317
return V1alpha1Artifact(name="dflow_bigpar_" + self.name,
346-
path=self.path, s3=s3,
347-
optional=self.optional)
318+
path=self.path, s3=s3)
348319
elif isinstance(self.source, (InputParameter, OutputParameter,
349320
InputArtifact, OutputArtifact)):
350321
return V1alpha1Artifact(name="dflow_bigpar_" + self.name,
351-
path=self.path, _from=str(self.source),
352-
optional=self.optional)
322+
path=self.path, _from=str(self.source))
353323
else:
354324
return V1alpha1Artifact(name="dflow_bigpar_" + self.name,
355-
path=self.path, optional=self.optional)
325+
path=self.path)
356326

357327
if not hasattr(self, "value"):
358328
return V1alpha1Parameter(name=self.name, description=description)
@@ -456,11 +426,9 @@ def convert_to_argo(self):
456426
_from=str(self.source), sub_path=sub_path,
457427
mode=self.mode)
458428
elif isinstance(self.source, S3Artifact):
459-
sub_path = self.sub_path if self.sub_path is not None else \
460-
self.source._sub_path
461429
return V1alpha1Artifact(name=self.name, path=self.path,
462430
optional=self.optional, s3=self.source,
463-
sub_path=sub_path, mode=self.mode)
431+
sub_path=self.sub_path, mode=self.mode)
464432
elif isinstance(self.source, str):
465433
return V1alpha1Artifact(name=self.name, path=self.path,
466434
optional=self.optional,
@@ -499,8 +467,6 @@ def __init__(
499467
global_name: str = None,
500468
value_from_expression: Union[str, IfExpression] = None,
501469
save_as_artifact: bool = False,
502-
optional: bool = None,
503-
save: List[Union[PVC, S3Artifact]] = None,
504470
**kwargs,
505471
) -> None:
506472
self.value_from_path = value_from_path
@@ -516,18 +482,9 @@ def __init__(
516482
self.default = kwargs["default"]
517483
if "value" in kwargs:
518484
self.value = kwargs["value"]
519-
self.optional = optional
520-
if save is None:
521-
save = []
522-
elif not isinstance(save, list):
523-
save = [save]
524-
self.save = save
525-
self.redirect = None
526485

527486
def __getattr__(self, key):
528487
if key == "expr":
529-
if self.redirect is not None:
530-
return self.redirect.expr
531488
if self.save_as_artifact:
532489
if self.name is not None:
533490
if self.step is not None:
@@ -566,8 +523,6 @@ def __setattr__(self, key, value):
566523
return super().__setattr__(key, value)
567524

568525
def __repr__(self):
569-
if self.redirect is not None:
570-
return str(self.redirect)
571526
if self.save_as_artifact:
572527
if self.name is not None:
573528
if self.step is not None:
@@ -597,35 +552,24 @@ def convert_to_argo(self):
597552
description = jsonpickle.dumps({"type": str(self.type)})
598553

599554
if self.save_as_artifact:
600-
s3 = None
601-
for save in self.save:
602-
if isinstance(save, S3Artifact):
603-
s3 = deepcopy(save)
604-
if s3._sub_path is not None:
605-
if s3.key[-1] != "/":
606-
s3.key += "/"
607-
s3.key += s3._sub_path
608-
609555
if self.value_from_path is not None:
610556
return V1alpha1Artifact(
611557
name="dflow_bigpar_" + self.name,
612-
path=self.value_from_path, global_name=self.global_name,
613-
s3=s3, archive=V1alpha1ArchiveStrategy(_none={}),
614-
optional=self.optional)
558+
path=self.value_from_path,
559+
archive=V1alpha1ArchiveStrategy(_none={}),
560+
global_name=self.global_name)
615561
elif self.value_from_parameter is not None:
616562
return V1alpha1Artifact(
617563
name="dflow_bigpar_" + self.name,
618564
_from=str(self.value_from_parameter),
619-
global_name=self.global_name,
620-
s3=s3, archive=V1alpha1ArchiveStrategy(_none={}),
621-
optional=self.optional)
565+
archive=V1alpha1ArchiveStrategy(_none={}),
566+
global_name=self.global_name)
622567
elif self.value_from_expression is not None:
623568
return V1alpha1Artifact(
624569
name="dflow_bigpar_" + self.name,
625570
from_expression=str(self.value_from_expression),
626-
global_name=self.global_name, s3=s3,
627571
archive=V1alpha1ArchiveStrategy(_none={}),
628-
optional=self.optional)
572+
global_name=self.global_name)
629573
else:
630574
raise RuntimeError("Not supported.")
631575

@@ -682,7 +626,6 @@ class OutputArtifact(ArgoVar):
682626
archive: compress format of the artifact, None for no compression
683627
global_name: global name of the artifact within the workflow
684628
from_expression: the artifact is from an expression
685-
optional: optional artifact or not
686629
"""
687630

688631
def __init__(
@@ -697,7 +640,6 @@ def __init__(
697640
archive: str = "default",
698641
global_name: str = None,
699642
from_expression: Union[IfExpression, str] = None,
700-
optional: bool = None,
701643
**kwargs,
702644
) -> None:
703645
self.path = path
@@ -718,7 +660,6 @@ def __init__(
718660
self.from_expression = from_expression
719661
self.redirect = None
720662
self._from = _from
721-
self.optional = optional
722663

723664
def sub_path(self, path):
724665
artifact = deepcopy(self)
@@ -745,9 +686,8 @@ def __getattr__(self, key):
745686

746687
def __setattr__(self, key, value):
747688
super().__setattr__(key, value)
748-
if (config["save_path_as_parameter"] or
749-
config["save_path_as_artifact"]) \
750-
and key in ["_from", "from_expression"]:
689+
if config["save_path_as_parameter"] and key in ["_from",
690+
"from_expression"]:
751691
self.handle_path_list()
752692

753693
def handle_path_list(self):
@@ -805,27 +745,20 @@ def convert_to_argo(self):
805745
s3 = None
806746
for save in self.save:
807747
if isinstance(save, S3Artifact):
808-
s3 = deepcopy(save)
809-
if s3._sub_path is not None:
810-
if s3.key[-1] != "/":
811-
s3.key += "/"
812-
s3.key += s3._sub_path
748+
s3 = save
813749

814750
if self.path is not None:
815751
return V1alpha1Artifact(name=self.name, path=self.path,
816752
archive=archive, s3=s3,
817-
global_name=self.global_name,
818-
optional=self.optional)
753+
global_name=self.global_name)
819754
elif self._from is not None:
820755
return V1alpha1Artifact(name=self.name, _from=str(self._from),
821756
archive=archive, s3=s3,
822-
global_name=self.global_name,
823-
optional=self.optional)
757+
global_name=self.global_name)
824758
elif self.from_expression is not None:
825759
return V1alpha1Artifact(
826760
name=self.name, from_expression=str(self.from_expression),
827-
archive=archive, s3=s3, global_name=self.global_name,
828-
optional=self.optional)
761+
archive=archive, s3=s3, global_name=self.global_name)
829762
else:
830763
raise RuntimeError("Output artifact %s is not specified" % self)
831764

src/dflow/python/python_op_template.py

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -193,9 +193,6 @@ def __init__(self,
193193
self.inputs.artifacts[name] = InputArtifact(
194194
path="/tmp/inputs/artifacts/" + name,
195195
optional=sign.optional, type=sign.type)
196-
if config["save_path_as_artifact"]:
197-
self.inputs.parameters["dflow_%s_path_list" % name].path =\
198-
"/tmp/inputs/parameters/dflow_%s_path_list" % name
199196
elif isinstance(sign, BigParameter):
200197
self.inputs.parameters[name] = InputParameter(
201198
save_as_artifact=True, path="/tmp/inputs/parameters/"
@@ -264,17 +261,14 @@ def __init__(self,
264261
self.python_packages = set(python_packages)
265262
self.inputs.artifacts["dflow_python_packages"] = InputArtifact(
266263
path="/tmp/inputs/artifacts/dflow_python_packages")
267-
if config["save_path_as_artifact"]:
268-
name = "dflow_python_packages"
269-
self.inputs.parameters["dflow_%s_path_list" % name].path = \
270-
"/tmp/inputs/parameters/dflow_%s_path_list" % name
271264
script += "import os, sys, json\n"
272265
script += "package_root = '/tmp/inputs/artifacts/"\
273266
"dflow_python_packages'\n"
274-
script += "for f in os.listdir(package_root):\n"
275-
script += " if f[:%s] == '%s':\n" % (
276-
len(config["catalog_file_name"]), config["catalog_file_name"])
277-
script += " with open(os.path.join(package_root, f), 'r')"\
267+
script += "catalog_dir = os.path.join(package_root, "\
268+
"'%s')\n" % config['catalog_dir_name']
269+
script += "if os.path.exists(catalog_dir):\n"
270+
script += " for f in os.listdir(catalog_dir):\n"
271+
script += " with open(os.path.join(catalog_dir, f), 'r')"\
278272
" as fd:\n"
279273
script += " for item in json.load(fd)['path_list']:\n"
280274
script += " sys.path.insert(0, os.path.join("\
@@ -283,10 +277,8 @@ def __init__(self,
283277
script += "from dflow import config\n"
284278
script += "config['save_path_as_parameter'] = %s\n" \
285279
% config["save_path_as_parameter"]
286-
script += "config['save_path_as_artifact'] = %s\n" \
287-
% config["save_path_as_artifact"]
288-
script += "config['catalog_file_name'] = '%s'\n" \
289-
% config["catalog_file_name"]
280+
script += "config['catalog_dir_name'] = '%s'\n" \
281+
% config["catalog_dir_name"]
290282
if op_class.__module__ == "__main__":
291283
source_lines, start_line = inspect.getsourcelines(op_class)
292284
with open(inspect.getsourcefile(op_class), "r") as fd:
@@ -345,8 +337,7 @@ def __init__(self,
345337
script += "output_sign = %s.get_output_sign()\n" % class_name
346338
for name, sign in output_sign.items():
347339
if isinstance(sign, Artifact):
348-
if config["save_path_as_parameter"] or \
349-
config["save_path_as_artifact"]:
340+
if config["save_path_as_parameter"]:
350341
self.outputs.parameters["dflow_%s_path_list" %
351342
name].value_from_path = \
352343
"/tmp/outputs/parameters/dflow_%s_path_list" % name

0 commit comments

Comments
 (0)