|
5 | 5 | import tarfile
|
6 | 6 | from collections import namedtuple
|
7 | 7 | from datetime import datetime
|
| 8 | +from tempfile import TemporaryDirectory |
8 | 9 | from io import BytesIO
|
9 | 10 | from itertools import chain
|
10 | 11 | from typing import (
|
@@ -878,6 +879,73 @@ def tarball(self) -> tarfile.TarFile:
|
878 | 879 | """
|
879 | 880 | return self._tar
|
880 | 881 |
|
| 882 | + def extract(self) -> TemporaryDirectory: |
| 883 | + """ |
| 884 | + Extracts the code package to a temporary directory. |
| 885 | +
|
| 886 | + This creates a temporary directory containing all user code |
| 887 | + files from the code package. The temporary directory is |
| 888 | + automatically deleted when the returned TemporaryDirectory |
| 889 | + object is garbage collected or when its cleanup() is called. |
| 890 | +
|
| 891 | + To preserve the contents to a permanent location, use |
| 892 | + os.replace() which performs a zero-copy move on the same |
| 893 | + filesystem: |
| 894 | +
|
| 895 | + ```python |
| 896 | + with task.code.extract() as tmp_dir: |
| 897 | + # Move contents to permanent location |
| 898 | + for item in os.listdir(tmp_dir): |
| 899 | + src = os.path.join(tmp_dir, item) |
| 900 | + dst = os.path.join('/path/to/permanent/dir', item) |
| 901 | + os.makedirs(os.path.dirname(dst), exist_ok=True) |
| 902 | + os.replace(src, dst) # Atomic move operation |
| 903 | + ``` |
| 904 | + Returns |
| 905 | + ------- |
| 906 | + TemporaryDirectory |
| 907 | + A temporary directory containing the extracted code files. |
| 908 | + The directory and its contents are automatically deleted when |
| 909 | + this object is garbage collected. |
| 910 | + """ |
| 911 | + exclusions = [ |
| 912 | + "metaflow/", |
| 913 | + "metaflow_extensions/", |
| 914 | + "INFO", |
| 915 | + "CONFIG_PARAMETERS", |
| 916 | + "conda.manifest", |
| 917 | + # This file is created when using the conda/pypi features available in |
| 918 | + # nflx-metaflow-extensions: https://github.com/Netflix/metaflow-nflx-extensions |
| 919 | + "condav2-1.cnd", |
| 920 | + ] |
| 921 | + members = [ |
| 922 | + m |
| 923 | + for m in self.tarball.getmembers() |
| 924 | + if not any( |
| 925 | + (x.endswith("/") and m.name.startswith(x)) or (m.name == x) |
| 926 | + for x in exclusions |
| 927 | + ) |
| 928 | + ] |
| 929 | + |
| 930 | + tmp = TemporaryDirectory() |
| 931 | + self.tarball.extractall(tmp.name, members) |
| 932 | + return tmp |
| 933 | + |
| 934 | + @property |
| 935 | + def script_name(self) -> str: |
| 936 | + """ |
| 937 | + Returns the filename of the Python script containing the FlowSpec. |
| 938 | +
|
| 939 | + This is the main Python file that was used to execute the flow. For example, |
| 940 | + if your flow is defined in 'myflow.py', this property will return 'myflow.py'. |
| 941 | +
|
| 942 | + Returns |
| 943 | + ------- |
| 944 | + str |
| 945 | + Name of the Python file containing the FlowSpec |
| 946 | + """ |
| 947 | + return self._info["script"] |
| 948 | + |
881 | 949 | def __str__(self):
|
882 | 950 | return "<MetaflowCode: %s>" % self._info["script"]
|
883 | 951 |
|
|
0 commit comments