From 2205c2e4b356dd9351206ead9e1bdeed4e386f16 Mon Sep 17 00:00:00 2001
From: Jesus Lara
Date: Mon, 8 Jul 2024 12:40:57 +0200
Subject: [PATCH 1/6] working on: bigquery models

---
 asyncdb/drivers/bigquery.py     | 572 +++++++++++++++++++++++++++++++-
 asyncdb/drivers/scylladb.py     |   2 +-
 asyncdb/version.py              |   2 +-
 examples/test_bigquery_model.py | 171 ++++++++++
 4 files changed, 738 insertions(+), 9 deletions(-)
 create mode 100644 examples/test_bigquery_model.py

diff --git a/asyncdb/drivers/bigquery.py b/asyncdb/drivers/bigquery.py
index d46fb8be..031c5a4e 100644
--- a/asyncdb/drivers/bigquery.py
+++ b/asyncdb/drivers/bigquery.py
@@ -14,9 +14,11 @@
 from google.oauth2 import service_account
 from ..exceptions import DriverError
 from .sql import SQLDriver
+from ..interfaces import ModelBackend
+from ..models import Model, Field
 
 
-class bigquery(SQLDriver):
+class bigquery(SQLDriver, ModelBackend):
     _provider = "bigquery"
     _syntax = "sql"
     _test_query = "SELECT 1"
@@ -66,6 +68,7 @@ async def connection(self):
     async def close(self):
         # BigQuery client does not maintain persistent connections, so nothing to close here.
         self._connected = False
+        self._connection = None
 
     disconnect = close
 
@@ -73,29 +76,31 @@ async def execute(self, query, **kwargs):
         """
         Execute a BigQuery query
         """
+        result = None
+        error = None
         if not self._connection:
             await self.connection()
         try:
             job = self._connection.query(query, **kwargs)
             result = job.result()  # Waits for the query to finish
-            return result
         except Exception as e:
-            raise DriverError(
-                f"BigQuery: Error executing query: {e}"
-            )
+            error = e
+        return result, error
 
     async def execute_many(self, query, **kwargs):
         """
         Execute a BigQuery query
         """
+        result = None
+        error = None
         if not self._connection:
             await self.connection()
         try:
             job = self._connection.query(query, **kwargs)
             result = job.result()  # Waits for the query to finish
-            return result
         except Exception as e:
-            raise DriverError(f"BigQuery: Error executing query: {e}")
+            error = e
+        return result, error
 
     async def prepare(self, sentence: str, **kwargs):
         pass
@@ -129,6 +134,19 @@ async def create_dataset(self, dataset_id: str):
             self._logger.error(f"Error creating Dataset: {exc}")
             raise DriverError(f"Error creating Dataset: {exc}")
 
+    create_keyspace = create_dataset
+
+    async def drop_dataset(self, dataset_id: str):
+        try:
+            dataset_ref = bq.DatasetReference(self._connection.project, dataset_id)
+            self._connection.delete_dataset(dataset_ref, delete_contents=True, not_found_ok=True)
+            return True
+        except Exception as exc:
+            self._logger.error(f"Error deleting Dataset: {exc}")
+            raise DriverError(f"Error deleting Dataset: {exc}")
+
+    drop_keyspace = drop_dataset
+
     async def create_table(self, dataset_id, table_id, schema):
         """
         Create a new table in the specified BigQuery dataset.
@@ -513,3 +531,543 @@ async def multi_query(self, queries: list):
         )  # Create async tasks
         results = await asyncio.gather(*tasks)  # Execute tasks concurrently and gather results
         return results
+
+    ## Model Logic:
+    async def _insert_(self, _model: Model, **kwargs):  # pylint: disable=W0613
+        """
+        insert a row from model.
+        """
+        try:
+            schema = ""
+            sc = _model.Meta.schema
+            if sc:
+                schema = f"{sc}."
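+            # Meta.schema maps to the BigQuery dataset, so the name built
+            # below takes the dataset-qualified "dataset.table" form.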
+ table = f"{schema}{_model.Meta.name}" + except AttributeError: + table = _model.__name__ + cols = [] + columns = [] + source = {} + _filter = {} + n = 1 + fields = _model.columns() + for name, field in fields.items(): + try: + val = getattr(_model, field.name) + except AttributeError: + continue + ## getting the value of column: + value = self._get_value(field, val) + column = field.name + columns.append(column) + # validating required field + try: + required = field.required() + except AttributeError: + required = False + pk = self._get_attribute(field, value, attr="primary_key") + # if pk is True and value is None: + # if "db_default" in field.metadata: + # continue + if pk is True and value is None and "db_default" in field.metadata: + continue + if required is False and value is None or value == "None": + if "db_default" in field.metadata: + continue + else: + # get default value + default = field.default + if callable(default): + value = default() + else: + continue + elif required is True and value is None or value == "None": + if "db_default" in field.metadata: + # field get a default value from database + continue + else: + raise ValueError( + f"Field {name} is required and value is null over {_model.Meta.name}" + ) + elif is_dataclass(value): + if isinstance(value, Model): + ### get value for primary key associated with. + try: + value = getattr(value, name) + except AttributeError: + value = None + elif isinstance(value, uuid.UUID): + value = str(value) # convert to string, for now + elif isinstance(value, Enum): + value = value.value + source[column] = value + cols.append(column) + n += 1 + if pk := self._get_attribute(field, value, attr="primary_key"): + _filter[column] = pk + try: + values = ", ".join([f":{a}" for a in cols]) # pylint: disable=C0209 + cols = ",".join(cols) + insert = f"INSERT INTO {table}({cols}) VALUES({values}) IF NOT EXISTS;" + self._logger.debug(f"INSERT: {insert}") + stmt = self._connection.prepare(insert) + result = self._connection.execute(stmt, source) + if result.was_applied: + # get the row inserted again: + condition = " AND ".join( + [f"{key} = :{key}" for key in _filter] + ) + _select_stmt = f"SELECT * FROM {table} WHERE {condition}" + self._logger.debug(f"SELECT: {_select_stmt}") + stmt = self._connection.prepare(_select_stmt) + result = self._connection.execute(stmt, _filter).one() + if result: + _model.reset_values() + for f, val in result.items(): + setattr(_model, f, val) + return _model + except Exception as err: + raise DriverError( + message=f"Error on Insert over table {_model.Meta.name}: {err!s}" + ) from err + + async def _delete_(self, _model: Model, _filter: dict = None, **kwargs): # pylint: disable=W0613 + """ + delete a row from model. + """ + try: + schema = "" + sc = _model.Meta.schema + if sc: + schema = f"{sc}." 
+ table = f"{schema}{_model.Meta.name}" + except AttributeError: + table = _model.__name__ + source = [] + if not _filter: + _filter = {} + n = 1 + fields = _model.columns() + for _, field in fields.items(): + try: + val = getattr(_model, field.name) + except AttributeError: + continue + ## getting the value of column: + value = self._get_value(field, val) + column = field.name + source.append(value) + n += 1 + curval = _model.old_value(column) + if pk := self._get_attribute(field, curval, attr="primary_key"): + if column in _filter: + # already this value on delete: + continue + _filter[column] = pk + try: + condition = self._where(fields, **_filter) + if not condition: + raise DriverError(f"Avoid DELETE without WHERE conditions: {_filter}") + _delete = f"DELETE FROM {table} {condition};" + self._logger.debug(f"DELETE: {_delete}") + result = self._connection.execute(_delete) + return f"DELETE {result}: {_filter!s}" + except Exception as err: + raise DriverError(message=f"Error on Insert over table {_model.Meta.name}: {err!s}") from err + + async def _update_(self, _model: Model, **kwargs): # pylint: disable=W0613 + """ + Updating a row in a Model. + TODO: How to update when if primary key changed. + Alternatives: Saving *dirty* status and previous value on dict + """ + try: + schema = "" + sc = _model.Meta.schema + if sc: + schema = f"{sc}." + table = f"{schema}{_model.Meta.name}" + except AttributeError: + table = _model.__name__ + cols = [] + source = [] + _filter = {} + _updated = {} + _primary = [] + n = 1 + fields = _model.columns() + for name, field in fields.items(): + try: + val = getattr(_model, field.name) + except AttributeError: + continue + ## getting the value of column: + value = self._get_value(field, val) + + column = field.name + # validating required field + try: + required = field.required() + except AttributeError: + required = False + if required is False and value is None or value == "None": + default = field.default + if callable(default): + value = default() + else: + continue + elif required is True and value is None or value == "None": + if "db_default" in field.metadata: + # field get a default value from database + continue + raise ValueError(f"Field {name} is required and value is null over {_model.Meta.name}") + elif is_dataclass(value): + if isinstance(value, Model): + ### get value for primary key associated with. 
+                    try:
+                        value = getattr(value, name)
+                    except AttributeError:
+                        value = None
+            curval = _model.old_value(name)
+            if pk := self._get_attribute(field, curval, attr="primary_key"):
+                _filter[column] = pk
+                _primary.append(column)
+            if pk := self._get_attribute(field, value, attr="primary_key"):
+                _updated[column] = pk
+                _primary.append(column)
+            if curval == value:
+                continue  # no changes
+            cols.append(name)  # pylint: disable=C0209
+            source.append(value)
+            n += 1
+        try:
+            if any(col in _primary for col in cols):
+                # in Cassandra we need to delete and insert again
+                condition = self._where(fields, **_filter)
+                _delete = f"DELETE FROM {table} {condition}"
+                result = self._connection.execute(
+                    SimpleStatement(_delete)
+                )
+                return await self._insert_(_model, **kwargs)
+            set_fields = ", ".join(cols)
+            condition = self._where(fields, **_filter)
+            _update = f"UPDATE {table} SET {set_fields} {condition}"
+            self._logger.debug(f"UPDATE: {_update}")
+            stmt = await self.get_sentence(_update, prepared=True)
+            self._connection.execute(stmt, source)
+            condition = self._where(fields, **_updated)
+            stmt = SimpleStatement(f"SELECT * FROM {table} {condition}")
+            result = self._connection.execute(stmt).one()
+            if result:
+                _model.reset_values()
+                for f, val in result.items():
+                    setattr(_model, f, val)
+            return _model
+        except Exception as err:
+            raise DriverError(
+                message=f"Error on Update over table {_model.Meta.name}: {err!s}"
+            ) from err
+
+    async def _save_(self, _model: Model, *args, **kwargs):
+        """
+        Save a row in a Model, using Insert-or-Update methodology.
+        """
+        raise NotImplementedError("Method not implemented")
+
+    async def _fetch_(self, _model: Model, *args, **kwargs):
+        """
+        Returns one single Row using Model.
+        """
+        try:
+            schema = ""
+            sc = _model.Meta.schema
+            if sc:
+                schema = f"{sc}."
+            table = f"{schema}{_model.Meta.name}"
+        except AttributeError:
+            table = _model.__name__
+        fields = _model.columns()
+        _filter = {}
+        for name, field in fields.items():
+            if name in kwargs:
+                try:
+                    val = kwargs[name]
+                except AttributeError:
+                    continue
+                ## getting the value of column:
+                datatype = field.type
+                value = Entity.toSQL(val, datatype)
+                _filter[name] = value
+        condition = self._where(fields, **_filter)
+        _get = f"SELECT * FROM {table} {condition}"
+        try:
+            smt = SimpleStatement(_get)
+            return self._connection.execute(smt).one()
+        except Exception as e:
+            raise DriverError(f"Error: Model Fetch over {table}: {e}") from e
+
+    async def _filter_(self, _model: Model, *args, **kwargs):
+        """
+        Filter a Model using Fields.
+        """
+        try:
+            schema = ""
+            sc = _model.Meta.schema
+            if sc:
+                schema = f"{sc}."
+            table = f"{schema}{_model.Meta.name}"
+        except AttributeError:
+            table = _model.__name__
+        fields = _model.columns(_model)
+        _filter = {}
+        if args:
+            columns = ",".join(args)
+        else:
+            columns = "*"
+        for name, field in fields.items():
+            if name in kwargs:
+                try:
+                    val = kwargs[name]
+                except AttributeError:
+                    continue
+                ## getting the value of column:
+                datatype = field.type
+                value = Entity.toSQL(val, datatype)
+                _filter[name] = value
+        condition = self._where(fields, **_filter)
+        _get = f"SELECT {columns} FROM {table} {condition}"
+        try:
+            stmt = SimpleStatement(_get)
+            fut = self._connection.execute_async(stmt)
+            result = fut.result()
+            return result
+        except Exception as e:
+            raise DriverError(f"Error: Model GET over {table}: {e}") from e
+
+    async def _select_(self, *args, **kwargs):
+        """
+        Get a query from Model.
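+        Conditions are taken positionally from *args and an optional
+        "fields" kwarg narrows the selected columns.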
+ """ + try: + model = kwargs["_model"] + except KeyError as e: + raise DriverError(f"Missing Model for SELECT {kwargs!s}") from e + try: + schema = "" + sc = model.Meta.schema + if sc: + schema = f"{sc}." + table = f"{schema}{model.Meta.name}" + except AttributeError: + table = model.__name__ + if args: + condition = "{}".join(args) + else: + condition = None + if "fields" in kwargs: + columns = ",".join(kwargs["fields"]) + else: + columns = "*" + _get = f"SELECT {columns} FROM {table} {condition}" + try: + smt = SimpleStatement(_get) + return self._connection.execute(smt) + except Exception as e: + raise DriverError(f"Error: Model SELECT over {table}: {e}") from e + + async def _get_(self, _model: Model, *args, **kwargs): + """ + Get one row from model. + """ + try: + schema = "" + sc = _model.Meta.schema + if sc: + schema = f"{sc}." + table = f"{schema}{_model.Meta.name}" + except AttributeError: + table = _model.__name__ + fields = _model.columns(_model) + _filter = {} + if args: + columns = ",".join(args) + else: + columns = ",".join(fields) # getting only selected fields + for name, field in fields.items(): + if name in kwargs: + try: + val = kwargs[name] + except AttributeError: + continue + ## getting the value of column: + datatype = field.type + value = Entity.toSQL(val, datatype) + _filter[name] = value + condition = self._where(fields, **_filter) + _get = f"SELECT {columns} FROM {table} {condition}" + print('SELECT ', _get) + try: + smt = SimpleStatement(_get) + return self._connection.execute(smt).one() + except Exception as e: + raise DriverError( + f"Error: Model GET over {table}: {e}" + ) from e + + async def _all_(self, _model: Model, *args, **kwargs): # pylint: disable=W0613 + """ + Get all rows on a Model. + """ + try: + schema = "" + # sc = _model.Meta.schema + if sc := _model.Meta.schema: + schema = f"{sc}." + table = f"{schema}{_model.Meta.name}" + except AttributeError: + table = _model.__name__ + if "fields" in kwargs: + columns = ",".join(kwargs["fields"]) + else: + columns = "*" + _all = f"SELECT {columns} FROM {table}" + try: + smt = SimpleStatement(_all) + return self._connection.execute(smt) + except Exception as e: + raise DriverError(f"Error: Model All over {table}: {e}") from e + + async def _remove_(self, _model: Model, **kwargs): + """ + Deleting some records using Model. + """ + try: + schema = "" + if sc := _model.Meta.schema: + schema = f"{sc}." + table = f"{schema}{_model.Meta.name}" + except AttributeError: + table = _model.__name__ + fields = _model.columns(_model) + _filter = {} + for name, field in fields.items(): + datatype = field.type + if name in kwargs: + val = kwargs[name] + value = Entity.toSQL(val, datatype) + _filter[name] = value + condition = self._where(fields, **_filter) + if not condition: + raise ValueError( + "Avoid DELETE without WHERE conditions" + ) + _delete = f"DELETE FROM {table} {condition}" + try: + self._logger.debug(f"DELETE: {_delete}") + smt = SimpleStatement(_delete) + result = self._connection.execute(smt) + return f"DELETE {result}: {_filter!s}" + except Exception as err: + raise DriverError(message=f"Error on DELETE {_model.Meta.name}: {err!s}") from err + + async def _updating_(self, *args, _filter: dict = None, **kwargs): + """ + Updating records using Model. + """ + try: + model = kwargs["_model"] + except KeyError as e: + raise DriverError(f"Missing Model for SELECT {kwargs!s}") from e + try: + schema = "" + sc = model.Meta.schema + if sc: + schema = f"{sc}." 
+ table = f"{schema}{model.Meta.name}" + except AttributeError: + table = model.__name__ + fields = model.columns(model) + if _filter is None and args: + _filter = args[0] + cols = [] + source = [] + new_cond = {} + n = 1 + for name, field in fields.items(): + try: + val = kwargs[name] + except (KeyError, AttributeError): + continue + ## getting the value of column: + value = self._get_value(field, val) + source.append(value) + if name in _filter: + new_cond[name] = value + cols.append("{} = {}".format(name, "?".format(n))) # pylint: disable=C0209 + n += 1 + try: + set_fields = ", ".join(cols) + condition = self._where(fields, **_filter) + _update = f"UPDATE {table} SET {set_fields} {condition}" + self._logger.debug(f"UPDATE: {_update}") + stmt = await self.get_sentence(_update, prepared=True) + result = self._connection.execute(stmt, source) + print(f"UPDATE {result}: {_filter!s}") + + new_conditions = {**_filter, **new_cond} + condition = self._where(fields, **new_conditions) + + _all = f"SELECT * FROM {table} {condition}" + stmt = await self.get_sentence(_all) + result = self._connection.execute(stmt) + return [model(**dict(r)) for r in result] + except Exception as err: + raise DriverError( + message=f"Error on UPDATE over table {model.Meta.name}: {err!s}" + ) from err + + async def _deleting_(self, *args, _filter: dict = None, **kwargs): + """ + Deleting records using Model. + """ + try: + model = kwargs["_model"] + except KeyError as e: + raise DriverError(f"Missing Model for SELECT {kwargs!s}") from e + try: + schema = "" + sc = model.Meta.schema + if sc: + schema = f"{sc}." + table = f"{schema}{model.Meta.name}" + except AttributeError: + table = model.__name__ + fields = model.columns(model) + if _filter is None and args: + _filter = args[0] + cols = [] + source = [] + new_cond = {} + n = 1 + for name, field in fields.items(): + try: + val = kwargs[name] + except (KeyError, AttributeError): + continue + ## getting the value of column: + value = self._get_value(field, val) + source.append(value) + if name in _filter: + new_cond[name] = value + cols.append("{} = {}".format(name, "?".format(n))) # pylint: disable=C0209 + n += 1 + try: + condition = self._where(fields, **_filter) + _delete = f"DELETE FROM {table} {condition}" + self._logger.debug(f"DELETE: {_delete}") + stmt = await self.get_sentence(_delete) + result = self._connection.excute(stmt, source) + print(f"DELETE {result}: {_filter!s}") + return f'DELETED: {_filter}' + except Exception as err: + raise DriverError(message=f"Error on DELETE over table {model.Meta.name}: {err!s}") from err diff --git a/asyncdb/drivers/scylladb.py b/asyncdb/drivers/scylladb.py index fea7ef2c..de52985a 100644 --- a/asyncdb/drivers/scylladb.py +++ b/asyncdb/drivers/scylladb.py @@ -83,7 +83,7 @@ def __init__( ): self.hosts: list = [] self.application_name = os.getenv("APP_NAME", "NAV") - self._enable_shard_awareness = kwargs.pop("shard_awareness", True) + self._enable_shard_awareness = kwargs.pop("shard_awareness", False) self._test_query = "SELECT release_version FROM system.local" self._query_raw = "SELECT {fields} FROM {table} {where_cond}" self._cluster = None diff --git a/asyncdb/version.py b/asyncdb/version.py index a6ffe3f6..52042a3e 100644 --- a/asyncdb/version.py +++ b/asyncdb/version.py @@ -3,7 +3,7 @@ __title__ = "asyncdb" __description__ = "Library for Asynchronous data source connections \ Collection of asyncio drivers." 
-__version__ = "2.7.13" +__version__ = "2.7.14" __author__ = "Jesus Lara" __author_email__ = "jesuslarag@gmail.com" __license__ = "BSD" diff --git a/examples/test_bigquery_model.py b/examples/test_bigquery_model.py new file mode 100644 index 00000000..125d85c2 --- /dev/null +++ b/examples/test_bigquery_model.py @@ -0,0 +1,171 @@ +import asyncio +from pathlib import Path +from asyncdb import AsyncDB +from asyncdb.models import Model, Field + +DRIVER='bigquery' +# create a pool with parameters +PARAMS = { + "credentials": "~/proyectos/navigator/asyncdb/env/key.json", + "project_id": "unique-decker-385015" +} + +BASE_DIR = Path(__file__).resolve().parent.parent +sql_file = BASE_DIR.joinpath('docs', 'countries_flags.bigquery.sql') + +table_schema = """ +CREATE TABLE IF NOT EXISTS library.country_flags ( + country STRING, + cyclist_name STRING, + flag INT64 +); +""" + +class CountryFlag(Model): + country: str = Field(primary_key=True) + cyclist_name: str = Field(required=True, primary_key=True) + flag: int = Field(primary_key=True) + + class Meta: + name = 'country_flags' + schema = 'library' + driver = 'bigquery' + strict = True + +async def test_connection(): + bq = AsyncDB('bigquery', params=PARAMS) + await bq.connection() + print( + f"Connected: {bq.is_connected()}" + ) + print('TEST ', await bq.test_connection()) + query = """ + SELECT corpus AS title, COUNT(word) AS unique_words + FROM `bigquery-public-data.samples.shakespeare` + GROUP BY title + ORDER BY unique_words + DESC LIMIT 10 + """ + results, error = await bq.query(query) + for row in results: + title = row['title'] + unique_words = row['unique_words'] + print(f'{title:<20} | {unique_words}') + +async def start_test(): + db = AsyncDB(DRIVER, params=PARAMS) + async with await db.connection() as conn: + print('CONNECTED: ', conn.is_connected() is True) + ### Sample: create a dataset, a table and load a dataset + dataset = await conn.create_keyspace('library') + print('Dataset created: ', dataset) + # CREATE TABLE: + result, error = await conn.execute( + table_schema + ) + print('CREATE > ', result, error) + + if sql_file.exists(): + print('READING FILE: ', sql_file) + # read cql_file into a list of sentences + sentences = [] + with open(sql_file, 'r') as file: + sentences = file.readlines() + chunk_size = 5000 + chunks = [sentences[i:i+chunk_size] for i in range(0, len(sentences), chunk_size)] + for chunk in chunks: + result, error = await conn.execute_many(chunk) + # Count models: + count, error = await conn.query('SELECT COUNT(*) FROM unique-decker-385015.library.country_flags;') + for c in count: + print(c) + + +async def finish_test(): + db = AsyncDB(DRIVER, params=PARAMS) + async with await db.connection() as conn: + await conn.execute('DROP TABLE library.country_flags;') + await conn.drop_keyspace('library') + +async def test_operations(): + db = AsyncDB(DRIVER, params=PARAMS) + async with await db.connection() as conn: + print('CONNECTED: ', conn.is_connected() is True) + # using row factories + result, _ = await conn.query('SELECT * from library.country_flags LIMIT 10000', factory='pandas') + print(result.result) + result, _ = await conn.query('SELECT * from library.country_flags LIMIT 10000', factory='recordset') + print(result.result) + # output formats + db.output_format('json') # change output format to json + result, _ = await conn.query('SELECT * from library.country_flags LIMIT 1') + print(result) + +# async def test_model(): +# db = AsyncDB(DRIVER, params=ARGS) +# async with await db.connection() as conn: +# await 
conn.use('library') # set database to work +# # Set the model to use the connection +# CountryFlag.Meta.connection = conn +# filter = { +# "country": "Venezuela", +# "cyclist_name": "Marty", +# } +# result = await CountryFlag.filter(**filter) +# for res in result: +# print('RESULT > ', res) +# # Get one single record: +# katie = await CountryFlag.get( +# country="Anguilla", +# cyclist_name="Kathie", +# flag=7 +# ) +# print('RESULT > ', katie) +# # Insert a new record: +# new_record = CountryFlag( +# country="Venezuela", +# cyclist_name="Jesus", +# flag=233 +# ) +# await new_record.insert() +# print('INSERT > ', new_record) +# # Delete the record: +# result = await new_record.delete() +# print('DELETED > ', result) +# # Update the record: +# katie.flag = 233 +# await katie.update() +# print('UPDATED > ', katie) +# # Batch operation: +# brazil = await CountryFlag.filter(country="Brazil") +# for b in brazil: +# print(b) +# # Delete all records: +# result = await CountryFlag.remove(country="Brazil") +# print('REMOVED > ', result) +# # get all records: +# all_records = await CountryFlag.all() +# print('ALL RECORDS > ', len(all_records)) + +if __name__ == '__main__': + try: + loop = asyncio.get_event_loop() + loop.run_until_complete( + test_connection() + ) + loop.run_until_complete( + start_test() + ) + loop.run_until_complete( + test_operations() + ) + # loop.run_until_complete( + # test_model() + # ) + except Exception as err: + print('Error: ', err) + finally: + loop.run_until_complete( + finish_test() + ) + loop.close() From edf6ef055b096ef0b64febee79bd75e03e8d46e8 Mon Sep 17 00:00:00 2001 From: Jesus Lara Date: Mon, 8 Jul 2024 13:58:49 +0200 Subject: [PATCH 2/6] method _all_ for bigquery models --- asyncdb/drivers/bigquery.py | 29 ++++++++++++++++++----------- asyncdb/drivers/pg.py | 7 ++++++- asyncdb/meta/record.py | 1 + asyncdb/meta/recordset.py | 7 ++++++- 4 files changed, 31 insertions(+), 13 deletions(-) diff --git a/asyncdb/drivers/bigquery.py b/asyncdb/drivers/bigquery.py index 031c5a4e..703ac909 100644 --- a/asyncdb/drivers/bigquery.py +++ b/asyncdb/drivers/bigquery.py @@ -3,6 +3,7 @@ from collections.abc import Iterable import io from pathlib import Path, PurePath +from dataclasses import is_dataclass import asyncio import aiofiles import pandas_gbq @@ -209,6 +210,7 @@ async def query(self, sentence: str, **kwargs): await self.connection() await self.valid_operation(sentence) self.start_timing() + self.output_format(kwargs.pop('factory', 'native')) error = None result = None try: @@ -274,8 +276,8 @@ async def fetch_one(self, query, *args): async def write( self, - table_id: str, data, + table_id: str = None, dataset_id: str = None, use_streams: bool = False, use_pandas: bool = True, # by default using BigQuery @@ -331,10 +333,14 @@ async def write( f"Errors occurred while inserting rows: {errors}" ) else: + job_config = bq.LoadJobConfig( + source_format=bq.SourceFormat.NEWLINE_DELIMITED_JSON, + ) job = await self._thread_func( self._connection.load_table_from_json, - table, data, + table, + job_config=job_config, **kwargs ) loop = asyncio.get_event_loop() @@ -344,7 +350,7 @@ async def write( else: self._logger.info(f"Loaded {len(data)} rows into {table_id}") self._logger.info( - f"Inserted rows into {dataset_id}.{table_id}" + f"Inserted rows into {dataset_id}.{table_id}: {len(data)} rows" ) # return Job object return job @@ -532,7 +538,9 @@ async def multi_query(self, queries: list): results = await asyncio.gather(*tasks) # Execute tasks concurrently and gather results return 
results + ############################## ## Model Logic: + ############################## async def _insert_(self, _model: Model, **kwargs): # pylint: disable=W0613 """ insert a row from model. @@ -566,9 +574,6 @@ async def _insert_(self, _model: Model, **kwargs): # pylint: disable=W0613 except AttributeError: required = False pk = self._get_attribute(field, value, attr="primary_key") - # if pk is True and value is None: - # if "db_default" in field.metadata: - # continue if pk is True and value is None and "db_default" in field.metadata: continue if required is False and value is None or value == "None": @@ -920,22 +925,24 @@ async def _all_(self, _model: Model, *args, **kwargs): # pylint: disable=W0613 """ try: schema = "" - # sc = _model.Meta.schema if sc := _model.Meta.schema: schema = f"{sc}." table = f"{schema}{_model.Meta.name}" except AttributeError: - table = _model.__name__ + table = f"{_model.Meta.schema}.{_model.Meta.name}" if "fields" in kwargs: columns = ",".join(kwargs["fields"]) else: columns = "*" _all = f"SELECT {columns} FROM {table}" try: - smt = SimpleStatement(_all) - return self._connection.execute(smt) + job = self._connection.query(_all, **kwargs) + result = job.result() # Waits for the query to finish + return [row for row in result] except Exception as e: - raise DriverError(f"Error: Model All over {table}: {e}") from e + raise DriverError( + f"Error: Model All over {table}: {e}" + ) from e async def _remove_(self, _model: Model, **kwargs): """ diff --git a/asyncdb/drivers/pg.py b/asyncdb/drivers/pg.py index a1db6231..8abd6500 100644 --- a/asyncdb/drivers/pg.py +++ b/asyncdb/drivers/pg.py @@ -738,7 +738,12 @@ async def query(self, sentence: Union[str, Any], *args, **kwargs): return [None, "Data was not found"] except RuntimeError as err: error = f"Query Error: {err}" - except (InvalidSQLStatementNameError, PostgresSyntaxError, UndefinedColumnError, UndefinedTableError) as err: + except ( + InvalidSQLStatementNameError, + PostgresSyntaxError, + UndefinedColumnError, + UndefinedTableError + ) as err: error = f"Sentence Error: {err}" except PostgresError as err: error = f"Postgres Error: {err}" diff --git a/asyncdb/meta/record.py b/asyncdb/meta/record.py index 6df14c61..748472cc 100644 --- a/asyncdb/meta/record.py +++ b/asyncdb/meta/record.py @@ -5,6 +5,7 @@ """ from collections.abc import MutableMapping, Iterator from typing import Any, Union +from google.cloud.bigquery import Row class Record(MutableMapping): diff --git a/asyncdb/meta/recordset.py b/asyncdb/meta/recordset.py index 1f379849..e7101db9 100644 --- a/asyncdb/meta/recordset.py +++ b/asyncdb/meta/recordset.py @@ -5,6 +5,7 @@ """ from collections.abc import Sequence, Iterator from typing import Any, Union +from google.cloud import bigquery from .record import Record @@ -31,7 +32,11 @@ def get_result(self) -> Any: def from_result(cls, result: Iterator) -> "Recordset": cols = [] try: - if hasattr(result, "one"): # Cassandra Resulset + if isinstance(result, bigquery.table.RowIterator): + rows_list = [row for row in result] + result = [{key: value for key, value in row.items()} for row in rows_list] + cols = list(result[0].keys()) + elif hasattr(result, "one"): # Cassandra Resulset if callable(result.one): cols = result.one().keys result = list(result) From 44e476795cabe1dfe40e29cc431083054b0f9038 Mon Sep 17 00:00:00 2001 From: Jesus Lara Date: Mon, 8 Jul 2024 17:00:14 +0200 Subject: [PATCH 3/6] upgrade version of bigquery --- asyncdb/drivers/bigquery.py | 120 +++++++++++++++--------- 
examples/test_bigquery_model.py | 156 ++++++++++++++++++++------------ setup.py | 12 +-- 3 files changed, 177 insertions(+), 111 deletions(-) diff --git a/asyncdb/drivers/bigquery.py b/asyncdb/drivers/bigquery.py index 703ac909..ed8da897 100644 --- a/asyncdb/drivers/bigquery.py +++ b/asyncdb/drivers/bigquery.py @@ -1,7 +1,8 @@ import os from typing import Any, Union from collections.abc import Iterable -import io +import uuid +from enum import Enum from pathlib import Path, PurePath from dataclasses import is_dataclass import asyncio @@ -16,7 +17,8 @@ from ..exceptions import DriverError from .sql import SQLDriver from ..interfaces import ModelBackend -from ..models import Model, Field +from ..models import Model +from ..utils.types import Entity class bigquery(SQLDriver, ModelBackend): @@ -541,18 +543,17 @@ async def multi_query(self, queries: list): ############################## ## Model Logic: ############################## + def get_table_ref(self, schema: str, table: str): + """Returns the referencie of a BQ Table. + """ + dataset_ref = bq.DatasetReference(self._connection.project, schema) + table_ref = dataset_ref.table(table) + return self._connection.get_table(table_ref) + async def _insert_(self, _model: Model, **kwargs): # pylint: disable=W0613 """ insert a row from model. """ - try: - schema = "" - sc = _model.Meta.schema - if sc: - schema = f"{sc}." - table = f"{schema}{_model.Meta.name}" - except AttributeError: - table = _model.__name__ cols = [] columns = [] source = {} @@ -565,6 +566,7 @@ async def _insert_(self, _model: Model, **kwargs): # pylint: disable=W0613 except AttributeError: continue ## getting the value of column: + value = self._get_value(field, val) column = field.name columns.append(column) @@ -605,27 +607,45 @@ async def _insert_(self, _model: Model, **kwargs): # pylint: disable=W0613 value = str(value) # convert to string, for now elif isinstance(value, Enum): value = value.value - source[column] = value + source[column] = val cols.append(column) n += 1 if pk := self._get_attribute(field, value, attr="primary_key"): _filter[column] = pk try: - values = ", ".join([f":{a}" for a in cols]) # pylint: disable=C0209 - cols = ",".join(cols) - insert = f"INSERT INTO {table}({cols}) VALUES({values}) IF NOT EXISTS;" - self._logger.debug(f"INSERT: {insert}") - stmt = self._connection.prepare(insert) - result = self._connection.execute(stmt, source) - if result.was_applied: - # get the row inserted again: - condition = " AND ".join( - [f"{key} = :{key}" for key in _filter] - ) - _select_stmt = f"SELECT * FROM {table} WHERE {condition}" - self._logger.debug(f"SELECT: {_select_stmt}") - stmt = self._connection.prepare(_select_stmt) - result = self._connection.execute(stmt, _filter).one() + table = self.get_table_ref(_model.Meta.schema, _model.Meta.name) + print('INSERT > ', table, source, type(source)) + for k,v in source.items(): + print(f"{k} = {v}", type(v)) + # job = self._connection.insert_rows( + # table, + # [source], + # ) + # print('JOB > ', job) + dataset_ref = self._connection.dataset(_model.Meta.schema) + table_ref = dataset_ref.table(_model.Meta.name) + table = bq.Table(table_ref) + + job_config = bq.LoadJobConfig( + source_format=bq.SourceFormat.NEWLINE_DELIMITED_JSON, + ) + job = await self._thread_func( + self._connection.load_table_from_json, + [source], + table, + job_config=job_config + ) + row = job.result() + print('ROW > ', row) + # get the row inserted again: + condition = " AND ".join( + [f"{key} = :{key}" for key in _filter] + ) + _select_stmt = 
f"SELECT * FROM {table} WHERE {condition}" + self._logger.debug(f"SELECT: {_select_stmt}") + job = self._connection.query(_select_stmt) + result = job.result() + result = next(iter(result)) if result: _model.reset_values() for f, val in result.items(): @@ -790,7 +810,7 @@ async def _fetch_(self, _model: Model, *args, **kwargs): schema = f"{sc}." table = f"{schema}{_model.Meta.name}" except AttributeError: - table = _model.__name__ + table = f"{_model.Meta.schema}.{_model.Meta.name}" fields = _model.columns() _filter = {} for name, field in fields.items(): @@ -806,10 +826,13 @@ async def _fetch_(self, _model: Model, *args, **kwargs): condition = self._where(fields, **_filter) _get = f"SELECT * FROM {table} {condition}" try: - smt = SimpleStatement(_get) - return self._connection.execute(smt).one() + job = self._connection.query(_get) + result = job.result() # Waits for the query to finish + return next(iter(result)) except Exception as e: - raise DriverError(f"Error: Model Fetch over {table}: {e}") from e + raise DriverError( + f"Error: Model Fetch over {table}: {e}" + ) from e async def _filter_(self, _model: Model, *args, **kwargs): """ @@ -822,7 +845,7 @@ async def _filter_(self, _model: Model, *args, **kwargs): schema = f"{sc}." table = f"{schema}{_model.Meta.name}" except AttributeError: - table = _model.__name__ + table = f"{_model.Meta.schema}.{_model.Meta.name}" fields = _model.columns(_model) _filter = {} if args: @@ -842,12 +865,13 @@ async def _filter_(self, _model: Model, *args, **kwargs): condition = self._where(fields, **_filter) _get = f"SELECT {columns} FROM {table} {condition}" try: - stmt = SimpleStatement(_get) - fut = self._connection.execute_async(stmt) - result = fut.result() - return result + job = self._connection.query(_get) + result = job.result() # Waits for the query to finish + return [row for row in result] except Exception as e: - raise DriverError(f"Error: Model GET over {table}: {e}") from e + raise DriverError( + f"Error: Model GET over {table}: {e}" + ) from e async def _select_(self, *args, **kwargs): """ @@ -856,7 +880,9 @@ async def _select_(self, *args, **kwargs): try: model = kwargs["_model"] except KeyError as e: - raise DriverError(f"Missing Model for SELECT {kwargs!s}") from e + raise DriverError( + f"Missing Model for SELECT {kwargs!s}" + ) from e try: schema = "" sc = model.Meta.schema @@ -864,7 +890,7 @@ async def _select_(self, *args, **kwargs): schema = f"{sc}." table = f"{schema}{model.Meta.name}" except AttributeError: - table = model.__name__ + table = f"{model.Meta.schema}.{model.Meta.name}" if args: condition = "{}".join(args) else: @@ -875,10 +901,13 @@ async def _select_(self, *args, **kwargs): columns = "*" _get = f"SELECT {columns} FROM {table} {condition}" try: - smt = SimpleStatement(_get) - return self._connection.execute(smt) + job = self._connection.query(_get) + result = job.result() # Waits for the query to finish + return [row for row in result] except Exception as e: - raise DriverError(f"Error: Model SELECT over {table}: {e}") from e + raise DriverError( + f"Error: Model GET over {table}: {e}" + ) from e async def _get_(self, _model: Model, *args, **kwargs): """ @@ -891,7 +920,7 @@ async def _get_(self, _model: Model, *args, **kwargs): schema = f"{sc}." 
table = f"{schema}{_model.Meta.name}" except AttributeError: - table = _model.__name__ + table = f"{_model.Meta.schema}.{_model.Meta.name}" fields = _model.columns(_model) _filter = {} if args: @@ -910,10 +939,11 @@ async def _get_(self, _model: Model, *args, **kwargs): _filter[name] = value condition = self._where(fields, **_filter) _get = f"SELECT {columns} FROM {table} {condition}" - print('SELECT ', _get) + print('SELECT :: ', _get) try: - smt = SimpleStatement(_get) - return self._connection.execute(smt).one() + job = self._connection.query(_get) + result = job.result() # Waits for the query to finish + return next(iter(result)) except Exception as e: raise DriverError( f"Error: Model GET over {table}: {e}" diff --git a/examples/test_bigquery_model.py b/examples/test_bigquery_model.py index 125d85c2..a1118008 100644 --- a/examples/test_bigquery_model.py +++ b/examples/test_bigquery_model.py @@ -1,4 +1,5 @@ import asyncio +import re from pathlib import Path from asyncdb import AsyncDB from asyncdb.models import Model, Field @@ -21,6 +22,20 @@ ); """ +def parse_inserts(statements): + rows_to_insert = [] + + for statement in statements: + match = re.search(r"INSERT INTO \w+\.\w+ \(([\w, ]+)\) VALUES \((.+)\);", statement) + if match: + columns = [col.strip() for col in match.group(1).split(",")] + values = [val.strip() for val in match.group(2).split(",")] + # Remove quotes and convert to appropriate types + values = [int(val) if val.isdigit() else val.strip("'") for val in values] + row_dict = dict(zip(columns, values)) + rows_to_insert.append(row_dict) + return rows_to_insert + class CountryFlag(Model): country: str = Field(primary_key=True) cyclist_name: str = Field(required=True, primary_key=True) @@ -64,17 +79,26 @@ async def start_test(): table_schema ) print('CREATE > ', result, error) - if sql_file.exists(): print('READING FILE: ', sql_file) # read cql_file into a list of sentences sentences = [] with open(sql_file, 'r') as file: sentences = file.readlines() - chunk_size = 5000 + chunk_size = 1000 chunks = [sentences[i:i+chunk_size] for i in range(0, len(sentences), chunk_size)] for chunk in chunks: - result, error = await conn.execute_many(chunk) + # Parse the inserts + data = parse_inserts(chunk) + print('DATA > ', len(data), data[0], type(data[0])) + await conn.write( + data, + table_id='country_flags', + dataset_id='library', + use_streams=False, + use_pandas=False + ) + break # Count models: count, error = await conn.query('SELECT COUNT(*) FROM unique-decker-385015.library.country_flags;') for c in count: @@ -90,62 +114,74 @@ async def finish_test(): async def test_operations(): db = AsyncDB(DRIVER, params=PARAMS) async with await db.connection() as conn: - print('CONNECTED: ', conn.is_connected() is True) + print('TEST CONNECTED: ', conn.is_connected() is True) # using row factories - result, _ = await conn.query('SELECT * from library.country_flags LIMIT 10000', factory='pandas') - print(result.result) - result, _ = await conn.query('SELECT * from library.country_flags LIMIT 10000', factory='recordset') - print(result.result) + result, err = await conn.query( + 'SELECT * from library.country_flags LIMIT 100', + factory='pandas' + ) + print(result, 'Error: ', err) + result, err = await conn.query( + 'SELECT * from library.country_flags LIMIT 100', + factory='recordset' + ) + print(result, len(result), 'Error: ', err) # output formats db.output_format('json') # change output format to json - result, _ = await conn.query('SELECT * from library.country_flags LIMIT 1') - 
print(result) - -# async def test_model(): -# db = AsyncDB(DRIVER, params=ARGS) -# async with await db.connection() as conn: -# await conn.use('library') # set database to work -# # Set the model to use the connection -# CountryFlag.Meta.connection = conn -# filter = { -# "country": "Venezuela", -# "cyclist_name": "Marty", -# } -# result = await CountryFlag.filter(**filter) -# for res in result: -# print('RESULT > ', res) -# # Get one single record: -# katie = await CountryFlag.get( -# country="Anguilla", -# cyclist_name="Kathie", -# flag=7 -# ) -# print('RESULT > ', katie) -# # Insert a new record: -# new_record = CountryFlag( -# country="Venezuela", -# cyclist_name="Jesus", -# flag=233 -# ) -# await new_record.insert() -# print('INSERT > ', new_record) -# # Delete the record: -# result = await new_record.delete() -# print('DELETED > ', result) -# # Update the record: -# katie.flag = 233 -# await katie.update() -# print('UPDATED > ', katie) -# # Batch operation: -# brazil = await CountryFlag.filter(country="Brazil") -# for b in brazil: -# print(b) -# # Delete all records: -# result = await CountryFlag.remove(country="Brazil") -# print('REMOVED > ', result) -# # get all records: -# all_records = await CountryFlag.all() -# print('ALL RECORDS > ', len(all_records)) + result, err = await conn.query('SELECT * from library.country_flags LIMIT 1') + print(result, 'Error: ', err) + for row in result: + print(row) + +async def test_model(): + db = AsyncDB(DRIVER, params=PARAMS) + async with await db.connection() as conn: + # Set the model to use the connection + CountryFlag.Meta.connection = conn + + # get all records: + all_records = await CountryFlag.all() + print('ALL RECORDS > ', len(all_records)) + + # Get one single record: + Kissee = await CountryFlag.get( + country="Algeria", + cyclist_name="Kissee", + flag=3 + ) + print('Get Kissee > ', Kissee) + + filter = { + "country": "Austria" + } + result = await CountryFlag.filter(**filter) + for res in result: + print('RESULT > ', res) + + # Insert a new record: + new_record = CountryFlag( + country="Venezuela", + cyclist_name="Jesus", + flag=233 + ) + await new_record.insert() + print('INSERT > ', new_record) + + # # Delete the record: + # result = await new_record.delete() + # print('DELETED > ', result) + # # Update the record: + # katie.flag = 233 + # await katie.update() + # print('UPDATED > ', katie) + # # Batch operation: + # brazil = await CountryFlag.filter(country="Brazil") + # for b in brazil: + # print(b) + # # Delete all records: + # result = await CountryFlag.remove(country="Brazil") + # print('REMOVED > ', result) + if __name__ == '__main__': try: @@ -156,12 +192,12 @@ async def test_operations(): loop.run_until_complete( start_test() ) - loop.run_until_complete( - test_operations() - ) # loop.run_until_complete( - # test_model() + # test_operations() # ) + loop.run_until_complete( + test_model() + ) except Exception as err: print('Error: ', err) finally: diff --git a/setup.py b/setup.py index 2635d9d8..6fc70f79 100644 --- a/setup.py +++ b/setup.py @@ -186,9 +186,9 @@ def readme(): "aioboto3==12.0.0" ], "bigquery": [ - "google-cloud-bigquery==3.13.0", - "pandas-gbq==0.22.0", - "google-cloud-storage==2.16.0" + "google-cloud-bigquery==3.25.0", + "pandas-gbq==0.23.1", + "google-cloud-storage==2.17.0" ], "cassandra": [ "cassandra-driver==3.29.1", @@ -275,9 +275,9 @@ def readme(): "botocore==1.31.64", "aiobotocore==2.7.0", "aioboto3==12.0.0", - "google-cloud-bigquery==3.13.0", - "google-cloud-storage==2.16.0", - "pandas-gbq==0.22.0", + 
"google-cloud-bigquery==3.25.0", + "google-cloud-storage==2.17.0", + "pandas-gbq==0.23.1", "tqdm==4.66.1" ] }, From 41c68cb3c0109c9d65a8c5ef2597f2a707051aff Mon Sep 17 00:00:00 2001 From: Jesus Lara Date: Mon, 8 Jul 2024 19:51:14 +0200 Subject: [PATCH 4/6] lower timeout for connections on scylladb --- asyncdb/drivers/scylladb.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/asyncdb/drivers/scylladb.py b/asyncdb/drivers/scylladb.py index de52985a..4fc0f55e 100644 --- a/asyncdb/drivers/scylladb.py +++ b/asyncdb/drivers/scylladb.py @@ -26,6 +26,7 @@ DCAwareRoundRobinPolicy, WhiteListRoundRobinPolicy, DowngradingConsistencyRetryPolicy, + ConstantReconnectionPolicy, TokenAwarePolicy, RoundRobinPolicy ) @@ -325,10 +326,14 @@ async def connect(self, keyspace=None): "compression": True, "connection_class": conn_class, "protocol_version": self._protocol, - "connect_timeout": self._timeout, "idle_heartbeat_interval": self.heartbeat_interval, "ssl_options": ssl_opts, "executor_threads": 4, + "reconnection_policy": ConstantReconnectionPolicy( + delay=5.0, + max_attempts=100 + ), + "connect_timeout": 10 } auth_provider = None if self._auth: From 75304aa7430460199e14b1c5bbc82c5de11ba861 Mon Sep 17 00:00:00 2001 From: Jesus Lara Date: Mon, 8 Jul 2024 19:51:33 +0200 Subject: [PATCH 5/6] wip: bigqueru model --- examples/test_scylla.py | 152 +++++++++++++++++++++++----------------- 1 file changed, 87 insertions(+), 65 deletions(-) diff --git a/examples/test_scylla.py b/examples/test_scylla.py index 2fe6dee8..a9173b36 100644 --- a/examples/test_scylla.py +++ b/examples/test_scylla.py @@ -11,8 +11,20 @@ "password": 'navigator' } +ARGS = { + "host": "10.10.10.93", + "port": "9042", + "username": '', + "password": '' +} + async def test_connection(): - db = AsyncDB(DRIVER, params=ARGS) + db = AsyncDB( + DRIVER, + params=ARGS, + shard_awareness=True, + whitelist=["10.10.10.93"] + ) async with await db.connection() as conn: print('CONNECTED: ', conn.is_connected() is True) result, error = await conn.test_connection() @@ -21,7 +33,12 @@ async def test_connection(): async def test_async(): ARGS['driver'] = 'async' - db = AsyncDB(DRIVER, params=ARGS) + db = AsyncDB( + DRIVER, + params=ARGS, + shard_awareness=True, + whitelist=["10.10.10.93"] + ) async with await db.connection() as conn: print('CONNECTED: ', conn.is_connected() is True) result, error = await conn.test_connection() @@ -29,71 +46,76 @@ async def test_async(): print(type(result) == list) async def test_operations(): - db = AsyncDB(DRIVER, params=ARGS) - await db.connection() - print('CONNECTED: ', db.is_connected() is True) - result, error = await db.test_connection() - print(result, error) - print(type(result) == list) - await db.create_keyspace('navigator') - await db.use('navigator') # set database to work - # creation and insertion: - result, error = await db.execute( - "CREATE TABLE IF NOT EXISTS tests (id int, name text, PRIMARY KEY(id))" + db = AsyncDB( + DRIVER, + params=ARGS, + shard_awareness=True, + whitelist=["10.10.10.93"] ) - print('CREATE > ', result, error) - ql = "INSERT INTO tests (id, name) VALUES(?, ?)" - # prepare the statement: - prepared = await db.prepare(ql) - print(": prepared statement: ", prepared) - print(": Executing Insert of many entries: ") - result, error = await db.execute(prepared, (1, "abc")) - result, error = await db.execute(prepared, (2, "def")) - result, error = await db.execute(prepared, (3, "ghi")) - result, error = await db.execute(prepared, (4, "jkl")) - print('Execute a Many 
insert:: ') - examples = [(i, ''.join(random.choices(string.ascii_lowercase, k=4))) for i in range(5, 10005)] - result, error = await db.execute_many(prepared, examples) - print(result, error) - print('first select:') - result, error = await db.query("SELECT * FROM tests") - for row in result: - print(row) - # getting column info from Table: - columns = await db.column_info(table='tests', schema='navigator') - print('COLUMNS ', columns) - # using row factories - result, error = await db.query('SELECT * from tests LIMIT 10000', factory='pandas') - print(result.result) - result, error = await db.query('SELECT * from tests LIMIT 10000', factory='recordset') - print(result.result) - # output formats - db.output_format('json') # change output format to json - result, error = await db.query('SELECT * from tests LIMIT 10000') - print(result) + async with await db.connection() as conn: + print('CONNECTED: ', conn.is_connected() is True) + result, error = await conn.test_connection() + print(result, error) + print(type(result) == list) + await conn.create_keyspace('navigator') + await conn.use('navigator') # set database to work + # creation and insertion: + result, error = await db.execute( + "CREATE TABLE IF NOT EXISTS tests (id int, name text, PRIMARY KEY(id))" + ) + print('CREATE > ', result, error) + ql = "INSERT INTO tests (id, name) VALUES(?, ?)" + # prepare the statement: + prepared = await conn.prepare(ql) + print(": prepared statement: ", prepared) + print(": Executing Insert of many entries: ") + result, error = await conn.execute(prepared, (1, "abc")) + result, error = await conn.execute(prepared, (2, "def")) + result, error = await conn.execute(prepared, (3, "ghi")) + result, error = await conn.execute(prepared, (4, "jkl")) + print('Execute a Many insert:: ') + examples = [(i, ''.join(random.choices(string.ascii_lowercase, k=4))) for i in range(5, 10005)] + result, error = await conn.execute_many(prepared, examples) + print(result, error) + print('first select:') + result, error = await conn.query("SELECT * FROM tests") + for row in result: + print(row) + # getting column info from Table: + columns = await db.column_info(table='tests', schema='navigator') + print('COLUMNS ', columns) + # using row factories + result, error = await conn.query('SELECT * from tests LIMIT 10000', factory='pandas') + print(result.result) + result, error = await conn.query('SELECT * from tests LIMIT 10000', factory='recordset') + print(result.result) + # output formats + conn.output_format('json') # change output format to json + result, error = await conn.query('SELECT * from tests LIMIT 10000') + print(result) - db.output_format('pandas') # change output format to pandas - result, error = await db.query('SELECT * from tests LIMIT 10000') - print(result) - db.output_format('polars') # change output format to iter generator - result, error = await db.query('SELECT * from tests LIMIT 10000') - print(result) - # change output format to iter generator - db.output_format('dt') - # TODO: error when a python list is on a column - result, error = await db.query('SELECT * from tests LIMIT 10000') - print(result) - print(type(result)) - db.output_format('record') # change output format to iter generator - result, error = await db.query('SELECT * from tests LIMIT 10000') - print(type(result)) - # testing Recordset Object - db.output_format('recordset') # change output format to ResultSet - result, error = await db.query('SELECT * from tests LIMIT 10000') - for row in result: - print(row) - await db.execute('DROP TABLE 
tests') - await db.close() + conn.output_format('pandas') # change output format to pandas + result, error = await conn.query('SELECT * from tests LIMIT 10000') + print(result) + conn.output_format('polars') # change output format to iter generator + result, error = await conn.query('SELECT * from tests LIMIT 10000') + print(result) + # change output format to iter generator + conn.output_format('dt') + # TODO: error when a python list is on a column + result, error = await conn.query('SELECT * from tests LIMIT 10000') + print(result) + print(type(result)) + conn.output_format('record') # change output format to iter generator + result, error = await conn.query('SELECT * from tests LIMIT 10000') + print(type(result)) + # testing Recordset Object + conn.output_format('recordset') # change output format to ResultSet + result, error = await conn.query('SELECT * from tests LIMIT 10000') + for row in result: + print(row) + await conn.execute('DROP TABLE tests') + await conn.close() if __name__ == '__main__': From 8b5e9b6c014f9ecc0c3556bb7dc0e4e321897d63 Mon Sep 17 00:00:00 2001 From: Jesus Lara Date: Mon, 8 Jul 2024 19:55:22 +0200 Subject: [PATCH 6/6] wip: bigquery model --- asyncdb/drivers/bigquery.py | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/asyncdb/drivers/bigquery.py b/asyncdb/drivers/bigquery.py index ed8da897..cafad611 100644 --- a/asyncdb/drivers/bigquery.py +++ b/asyncdb/drivers/bigquery.py @@ -617,26 +617,14 @@ async def _insert_(self, _model: Model, **kwargs): # pylint: disable=W0613 print('INSERT > ', table, source, type(source)) for k,v in source.items(): print(f"{k} = {v}", type(v)) - # job = self._connection.insert_rows( - # table, - # [source], - # ) - # print('JOB > ', job) dataset_ref = self._connection.dataset(_model.Meta.schema) table_ref = dataset_ref.table(_model.Meta.name) table = bq.Table(table_ref) - - job_config = bq.LoadJobConfig( - source_format=bq.SourceFormat.NEWLINE_DELIMITED_JSON, - ) - job = await self._thread_func( - self._connection.load_table_from_json, - [source], + errors = self._connection.insert_rows_json( table, - job_config=job_config + [source] ) - row = job.result() - print('ROW > ', row) + print('ROW > ', errors) # get the row inserted again: condition = " AND ".join( [f"{key} = :{key}" for key in _filter]
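
A minimal, self-contained sketch of the streaming-insert path this last patch
switches to, assuming the same service-account setup used in
examples/test_bigquery_model.py (the key path and table name below are
placeholders):

    from google.cloud import bigquery
    from google.oauth2 import service_account

    credentials = service_account.Credentials.from_service_account_file("key.json")
    client = bigquery.Client(credentials=credentials, project=credentials.project_id)

    # insert_rows_json streams rows into an existing table and returns one
    # error mapping per failed row; an empty list means every row landed.
    table = client.get_table("library.country_flags")
    rows = [{"country": "Venezuela", "cyclist_name": "Jesus", "flag": 233}]
    errors = client.insert_rows_json(table, rows)
    if errors:
        raise RuntimeError(f"BigQuery streaming insert failed: {errors}")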