diff --git a/python/migrate.py b/python/migrate.py index 3e112dac4..d3a6e9731 100644 --- a/python/migrate.py +++ b/python/migrate.py @@ -5,6 +5,7 @@ import oracledb import pyodbc import psycopg2 +import ibm_db import threading from typing import Any, Dict @@ -342,6 +343,94 @@ def cleanup_migrate_postgresql(): mgp.add_batch_read_proc(postgresql, init_migrate_postgresql, cleanup_migrate_postgresql) +# IBM DB2 dictionary to store connections and cursors by thread +db2_dict = {} + + +def init_migrate_db2( + table_or_sql: str, + config: mgp.Map, + config_path: str = "", + params: mgp.Nullable[mgp.Any] = None, +): + global db2_dict + + if params: + _check_params_type(params) + + if len(config_path) > 0: + config = _combine_config(config=config, config_path=config_path) + + if _query_is_table(table_or_sql): + table_or_sql = f"SELECT * FROM {table_or_sql}" + + if threading.get_native_id not in db2_dict: + db2_dict[threading.get_native_id] = {} + + if Constants.CURSOR not in db2_dict[threading.get_native_id]: + db2_dict[threading.get_native_id][Constants.CURSOR] = None + + if db2_dict[threading.get_native_id][Constants.CURSOR] is None: + connection_string = _create_connection_string(config) + connection = ibm_db.connect(connection_string, "", "") + stmt = ibm_db.prepare(connection, table_or_sql) + + if params: + for i, param in enumerate(params): + ibm_db.bind_param(stmt, i + 1, param) + + ibm_db.execute(stmt) + + db2_dict[threading.get_native_id][Constants.CONNECTION] = connection + db2_dict[threading.get_native_id][Constants.CURSOR] = stmt + db2_dict[threading.get_native_id][Constants.COLUMN_NAMES] = [ + column[Constants.I_COLUMN_NAME] for column in ibm_db.fetch_assoc(stmt) + ] + + +def db2( + table_or_sql: str, + config: mgp.Map, + config_path: str = "", + params: mgp.Nullable[mgp.Any] = None, +) -> mgp.Record(row=mgp.Map): + """ + With migrate.db2 you can access IBM DB2 and execute queries. The result table is converted into a stream, + and returned rows can be used to create or create graph structures. Config must be at least empty map. + If config_path is passed, every key,value pair from JSON file will overwrite any values in config file. + + :param table_or_sql: Table name or an SQL query + :param config: Connection configuration parameters (as in ibm_db.connect), + :param config_path: Path to the JSON file containing configuration parameters (as in ibm_db.connect) + :param params: Optionally, queries may be parameterized. In that case, `params` provides parameter values + :return: The result table as a stream of rows + """ + global db2_dict + cursor = db2_dict[threading.get_native_id][Constants.CURSOR] + column_names = db2_dict[threading.get_native_id][Constants.COLUMN_NAMES] + + rows = [] + for _ in range(Constants.BATCH_SIZE): + row = ibm_db.fetch_assoc(cursor) + if not row: + break + rows.append(row) + + return [mgp.Record(row=_name_row_cells(row, column_names)) for row in rows] + + +def cleanup_migrate_db2(): + global db2_dict + db2_dict[threading.get_native_id][Constants.CURSOR] = None + ibm_db.commit(db2_dict[threading.get_native_id][Constants.CONNECTION]) + ibm_db.close(db2_dict[threading.get_native_id][Constants.CONNECTION]) + db2_dict[threading.get_native_id][Constants.CONNECTION] = None + db2_dict[threading.get_native_id][Constants.COLUMN_NAMES] = None + + +mgp.add_batch_read_proc(db2, init_migrate_db2, cleanup_migrate_db2) + + def _query_is_table(table_or_sql: str) -> bool: return len(table_or_sql.split()) == 1 @@ -375,3 +464,7 @@ def _check_params_type(params: Any, types=(dict, list, tuple)) -> None: raise TypeError( "Database query parameter values must be passed in a container of type List[Any] (or Map, if migrating from MySQL or Oracle DB)" ) + + +def _create_connection_string(config: mgp.Map) -> str: + return ";".join(f"{key}={value}" for key, value in config.items()) diff --git a/python/requirements.txt b/python/requirements.txt index c64bae23c..54ea70877 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -13,5 +13,6 @@ mysql-connector-python==8.0.32 oracledb==1.2.2 pyodbc==4.0.35 psycopg2-binary==2.9.9 +ibm-db==3.2.3 defusedxml==0.7.1 scipy==1.12.0 diff --git a/python/requirements_no_ml.txt b/python/requirements_no_ml.txt index 67a1686f6..38dd0be39 100644 --- a/python/requirements_no_ml.txt +++ b/python/requirements_no_ml.txt @@ -11,5 +11,6 @@ mysql-connector-python==8.0.32 oracledb==1.2.2 pyodbc==4.0.35 psycopg2-binary==2.9.9 +ibm-db==3.2.3 defusedxml==0.7.1 scipy==1.12.0