From 17dfab8e49ad5e6a7e900a7ceced7918076734c2 Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Wed, 18 Oct 2023 21:32:54 -0400 Subject: [PATCH 1/9] Fix error when `/tmp` is a symlink On macOS, `/tmp` is a symlink to `/private/tmp`. When creating the tarball, the symlink will be resolved, making the file paths on server are different from those on client. This results in a `FileNotFoundError`. This patch fixes the issue by resolving the local path to the real one if it is a symlink. --- greenplumpython/experimental/file.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/greenplumpython/experimental/file.py b/greenplumpython/experimental/file.py index ac8d49bd..ed576b8c 100644 --- a/greenplumpython/experimental/file.py +++ b/greenplumpython/experimental/file.py @@ -121,7 +121,7 @@ def _install_on_server(pkg_dir: str, requirements: str) -> str: def _install_packages(db: gp.Database, requirements: str): tmp_archive_name = f"tar_{uuid.uuid4().hex}" # FIXME: Windows client is not supported yet. - local_dir = pathlib.Path("/") / "tmp" / tmp_archive_name / "pip" + local_dir = (pathlib.Path("/") / "tmp").resolve() / tmp_archive_name / "pip" local_dir.mkdir(parents=True) cmd = [ sys.executable, From 3d39bae2069a372ba474123116f86868ea88324b Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Thu, 19 Oct 2023 04:03:09 -0400 Subject: [PATCH 2/9] Switch to using tempfile --- greenplumpython/experimental/file.py | 62 ++++++++++++++-------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/greenplumpython/experimental/file.py b/greenplumpython/experimental/file.py index ed576b8c..9e347089 100644 --- a/greenplumpython/experimental/file.py +++ b/greenplumpython/experimental/file.py @@ -3,6 +3,7 @@ import io import pathlib import tarfile +import tempfile import uuid from typing import get_type_hints @@ -120,37 +121,36 @@ def _install_on_server(pkg_dir: str, requirements: str) -> str: def _install_packages(db: gp.Database, requirements: str): tmp_archive_name = f"tar_{uuid.uuid4().hex}" - # FIXME: Windows client is not supported yet. - local_dir = (pathlib.Path("/") / "tmp").resolve() / tmp_archive_name / "pip" - local_dir.mkdir(parents=True) - cmd = [ - sys.executable, - "-m", - "pip", - "download", - "--requirement", - "/dev/stdin", - "--dest", - local_dir, - ] - try: - sp.check_output(cmd, text=True, stderr=sp.STDOUT, input=requirements) - except sp.CalledProcessError as e: - raise e from Exception(e.stdout) - _archive_and_upload(tmp_archive_name, [local_dir.resolve()], db) - extracted = db.apply(lambda: _extract_files(tmp_archive_name, "root"), column_name="cache_dir") - assert len(list(extracted)) == 1 - server_dir = ( - pathlib.Path("/") - / "tmp" - / tmp_archive_name - / "extracted" - / local_dir.relative_to(local_dir.root) - ) - installed = extracted.apply( - lambda _: _install_on_server(server_dir.as_uri(), requirements), column_name="result" - ) - assert len(list(installed)) == 1 + with tempfile.TemporaryDirectory() as local_dir: + local_dir_path = pathlib.Path(local_dir.name) # type: ignore reportUnknownArgumentType + cmd = [ + sys.executable, + "-m", + "pip", + "download", + "--requirement", + "/dev/stdin", + "--dest", + str(local_dir_path), + ] + try: + sp.check_output(cmd, text=True, stderr=sp.STDOUT, input=requirements) + except sp.CalledProcessError as e: + raise e from Exception(e.stdout) + _archive_and_upload(tmp_archive_name, [local_dir], db) + extracted = db.apply(lambda: _extract_files(tmp_archive_name, "root"), column_name="cache_dir") + assert len(list(extracted)) == 1 + server_dir = ( + pathlib.Path("/") + / "tmp" + / tmp_archive_name + / "extracted" + / local_dir_path.relative_to(local_dir_path.root) + ) + installed = extracted.apply( + lambda _: _install_on_server(server_dir.as_uri(), requirements), column_name="result" + ) + assert len(list(installed)) == 1 setattr(gp.Database, "install_packages", _install_packages) From 3577d902d8b23c9e2c80c8d25a3c2bc7258330cc Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Thu, 19 Oct 2023 04:10:33 -0400 Subject: [PATCH 3/9] Fix errors --- greenplumpython/experimental/file.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/greenplumpython/experimental/file.py b/greenplumpython/experimental/file.py index 9e347089..08136bf5 100644 --- a/greenplumpython/experimental/file.py +++ b/greenplumpython/experimental/file.py @@ -122,7 +122,7 @@ def _install_on_server(pkg_dir: str, requirements: str) -> str: def _install_packages(db: gp.Database, requirements: str): tmp_archive_name = f"tar_{uuid.uuid4().hex}" with tempfile.TemporaryDirectory() as local_dir: - local_dir_path = pathlib.Path(local_dir.name) # type: ignore reportUnknownArgumentType + local_dir_path = pathlib.Path(local_dir) cmd = [ sys.executable, "-m", @@ -138,7 +138,9 @@ def _install_packages(db: gp.Database, requirements: str): except sp.CalledProcessError as e: raise e from Exception(e.stdout) _archive_and_upload(tmp_archive_name, [local_dir], db) - extracted = db.apply(lambda: _extract_files(tmp_archive_name, "root"), column_name="cache_dir") + extracted = db.apply( + lambda: _extract_files(tmp_archive_name, "root"), column_name="cache_dir" + ) assert len(list(extracted)) == 1 server_dir = ( pathlib.Path("/") From 848668f845cba764b90278c4f4cb58d480385052 Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Fri, 20 Oct 2023 02:51:49 -0400 Subject: [PATCH 4/9] Switch to using tempfile on server --- greenplumpython/experimental/file.py | 156 +++++++++++++++------------ greenplumpython/func.py | 1 + 2 files changed, 87 insertions(+), 70 deletions(-) diff --git a/greenplumpython/experimental/file.py b/greenplumpython/experimental/file.py index 08136bf5..06424236 100644 --- a/greenplumpython/experimental/file.py +++ b/greenplumpython/experimental/file.py @@ -2,10 +2,11 @@ import inspect import io import pathlib +import sys import tarfile import tempfile import uuid -from typing import get_type_hints +from typing import Any, get_type_hints import psycopg2 @@ -15,21 +16,40 @@ _CHUNK_SIZE = 256 * 1024 * 1024 # Must be much < 1 GB +def gd() -> dict[str, Any]: + try: + return globals["GD"] # type: ignore reportUnknownVariableType + except KeyError: + return sys.modules["plpy"]._GD + + @gp.create_function -def _dump_file_chunk(tmp_archive_name: str, chunk_base64: str) -> int: - tmp_archive_base = pathlib.Path("/") / "tmp" / tmp_archive_name - tmp_archive_base.mkdir(parents=True, exist_ok=True) - tmp_archive_path = tmp_archive_base / f"{tmp_archive_name}.tar.gz" +def _dump_file_chunk(tmp_archive_name: str, chunk_base64: str) -> str: + server_tmp_dir_handle = f"__pygp_{uuid.uuid4().hex}" + if server_tmp_dir_handle not in gd(): + server_tmp_dir = tempfile.TemporaryDirectory() + gd()[server_tmp_dir_handle] = server_tmp_dir # Pin to GD for later UDFs + else: + server_tmp_dir = gd()[server_tmp_dir_handle] + + server_tmp_dir_path: pathlib.Path = pathlib.Path(server_tmp_dir.name) + server_tmp_dir_path.mkdir(parents=True, exist_ok=True) + tmp_archive_path = server_tmp_dir_path / f"{tmp_archive_name}.tar.gz" with open(tmp_archive_path, "ab") as tmp_archive: tmp_archive.write(base64.b64decode(chunk_base64)) - return 0 + return server_tmp_dir_handle @gp.create_function -def _extract_files(tmp_archive_name: str, returning: str) -> list[str]: - tmp_archive_base = pathlib.Path("/") / "tmp" / tmp_archive_name - tmp_archive_path = tmp_archive_base / f"{tmp_archive_name}.tar.gz" - extracted_root = tmp_archive_base / "extracted" +def _remove_tmp_dir(server_tmp_dir_handle: str) -> None: + gd()[server_tmp_dir_handle].cleanup() + + +@gp.create_function +def _extract_files(server_tmp_dir_handle: str, tmp_archive_name: str, returning: str) -> list[str]: + server_tmp_dir_path: pathlib.Path = pathlib.Path(gd()[server_tmp_dir_handle].name) + tmp_archive_path = server_tmp_dir_path / f"{tmp_archive_name}.tar.gz" + extracted_root = server_tmp_dir_path / "extracted" if not extracted_root.exists(): with tarfile.open(tmp_archive_path, "r:gz") as tmp_archive: extracted_root.mkdir() @@ -44,63 +64,65 @@ def _extract_files(tmp_archive_name: str, returning: str) -> list[str]: yield str(path.resolve()) -def _archive_and_upload(tmp_archive_name: str, files: list[str], db: gp.Database): - tmp_archive_base = pathlib.Path("/") / "tmp" / tmp_archive_name - tmp_archive_base.mkdir(exist_ok=True) - tmp_archive_path = tmp_archive_base / f"{tmp_archive_name}.tar.gz" - with tarfile.open(tmp_archive_path, "w:gz") as tmp_archive: - for file_path in files: - tmp_archive.add(pathlib.Path(file_path)) - server_options = "-c gp_session_role=utility" if db._is_variant("greenplum") else None - with psycopg2.connect(db._dsn, options=server_options) as util_conn: # type: ignore reportUnknownVariableType - with util_conn.cursor() as cursor: # type: ignore reportUnknownVariableType - cursor.execute(f"CREATE TEMP TABLE {tmp_archive_name} (id serial, text_base64 text);") - with open(tmp_archive_path, "rb") as tmp_archive: - while True: - chunk = tmp_archive.read(_CHUNK_SIZE) - if len(chunk) == 0: - break - chunk_base64 = base64.b64encode(chunk) - cursor.copy_expert( - f"COPY {tmp_archive_name} (text_base64) FROM STDIN", - io.BytesIO(chunk_base64), - ) - util_conn.commit() - cursor.execute(_dump_file_chunk._serialize(db)) # type: ignore reportUnknownArgumentType - cursor.execute( - f""" - SELECT {_dump_file_chunk._qualified_name_str}('{tmp_archive_name}', text_base64) - FROM "{tmp_archive_name}" - ORDER BY id; - """ - ) +def _archive_and_upload(tmp_archive_name: str, files: list[str], db: gp.Database) -> str: + with tempfile.TemporaryDirectory() as local_tmp_dir: + local_tmp_dir_path: pathlib.Path = pathlib.Path(local_tmp_dir) + tmp_archive_path = local_tmp_dir_path / f"{tmp_archive_name}.tar.gz" + with tarfile.open(tmp_archive_path, "w:gz") as tmp_archive: + for file_path in files: + tmp_archive.add(pathlib.Path(file_path)) + server_options = "-c gp_session_role=utility" if db._is_variant("greenplum") else None + with psycopg2.connect(db._dsn, options=server_options) as util_conn: # type: ignore reportUnknownVariableType + with util_conn.cursor() as cursor: # type: ignore reportUnknownVariableType + cursor.execute( + f"CREATE TEMP TABLE {tmp_archive_name} (id serial, text_base64 text);" + ) + with open(tmp_archive_path, "rb") as tmp_archive: + while True: + chunk = tmp_archive.read(_CHUNK_SIZE) + if len(chunk) == 0: + break + chunk_base64 = base64.b64encode(chunk) + cursor.copy_expert( + f"COPY {tmp_archive_name} (text_base64) FROM STDIN", + io.BytesIO(chunk_base64), + ) + util_conn.commit() + cursor.execute(_dump_file_chunk._serialize(db)) # type: ignore reportUnknownArgumentType + cursor.execute( + f""" + SELECT {_dump_file_chunk._qualified_name_str}('{tmp_archive_name}', text_base64) + FROM "{tmp_archive_name}" + ORDER BY id; + """ + ) + return cursor.fetchall()[0][0] @classmethod def _from_files(_, files: list[str], parser: NormalFunction, db: gp.Database) -> gp.DataFrame: tmp_archive_name = f"tar_{uuid.uuid4().hex}" - _archive_and_upload(tmp_archive_name, files, db) + server_tmp_dir_handle = _archive_and_upload(tmp_archive_name, files, db) func_sig = inspect.signature(parser.unwrap()) result_members = get_type_hints(func_sig.return_annotation) - return db.apply( - lambda: parser(_extract_files(tmp_archive_name, "files")), + df = db.apply( + lambda: parser(_extract_files(server_tmp_dir_handle, tmp_archive_name, "files")), expand=len(result_members) == 0, ) + db.apply(lambda: _remove_tmp_dir(server_tmp_dir_handle)) + return df setattr(gp.DataFrame, "from_files", _from_files) - -import subprocess as sp -import sys +import subprocess @gp.create_function -def _install_on_server(pkg_dir: str, requirements: str) -> str: - import subprocess as sp - import sys - +def _install_on_server(server_tmp_dir_handle: str, local_tmp_dir: str, requirements: str) -> str: assert sys.executable, "Python executable is required to install packages." + server_tmp_dir_path: pathlib.Path = pathlib.Path(gd()[server_tmp_dir_handle].name) + local_tmp_dir_path = pathlib.Path(local_tmp_dir) cmd = [ sys.executable, "-m", @@ -110,19 +132,21 @@ def _install_on_server(pkg_dir: str, requirements: str) -> str: "--requirement", "/dev/stdin", "--find-links", - pkg_dir, + str(server_tmp_dir_path / local_tmp_dir_path.relative_to(local_tmp_dir_path.root)), ] try: - output = sp.check_output(cmd, text=True, stderr=sp.STDOUT, input=requirements) + output = subprocess.check_output( + cmd, text=True, stderr=subprocess.STDOUT, input=requirements + ) return output - except sp.CalledProcessError as e: + except subprocess.CalledProcessError as e: raise Exception(e.stdout) def _install_packages(db: gp.Database, requirements: str): tmp_archive_name = f"tar_{uuid.uuid4().hex}" - with tempfile.TemporaryDirectory() as local_dir: - local_dir_path = pathlib.Path(local_dir) + with tempfile.TemporaryDirectory() as local_tmp_dir: + local_tmp_dir_path = pathlib.Path(local_tmp_dir) cmd = [ sys.executable, "-m", @@ -131,28 +155,20 @@ def _install_packages(db: gp.Database, requirements: str): "--requirement", "/dev/stdin", "--dest", - str(local_dir_path), + str(local_tmp_dir_path), ] try: - sp.check_output(cmd, text=True, stderr=sp.STDOUT, input=requirements) - except sp.CalledProcessError as e: + subprocess.check_output(cmd, text=True, stderr=subprocess.STDOUT, input=requirements) + except subprocess.CalledProcessError as e: raise e from Exception(e.stdout) - _archive_and_upload(tmp_archive_name, [local_dir], db) - extracted = db.apply( - lambda: _extract_files(tmp_archive_name, "root"), column_name="cache_dir" - ) + server_tmp_dir_handle = _archive_and_upload(tmp_archive_name, [local_tmp_dir], db) + extracted = db.apply(lambda: _extract_files(tmp_archive_name, "root")) assert len(list(extracted)) == 1 - server_dir = ( - pathlib.Path("/") - / "tmp" - / tmp_archive_name - / "extracted" - / local_dir_path.relative_to(local_dir_path.root) - ) installed = extracted.apply( - lambda _: _install_on_server(server_dir.as_uri(), requirements), column_name="result" + lambda _: _install_on_server(server_tmp_dir_handle, local_tmp_dir.name, requirements) ) assert len(list(installed)) == 1 + db.apply(lambda: _remove_tmp_dir(server_tmp_dir_handle)) setattr(gp.Database, "install_packages", _install_packages) diff --git a/greenplumpython/func.py b/greenplumpython/func.py index f4676f5e..b68198c2 100644 --- a/greenplumpython/func.py +++ b/greenplumpython/func.py @@ -345,6 +345,7 @@ def _serialize(self, db: Database) -> str: f" if {sysconfig_lib_name}.get_python_version() != '{python_version}':\n" f" raise ModuleNotFoundError\n" f" setattr({sys_lib_name}.modules['plpy'], '_SD', SD)\n" + f" setattr({sys_lib_name}.modules['plpy'], '_GD', GD)\n" f" GD['{func_ast.name}'] = {pickle_lib_name}.loads({func_pickled})\n" f" except ModuleNotFoundError:\n" f" exec({json.dumps(ast.unparse(func_ast))}, globals())\n" From 6680dad5563c7654101572de41194b78245018b5 Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Mon, 23 Oct 2023 04:57:47 -0400 Subject: [PATCH 5/9] Don't close util conn before done --- greenplumpython/experimental/file.py | 131 +++++++++++++-------------- 1 file changed, 63 insertions(+), 68 deletions(-) diff --git a/greenplumpython/experimental/file.py b/greenplumpython/experimental/file.py index 06424236..7eaca779 100644 --- a/greenplumpython/experimental/file.py +++ b/greenplumpython/experimental/file.py @@ -9,6 +9,7 @@ from typing import Any, get_type_hints import psycopg2 +import psycopg2.extensions import greenplumpython as gp from greenplumpython.func import NormalFunction @@ -16,38 +17,30 @@ _CHUNK_SIZE = 256 * 1024 * 1024 # Must be much < 1 GB -def gd() -> dict[str, Any]: - try: - return globals["GD"] # type: ignore reportUnknownVariableType - except KeyError: - return sys.modules["plpy"]._GD - - @gp.create_function def _dump_file_chunk(tmp_archive_name: str, chunk_base64: str) -> str: - server_tmp_dir_handle = f"__pygp_{uuid.uuid4().hex}" - if server_tmp_dir_handle not in gd(): + try: + _gd = globals()["GD"] # type: ignore reportUnknownVariableType + except KeyError: + _gd = sys.modules["plpy"]._GD + server_tmp_dir_handle = f"__pygp_tmp_{uuid.uuid4().hex}" + if server_tmp_dir_handle not in _gd: server_tmp_dir = tempfile.TemporaryDirectory() - gd()[server_tmp_dir_handle] = server_tmp_dir # Pin to GD for later UDFs + _gd[server_tmp_dir_handle] = server_tmp_dir # Pin to GD for later UDFs else: - server_tmp_dir = gd()[server_tmp_dir_handle] + server_tmp_dir = _gd[server_tmp_dir_handle] # type: ignore reportUnknownVariableType - server_tmp_dir_path: pathlib.Path = pathlib.Path(server_tmp_dir.name) + server_tmp_dir_path: pathlib.Path = pathlib.Path(server_tmp_dir.name) # type: ignore reportUnknownVariableType server_tmp_dir_path.mkdir(parents=True, exist_ok=True) tmp_archive_path = server_tmp_dir_path / f"{tmp_archive_name}.tar.gz" with open(tmp_archive_path, "ab") as tmp_archive: tmp_archive.write(base64.b64decode(chunk_base64)) - return server_tmp_dir_handle - - -@gp.create_function -def _remove_tmp_dir(server_tmp_dir_handle: str) -> None: - gd()[server_tmp_dir_handle].cleanup() + return server_tmp_dir.name @gp.create_function -def _extract_files(server_tmp_dir_handle: str, tmp_archive_name: str, returning: str) -> list[str]: - server_tmp_dir_path: pathlib.Path = pathlib.Path(gd()[server_tmp_dir_handle].name) +def _extract_files(server_tmp_dir: str, tmp_archive_name: str, returning: str) -> list[str]: + server_tmp_dir_path: pathlib.Path = pathlib.Path(server_tmp_dir) # type: ignore reportUnknownVariableType tmp_archive_path = server_tmp_dir_path / f"{tmp_archive_name}.tar.gz" extracted_root = server_tmp_dir_path / "extracted" if not extracted_root.exists(): @@ -64,53 +57,54 @@ def _extract_files(server_tmp_dir_handle: str, tmp_archive_name: str, returning: yield str(path.resolve()) -def _archive_and_upload(tmp_archive_name: str, files: list[str], db: gp.Database) -> str: +def _archive_and_upload( + util_conn: psycopg2.extensions.connection, + tmp_archive_name: str, + files: list[str], + db: gp.Database, +) -> str: with tempfile.TemporaryDirectory() as local_tmp_dir: local_tmp_dir_path: pathlib.Path = pathlib.Path(local_tmp_dir) tmp_archive_path = local_tmp_dir_path / f"{tmp_archive_name}.tar.gz" with tarfile.open(tmp_archive_path, "w:gz") as tmp_archive: for file_path in files: tmp_archive.add(pathlib.Path(file_path)) - server_options = "-c gp_session_role=utility" if db._is_variant("greenplum") else None - with psycopg2.connect(db._dsn, options=server_options) as util_conn: # type: ignore reportUnknownVariableType - with util_conn.cursor() as cursor: # type: ignore reportUnknownVariableType - cursor.execute( - f"CREATE TEMP TABLE {tmp_archive_name} (id serial, text_base64 text);" - ) - with open(tmp_archive_path, "rb") as tmp_archive: - while True: - chunk = tmp_archive.read(_CHUNK_SIZE) - if len(chunk) == 0: - break - chunk_base64 = base64.b64encode(chunk) - cursor.copy_expert( - f"COPY {tmp_archive_name} (text_base64) FROM STDIN", - io.BytesIO(chunk_base64), - ) - util_conn.commit() - cursor.execute(_dump_file_chunk._serialize(db)) # type: ignore reportUnknownArgumentType - cursor.execute( - f""" - SELECT {_dump_file_chunk._qualified_name_str}('{tmp_archive_name}', text_base64) - FROM "{tmp_archive_name}" - ORDER BY id; - """ - ) - return cursor.fetchall()[0][0] + with util_conn.cursor() as cursor: # type: ignore reportUnknownVariableType + cursor.execute(f"CREATE TEMP TABLE {tmp_archive_name} (id serial, text_base64 text);") + with open(tmp_archive_path, "rb") as tmp_archive: + while True: + chunk = tmp_archive.read(_CHUNK_SIZE) + if len(chunk) == 0: + break + chunk_base64 = base64.b64encode(chunk) + cursor.copy_expert( + f"COPY {tmp_archive_name} (text_base64) FROM STDIN", + io.BytesIO(chunk_base64), + ) + util_conn.commit() + cursor.execute(_dump_file_chunk._serialize(db)) # type: ignore reportUnknownArgumentType + cursor.execute( + f""" + SELECT {_dump_file_chunk._qualified_name_str}('{tmp_archive_name}', text_base64) + FROM "{tmp_archive_name}" + ORDER BY id; + """ + ) + return cursor.fetchall()[0][0] @classmethod def _from_files(_, files: list[str], parser: NormalFunction, db: gp.Database) -> gp.DataFrame: tmp_archive_name = f"tar_{uuid.uuid4().hex}" - server_tmp_dir_handle = _archive_and_upload(tmp_archive_name, files, db) - func_sig = inspect.signature(parser.unwrap()) - result_members = get_type_hints(func_sig.return_annotation) - df = db.apply( - lambda: parser(_extract_files(server_tmp_dir_handle, tmp_archive_name, "files")), - expand=len(result_members) == 0, - ) - db.apply(lambda: _remove_tmp_dir(server_tmp_dir_handle)) - return df + server_options = "-c gp_session_role=utility" if db._is_variant("greenplum") else None + with psycopg2.connect(db._dsn, options=server_options) as util_conn: # type: ignore reportUnknownVariableType + server_tmp_dir = _archive_and_upload(util_conn, tmp_archive_name, files, db) + func_sig = inspect.signature(parser.unwrap()) + result_members = get_type_hints(func_sig.return_annotation) + return db.apply( + lambda: parser(_extract_files(server_tmp_dir, tmp_archive_name, "files")), + expand=len(result_members) == 0, + ) setattr(gp.DataFrame, "from_files", _from_files) @@ -119,9 +113,9 @@ def _from_files(_, files: list[str], parser: NormalFunction, db: gp.Database) -> @gp.create_function -def _install_on_server(server_tmp_dir_handle: str, local_tmp_dir: str, requirements: str) -> str: +def _install_on_server(server_tmp_dir: str, local_tmp_dir: str, requirements: str) -> str: assert sys.executable, "Python executable is required to install packages." - server_tmp_dir_path: pathlib.Path = pathlib.Path(gd()[server_tmp_dir_handle].name) + server_tmp_dir_path: pathlib.Path = pathlib.Path(server_tmp_dir) # type: ignore reportUnknownVariableType local_tmp_dir_path = pathlib.Path(local_tmp_dir) cmd = [ sys.executable, @@ -145,8 +139,8 @@ def _install_on_server(server_tmp_dir_handle: str, local_tmp_dir: str, requireme def _install_packages(db: gp.Database, requirements: str): tmp_archive_name = f"tar_{uuid.uuid4().hex}" - with tempfile.TemporaryDirectory() as local_tmp_dir: - local_tmp_dir_path = pathlib.Path(local_tmp_dir) + with tempfile.TemporaryDirectory() as local_pkg_dir: + local_tmp_dir_path = pathlib.Path(local_pkg_dir) cmd = [ sys.executable, "-m", @@ -161,14 +155,15 @@ def _install_packages(db: gp.Database, requirements: str): subprocess.check_output(cmd, text=True, stderr=subprocess.STDOUT, input=requirements) except subprocess.CalledProcessError as e: raise e from Exception(e.stdout) - server_tmp_dir_handle = _archive_and_upload(tmp_archive_name, [local_tmp_dir], db) - extracted = db.apply(lambda: _extract_files(tmp_archive_name, "root")) - assert len(list(extracted)) == 1 - installed = extracted.apply( - lambda _: _install_on_server(server_tmp_dir_handle, local_tmp_dir.name, requirements) - ) - assert len(list(installed)) == 1 - db.apply(lambda: _remove_tmp_dir(server_tmp_dir_handle)) + server_options = "-c gp_session_role=utility" if db._is_variant("greenplum") else None + with psycopg2.connect(db._dsn, options=server_options) as util_conn: # type: ignore reportUnknownVariableType + server_tmp_dir = _archive_and_upload(util_conn, tmp_archive_name, [local_pkg_dir], db) + extracted = db.apply(lambda: _extract_files(server_tmp_dir, tmp_archive_name, "root")) + assert len(list(extracted)) == 1 + installed = extracted.apply( + lambda _: _install_on_server(server_tmp_dir, local_pkg_dir.name, requirements) + ) + assert len(list(installed)) == 1 setattr(gp.Database, "install_packages", _install_packages) From 47d1e0eeeae788626566649ff63ff3b417f12d33 Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Mon, 23 Oct 2023 05:34:58 -0400 Subject: [PATCH 6/9] Fix path error for install_packages() --- greenplumpython/experimental/file.py | 8 ++++++-- tests/test_file.py | 6 ++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/greenplumpython/experimental/file.py b/greenplumpython/experimental/file.py index 7eaca779..806afa9d 100644 --- a/greenplumpython/experimental/file.py +++ b/greenplumpython/experimental/file.py @@ -126,7 +126,11 @@ def _install_on_server(server_tmp_dir: str, local_tmp_dir: str, requirements: st "--requirement", "/dev/stdin", "--find-links", - str(server_tmp_dir_path / local_tmp_dir_path.relative_to(local_tmp_dir_path.root)), + str( + server_tmp_dir_path + / "extracted" + / local_tmp_dir_path.relative_to(local_tmp_dir_path.root) + ), ] try: output = subprocess.check_output( @@ -161,7 +165,7 @@ def _install_packages(db: gp.Database, requirements: str): extracted = db.apply(lambda: _extract_files(server_tmp_dir, tmp_archive_name, "root")) assert len(list(extracted)) == 1 installed = extracted.apply( - lambda _: _install_on_server(server_tmp_dir, local_pkg_dir.name, requirements) + lambda _: _install_on_server(server_tmp_dir, local_pkg_dir, requirements) ) assert len(list(installed)) == 1 diff --git a/tests/test_file.py b/tests/test_file.py index f56ea3c7..9a946b3b 100644 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -41,8 +41,10 @@ def test_csv_multi_chunks(db: gp.Database): default_chunk_size = greenplumpython.experimental.file._CHUNK_SIZE greenplumpython.experimental.file._CHUNK_SIZE = 3 assert greenplumpython.experimental.file._CHUNK_SIZE == 3 - test_csv_single_chunk(db) - greenplumpython.experimental.file._CHUNK_SIZE = default_chunk_size + try: + test_csv_single_chunk(db) + finally: + greenplumpython.experimental.file._CHUNK_SIZE = default_chunk_size import subprocess From 83ce6db2abb697a1a1807c82dc91d96c1eb4bb7c Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Mon, 23 Oct 2023 20:45:02 -0400 Subject: [PATCH 7/9] Dont remove tmp dir in `_from_files()` --- greenplumpython/experimental/file.py | 76 +++++++++++++++++----------- 1 file changed, 46 insertions(+), 30 deletions(-) diff --git a/greenplumpython/experimental/file.py b/greenplumpython/experimental/file.py index 806afa9d..8753df83 100644 --- a/greenplumpython/experimental/file.py +++ b/greenplumpython/experimental/file.py @@ -18,30 +18,29 @@ @gp.create_function -def _dump_file_chunk(tmp_archive_name: str, chunk_base64: str) -> str: +def _dump_file_chunk(tmp_dir_handle: str, chunk_base64: str) -> str: try: _gd = globals()["GD"] # type: ignore reportUnknownVariableType except KeyError: _gd = sys.modules["plpy"]._GD - server_tmp_dir_handle = f"__pygp_tmp_{uuid.uuid4().hex}" - if server_tmp_dir_handle not in _gd: - server_tmp_dir = tempfile.TemporaryDirectory() - _gd[server_tmp_dir_handle] = server_tmp_dir # Pin to GD for later UDFs + if tmp_dir_handle not in _gd: + server_tmp_dir = tempfile.TemporaryDirectory(prefix="pygp.srv.") + _gd[tmp_dir_handle] = server_tmp_dir # Pin to GD for later UDFs else: - server_tmp_dir = _gd[server_tmp_dir_handle] # type: ignore reportUnknownVariableType + server_tmp_dir = _gd[tmp_dir_handle] # type: ignore reportUnknownVariableType server_tmp_dir_path: pathlib.Path = pathlib.Path(server_tmp_dir.name) # type: ignore reportUnknownVariableType server_tmp_dir_path.mkdir(parents=True, exist_ok=True) - tmp_archive_path = server_tmp_dir_path / f"{tmp_archive_name}.tar.gz" + tmp_archive_path = server_tmp_dir_path / f"{tmp_dir_handle}.tar.gz" with open(tmp_archive_path, "ab") as tmp_archive: tmp_archive.write(base64.b64decode(chunk_base64)) return server_tmp_dir.name @gp.create_function -def _extract_files(server_tmp_dir: str, tmp_archive_name: str, returning: str) -> list[str]: - server_tmp_dir_path: pathlib.Path = pathlib.Path(server_tmp_dir) # type: ignore reportUnknownVariableType - tmp_archive_path = server_tmp_dir_path / f"{tmp_archive_name}.tar.gz" +def _extract_files(server_tmp_dir: str, tmp_dir_handle: str, returning: str) -> list[str]: + server_tmp_dir_path: pathlib.Path = pathlib.Path(server_tmp_dir) + tmp_archive_path = server_tmp_dir_path / f"{tmp_dir_handle}.tar.gz" extracted_root = server_tmp_dir_path / "extracted" if not extracted_root.exists(): with tarfile.open(tmp_archive_path, "r:gz") as tmp_archive: @@ -57,20 +56,34 @@ def _extract_files(server_tmp_dir: str, tmp_archive_name: str, returning: str) - yield str(path.resolve()) +def _remove_tmp_dir(conn: psycopg2.extensions.connection, db: gp.Database, tmp_dir_handle: str): + @gp.create_function + def udf(tmp_dir_handle: str) -> None: + try: + _gd = globals()["GD"] # type: ignore reportUnknownVariableType + except KeyError: + _gd = sys.modules["plpy"]._GD + _gd[tmp_dir_handle].cleanup() + + with conn.cursor() as cursor: + cursor.execute(udf._serialize(db)) + cursor.execute(f"SELECT {udf._qualified_name_str}('{tmp_dir_handle}');") + + def _archive_and_upload( util_conn: psycopg2.extensions.connection, - tmp_archive_name: str, + tmp_dir_handle: str, files: list[str], db: gp.Database, ) -> str: - with tempfile.TemporaryDirectory() as local_tmp_dir: + with tempfile.TemporaryDirectory(prefix="pygp.cln.") as local_tmp_dir: local_tmp_dir_path: pathlib.Path = pathlib.Path(local_tmp_dir) - tmp_archive_path = local_tmp_dir_path / f"{tmp_archive_name}.tar.gz" + tmp_archive_path = local_tmp_dir_path / f"{tmp_dir_handle}.tar.gz" with tarfile.open(tmp_archive_path, "w:gz") as tmp_archive: for file_path in files: tmp_archive.add(pathlib.Path(file_path)) - with util_conn.cursor() as cursor: # type: ignore reportUnknownVariableType - cursor.execute(f"CREATE TEMP TABLE {tmp_archive_name} (id serial, text_base64 text);") + with util_conn.cursor() as cursor: + cursor.execute(f"CREATE TEMP TABLE {tmp_dir_handle} (id serial, text_base64 text);") with open(tmp_archive_path, "rb") as tmp_archive: while True: chunk = tmp_archive.read(_CHUNK_SIZE) @@ -78,15 +91,15 @@ def _archive_and_upload( break chunk_base64 = base64.b64encode(chunk) cursor.copy_expert( - f"COPY {tmp_archive_name} (text_base64) FROM STDIN", + f"COPY {tmp_dir_handle} (text_base64) FROM STDIN", io.BytesIO(chunk_base64), ) util_conn.commit() - cursor.execute(_dump_file_chunk._serialize(db)) # type: ignore reportUnknownArgumentType + cursor.execute(_dump_file_chunk._serialize(db)) cursor.execute( f""" - SELECT {_dump_file_chunk._qualified_name_str}('{tmp_archive_name}', text_base64) - FROM "{tmp_archive_name}" + SELECT {_dump_file_chunk._qualified_name_str}('{tmp_dir_handle}', text_base64) + FROM "{tmp_dir_handle}" ORDER BY id; """ ) @@ -95,16 +108,18 @@ def _archive_and_upload( @classmethod def _from_files(_, files: list[str], parser: NormalFunction, db: gp.Database) -> gp.DataFrame: - tmp_archive_name = f"tar_{uuid.uuid4().hex}" + tmp_dir_handle = f"__pygp_tar_{uuid.uuid4().hex}" server_options = "-c gp_session_role=utility" if db._is_variant("greenplum") else None - with psycopg2.connect(db._dsn, options=server_options) as util_conn: # type: ignore reportUnknownVariableType - server_tmp_dir = _archive_and_upload(util_conn, tmp_archive_name, files, db) + with psycopg2.connect(db._dsn, options=server_options) as util_conn: + server_tmp_dir = _archive_and_upload(util_conn, tmp_dir_handle, files, db) func_sig = inspect.signature(parser.unwrap()) result_members = get_type_hints(func_sig.return_annotation) - return db.apply( - lambda: parser(_extract_files(server_tmp_dir, tmp_archive_name, "files")), + df = db.apply( + lambda: parser(_extract_files(server_tmp_dir, tmp_dir_handle, "files")), expand=len(result_members) == 0, ) + # _remove_tmp_dir(util_conn, db, tmp_dir_handle) # Cannot remove now since the returning DataFrame depends on it. + return df setattr(gp.DataFrame, "from_files", _from_files) @@ -115,7 +130,7 @@ def _from_files(_, files: list[str], parser: NormalFunction, db: gp.Database) -> @gp.create_function def _install_on_server(server_tmp_dir: str, local_tmp_dir: str, requirements: str) -> str: assert sys.executable, "Python executable is required to install packages." - server_tmp_dir_path: pathlib.Path = pathlib.Path(server_tmp_dir) # type: ignore reportUnknownVariableType + server_tmp_dir_path: pathlib.Path = pathlib.Path(server_tmp_dir) local_tmp_dir_path = pathlib.Path(local_tmp_dir) cmd = [ sys.executable, @@ -142,8 +157,8 @@ def _install_on_server(server_tmp_dir: str, local_tmp_dir: str, requirements: st def _install_packages(db: gp.Database, requirements: str): - tmp_archive_name = f"tar_{uuid.uuid4().hex}" - with tempfile.TemporaryDirectory() as local_pkg_dir: + tmp_dir_handle = f"__pygp_tar_{uuid.uuid4().hex}" + with tempfile.TemporaryDirectory(prefix="pygp.cln.") as local_pkg_dir: local_tmp_dir_path = pathlib.Path(local_pkg_dir) cmd = [ sys.executable, @@ -160,14 +175,15 @@ def _install_packages(db: gp.Database, requirements: str): except subprocess.CalledProcessError as e: raise e from Exception(e.stdout) server_options = "-c gp_session_role=utility" if db._is_variant("greenplum") else None - with psycopg2.connect(db._dsn, options=server_options) as util_conn: # type: ignore reportUnknownVariableType - server_tmp_dir = _archive_and_upload(util_conn, tmp_archive_name, [local_pkg_dir], db) - extracted = db.apply(lambda: _extract_files(server_tmp_dir, tmp_archive_name, "root")) + with psycopg2.connect(db._dsn, options=server_options) as util_conn: + server_tmp_dir = _archive_and_upload(util_conn, tmp_dir_handle, [local_pkg_dir], db) + extracted = db.apply(lambda: _extract_files(server_tmp_dir, tmp_dir_handle, "root")) assert len(list(extracted)) == 1 installed = extracted.apply( lambda _: _install_on_server(server_tmp_dir, local_pkg_dir, requirements) ) assert len(list(installed)) == 1 + _remove_tmp_dir(util_conn, db, tmp_dir_handle) setattr(gp.Database, "install_packages", _install_packages) From 5bd7e478340f71d9952ef1ac7977044ba11bbe43 Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Mon, 23 Oct 2023 20:52:32 -0400 Subject: [PATCH 8/9] Fix lint error --- greenplumpython/experimental/file.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/greenplumpython/experimental/file.py b/greenplumpython/experimental/file.py index 8753df83..506e6231 100644 --- a/greenplumpython/experimental/file.py +++ b/greenplumpython/experimental/file.py @@ -110,8 +110,8 @@ def _archive_and_upload( def _from_files(_, files: list[str], parser: NormalFunction, db: gp.Database) -> gp.DataFrame: tmp_dir_handle = f"__pygp_tar_{uuid.uuid4().hex}" server_options = "-c gp_session_role=utility" if db._is_variant("greenplum") else None - with psycopg2.connect(db._dsn, options=server_options) as util_conn: - server_tmp_dir = _archive_and_upload(util_conn, tmp_dir_handle, files, db) + with psycopg2.connect(db._dsn, options=server_options) as util_conn: # type: ignore reportUnknownVariableType + server_tmp_dir = _archive_and_upload(util_conn, tmp_dir_handle, files, db) # type: ignore reportUnknownArgumentType func_sig = inspect.signature(parser.unwrap()) result_members = get_type_hints(func_sig.return_annotation) df = db.apply( @@ -175,15 +175,15 @@ def _install_packages(db: gp.Database, requirements: str): except subprocess.CalledProcessError as e: raise e from Exception(e.stdout) server_options = "-c gp_session_role=utility" if db._is_variant("greenplum") else None - with psycopg2.connect(db._dsn, options=server_options) as util_conn: - server_tmp_dir = _archive_and_upload(util_conn, tmp_dir_handle, [local_pkg_dir], db) + with psycopg2.connect(db._dsn, options=server_options) as util_conn: # type: ignore reportUnknownVariableType + server_tmp_dir = _archive_and_upload(util_conn, tmp_dir_handle, [local_pkg_dir], db) # type: ignore reportUnknownArgumentType extracted = db.apply(lambda: _extract_files(server_tmp_dir, tmp_dir_handle, "root")) assert len(list(extracted)) == 1 installed = extracted.apply( lambda _: _install_on_server(server_tmp_dir, local_pkg_dir, requirements) ) assert len(list(installed)) == 1 - _remove_tmp_dir(util_conn, db, tmp_dir_handle) + _remove_tmp_dir(util_conn, db, tmp_dir_handle) # type: ignore reportUnknownArgumentType setattr(gp.Database, "install_packages", _install_packages) From 5f56ebbfa9f72ad1907516e5db0ca34cebb758a9 Mon Sep 17 00:00:00 2001 From: Xuebin Su Date: Mon, 23 Oct 2023 21:02:26 -0400 Subject: [PATCH 9/9] Rephrase --- greenplumpython/experimental/file.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/greenplumpython/experimental/file.py b/greenplumpython/experimental/file.py index 506e6231..99c5a719 100644 --- a/greenplumpython/experimental/file.py +++ b/greenplumpython/experimental/file.py @@ -118,7 +118,7 @@ def _from_files(_, files: list[str], parser: NormalFunction, db: gp.Database) -> lambda: parser(_extract_files(server_tmp_dir, tmp_dir_handle, "files")), expand=len(result_members) == 0, ) - # _remove_tmp_dir(util_conn, db, tmp_dir_handle) # Cannot remove now since the returning DataFrame depends on it. + # _remove_tmp_dir(util_conn, db, tmp_dir_handle) # Cannot remove now since the returned DataFrame depends on it. return df