
New MongoDB/DocumentDB driver with Motor #1374

Merged · 3 commits · Jan 10, 2025

Conversation

@phenobarbital (Owner) commented Jan 10, 2025

Summary by Sourcery

Implement a new MongoDB/DocumentDB driver using Motor.

New Features:

  • Introduce a new MongoDB/DocumentDB driver leveraging the Motor library for asynchronous operations.

Tests:

  • Add comprehensive tests for the new MongoDB/DocumentDB driver.

sourcery-ai bot (Contributor) commented Jan 10, 2025

Reviewer's Guide by Sourcery

This pull request introduces a new MongoDB/DocumentDB driver using the Motor library. It replaces the existing mongo driver with a more robust and feature-rich implementation. The new driver supports various MongoDB deployment types like standalone MongoDB, MongoDB Atlas, and DocumentDB. It also includes features like connection timeout, database selection, and bulk operations.
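The connection-timeout behavior mentioned above can be sketched with stdlib asyncio alone. This is a minimal sketch, not the driver's actual API: `connect_stub` and the timeout values are illustrative stand-ins for Motor's connection handshake.

```python
import asyncio

async def connect_stub(delay: float) -> str:
    """Stand-in for an async driver handshake (e.g. Motor's server selection)."""
    await asyncio.sleep(delay)
    return "connected"

async def connect_with_timeout(delay: float, timeout: float = 5.0) -> str:
    """Bound the handshake with a timeout, in the spirit of the new driver."""
    try:
        return await asyncio.wait_for(connect_stub(delay), timeout=timeout)
    except asyncio.TimeoutError:
        return "timed out"

# A fast handshake succeeds; a slow one hits the timeout.
print(asyncio.run(connect_with_timeout(0.01, timeout=1.0)))  # connected
print(asyncio.run(connect_with_timeout(0.5, timeout=0.05)))  # timed out
```

Wrapping the connect call this way means a misconfigured host fails fast with a catchable error instead of hanging the event loop.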

Sequence diagram for MongoDB connection and operation flow

```mermaid
sequenceDiagram
    participant Client
    participant MongoDriver as mongo
    participant MongoDB

    Client->>MongoDriver: __init__(dsn, params)
    MongoDriver->>MongoDriver: _construct_dsn(params)
    Client->>MongoDriver: connection()
    MongoDriver->>MongoDB: connect with Motor client
    MongoDB-->>MongoDriver: connection established
    MongoDriver-->>Client: driver instance

    Client->>MongoDriver: use(database)
    MongoDriver->>MongoDB: select database
    MongoDB-->>MongoDriver: database selected
    MongoDriver-->>Client: database instance

    Client->>MongoDriver: execute(collection, operation)
    MongoDriver->>MongoDriver: _select_database()
    MongoDriver->>MongoDB: perform operation
    MongoDB-->>MongoDriver: operation result
    MongoDriver-->>Client: result
```

Class diagram for the updated MongoDB driver

```mermaid
classDiagram
    class mongo {
        +str _provider
        +str _syntax
        +str _dsn_template
        +AsyncIOMotorClient _connection
        +AsyncIOMotorDatabase _database
        +str _database_name
        +list _databases
        +int _timeout
        +__init__(dsn, loop, params, kwargs)
        +_construct_dsn(params)
        +_select_database()
        +connection()
        +close()
        +test_connection(use_ping)
        +use(database)
        +execute(collection_name, operation, args, kwargs)
        +execute_many(collection_name, operation, documents)
        +query(collection_name, filter, args, kwargs)
        +write(data, collection, database, use_pandas, if_exists)
        +truncate_table(collection_name)
        +delete(collection_name, filter, many)
        +drop_collection(collection_name)
        +drop_database(database_name)
    }
    class BaseDriver {
        <<interface>>
        +connection()
        +close()
    }
    mongo --|> BaseDriver : implements
```
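The `execute` dispatch shown in the class above can be illustrated with a stub collection. This is a sketch: `FakeCollection` is hypothetical, but the dispatch pattern (resolving the operation name with `getattr` and returning a `(result, error)` tuple) mirrors the snippet of `asyncdb/drivers/mongo.py` quoted later in this review.

```python
import asyncio

class FakeCollection:
    """Stub with a Motor-like async method, standing in for AsyncIOMotorCollection."""
    async def insert_one(self, doc):
        return {"inserted_id": 1, "doc": doc}

async def execute(collection, operation: str, *args, **kwargs):
    """Resolve the operation name to a collection method and await it."""
    result, error = None, None
    try:
        method = getattr(collection, operation)
        result = await method(*args, **kwargs)
    except Exception as err:
        error = err
    return (result, error)

result, error = asyncio.run(execute(FakeCollection(), "insert_one", {"name": "ana"}))
print(result)  # {'inserted_id': 1, 'doc': {'name': 'ana'}}
print(error)   # None
```

A missing operation name simply surfaces as an `AttributeError` in the error slot instead of raising, which keeps the calling convention uniform.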

File-Level Changes

Implemented a new MongoDB driver using Motor.
  • Replaced the old mongo driver with a new implementation based on Motor.
  • Added support for different MongoDB connection types (standalone, Atlas, DocumentDB).
  • Introduced a connection timeout mechanism.
  • Improved database selection logic.
  • Added support for bulk write operations with upsert functionality.
  • Implemented truncate_table, drop_collection, and drop_database methods.
  • Updated tests to cover the new driver's functionality.
Files: asyncdb/drivers/mongo.py, tests/test_mongo.py, examples/test_mongo.py
Updated DSN handling and connection logic.
  • Introduced a _dsn_template for constructing DSN strings.
  • Added a _construct_dsn method to dynamically build the DSN from parameters.
  • Modified the connection method to use the new DSN handling and timeout.
  • Added an is_connected method to check the connection status.
Files: asyncdb/drivers/mongo.py
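Template-driven DSN construction can be sketched with the stdlib alone. This is illustrative only: the template string, parameter names, and the `construct_dsn` helper are assumptions, not the driver's actual `_dsn_template` or `_construct_dsn`.

```python
from urllib.parse import quote_plus

# Hypothetical template; the real driver varies this per deployment type
# (standalone, Atlas, DocumentDB).
DSN_TEMPLATE = "mongodb://{user}:{password}@{host}:{port}/{database}"

def construct_dsn(params: dict) -> str:
    """Build a MongoDB DSN from connection parameters, URL-escaping credentials."""
    safe = dict(params)
    safe["user"] = quote_plus(params["user"])
    safe["password"] = quote_plus(params["password"])
    return DSN_TEMPLATE.format(**safe)

dsn = construct_dsn({
    "user": "app", "password": "p@ss/word",
    "host": "localhost", "port": 27017, "database": "navigator",
})
print(dsn)  # mongodb://app:p%40ss%2Fword@localhost:27017/navigator
```

Escaping credentials matters because characters like `@` and `/` in a raw password would otherwise break URI parsing.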
Improved data handling and processing.
  • Enhanced the write method to support pandas DataFrames, iterables of documents, and dataclass instances.
  • Added an if_exists parameter to write for handling existing documents ('replace' or 'append').
  • Improved error handling and logging during data processing.
Files: asyncdb/drivers/mongo.py
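The input-normalization step behind such a `write` method could look like the sketch below, assuming the driver reduces every supported payload to a list of dicts before inserting. The `normalize_documents` helper is hypothetical; `to_dict("records")` is the real pandas call for DataFrames, duck-typed here so the sketch needs no pandas import.

```python
from collections.abc import Iterable
from dataclasses import asdict, dataclass, is_dataclass
from typing import Any

def normalize_documents(data: Any) -> list[dict]:
    """Reduce a DataFrame, dataclass instance, dict, or iterable to a list of dicts."""
    if hasattr(data, "to_dict"):               # duck-typed pandas DataFrame
        return data.to_dict("records")
    if is_dataclass(data) and not isinstance(data, type):
        return [asdict(data)]
    if isinstance(data, dict):                 # single document
        return [data]
    if isinstance(data, Iterable):             # iterable of documents/dataclasses
        return [asdict(d) if is_dataclass(d) else dict(d) for d in data]
    raise TypeError(f"Unsupported payload type: {type(data)!r}")

@dataclass
class User:
    name: str
    age: int

print(normalize_documents(User("ana", 30)))      # [{'name': 'ana', 'age': 30}]
print(normalize_documents([{"a": 1}, {"a": 2}]))  # [{'a': 1}, {'a': 2}]
```

Once everything is a list of dicts, 'replace' vs 'append' for `if_exists` reduces to an optional collection drop before the insert.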
Added handling for prepared statements.
  • Implemented a prepare method; since MongoDB does not support prepared statements, it raises a DriverError saying so.
Files: asyncdb/drivers/mongo.py
Updated tests and examples.
  • Added tests for new features like DSN handling, connection timeout, and data processing.
  • Updated example code to demonstrate the new driver's usage.
Files: tests/test_mongo.py, examples/test_mongo.py

@phenobarbital phenobarbital merged commit 2481828 into master Jan 10, 2025
1 of 2 checks passed
@phenobarbital phenobarbital deleted the py313-support branch January 10, 2025 01:34
@sourcery-ai sourcery-ai bot left a comment

Hey @phenobarbital - I've reviewed your changes and they look great!

Here's what I looked at during the review
  • 🟢 General issues: all looks good
  • 🟢 Security: all looks good
  • 🟡 Testing: 1 issue found
  • 🟡 Complexity: 1 issue found
  • 🟢 Documentation: all looks good

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +157 to +158

```python
write_success = await db_driver.write(data=sample_dataframe, collection=collection_name)
assert write_success is True, "Failed to write DataFrame to MongoDB collection."
```

suggestion (testing): Test edge cases for the write operation with DataFrames

Consider adding tests for empty DataFrames, DataFrames with null values, and different data types in the DataFrame (e.g., nested dictionaries, lists). Also test the if_exists='replace' scenario with DataFrame input.

Suggested implementation:

```python
@pytest.fixture
def empty_dataframe():
    """Fixture that returns an empty DataFrame."""
    return pd.DataFrame()

@pytest.fixture
def null_dataframe():
    """Fixture that returns a DataFrame with null values."""
    return pd.DataFrame({
        'col1': [1, None, 3],
        'col2': ['a', None, 'c'],
        'col3': [None, 2.5, 3.5]
    })

@pytest.fixture
def complex_dataframe():
    """Fixture that returns a DataFrame with complex data types."""
    return pd.DataFrame({
        'nested_dict': [{'key1': 'value1'}, {'key2': 'value2'}],
        'list_col': [[1, 2, 3], [4, 5, 6]],
        'mixed_types': [1, 'string'],
        'basic_col': [1, 2]
    })

@pytest.mark.asyncio
async def test_write_dataframe_basic(sample_collection, sample_dataframe):
    # Verify that the data was written correctly
    ...

@pytest.mark.asyncio
async def test_write_empty_dataframe(sample_collection, empty_dataframe):
    """Test writing an empty DataFrame to MongoDB."""
    db_driver, collection_name = sample_collection

    write_success = await db_driver.write(data=empty_dataframe, collection=collection_name)
    assert write_success is True, "Failed to write empty DataFrame to MongoDB collection."

    # Verify that no documents were written
    documents, error = await db_driver.fetch(collection_name)
    assert error is None, f"Error during fetch: {error}"
    assert len(documents) == 0, "Collection should be empty"

@pytest.mark.asyncio
async def test_write_null_dataframe(sample_collection, null_dataframe):
    """Test writing a DataFrame with null values to MongoDB."""
    db_driver, collection_name = sample_collection

    write_success = await db_driver.write(data=null_dataframe, collection=collection_name)
    assert write_success is True, "Failed to write DataFrame with null values to MongoDB collection."

    # Verify the data including null values
    documents, error = await db_driver.fetch(collection_name)
    assert error is None, f"Error during fetch: {error}"
    assert len(documents) == 3, "Number of documents doesn't match"

    # Check that null values are preserved
    fetched_data = [{k: v for k, v in doc.items() if k != '_id'} for doc in documents]
    assert any(None in doc.values() for doc in fetched_data), "Null values were not preserved"

@pytest.mark.asyncio
async def test_write_complex_dataframe(sample_collection, complex_dataframe):
    """Test writing a DataFrame with complex data types to MongoDB."""
    db_driver, collection_name = sample_collection

    write_success = await db_driver.write(data=complex_dataframe, collection=collection_name)
    assert write_success is True, "Failed to write DataFrame with complex types to MongoDB collection."

    # Verify the complex data types
    documents, error = await db_driver.fetch(collection_name)
    assert error is None, f"Error during fetch: {error}"

    fetched_data = [{k: v for k, v in doc.items() if k != '_id'} for doc in documents]
    assert isinstance(fetched_data[0]['nested_dict'], dict), "Nested dict not preserved"
    assert isinstance(fetched_data[0]['list_col'], list), "List not preserved"

@pytest.mark.asyncio
async def test_write_dataframe_replace(sample_collection, sample_dataframe, null_dataframe):
    """Test writing a DataFrame with if_exists='replace' option."""
    db_driver, collection_name = sample_collection

    # First write the sample dataframe
    write_success = await db_driver.write(data=sample_dataframe, collection=collection_name)
    assert write_success is True, "Failed to write initial DataFrame"

    # Then replace it with the null dataframe
    write_success = await db_driver.write(
        data=null_dataframe,
        collection=collection_name,
        if_exists='replace'
    )
    assert write_success is True, "Failed to replace existing data with new DataFrame"

    # Verify that only the new data exists
    documents, error = await db_driver.fetch(collection_name)
    assert error is None, f"Error during fetch: {error}"
    assert len(documents) == 3, "Collection should only contain the replacement data"
```

You'll need to:

  1. Import pandas as pd at the top of the file if not already imported
  2. Ensure the MongoDB driver class supports the if_exists='replace' parameter in its write method
  3. Update any existing documentation to reflect the new test cases

```python
        method = getattr(collection, operation)
        result = await method(*args, **kwargs)
    except Exception as err:
        error = err
    return (result, error)

async def execute_many(self, collection_name: str, operation: str, documents: list) -> Optional[Any]:
```

issue (complexity): Consider implementing batching in execute_many to handle large operations efficiently and prevent potential memory issues.

The execute_many method currently just forwards to execute, creating an unnecessary abstraction layer. To justify this separate method, implement proper bulk operation handling:

```python
async def execute_many(
    self,
    collection_name: str,
    operation: str,
    documents: list,
    batch_size: int = 1000
) -> Optional[Any]:
    """Execute bulk operations with batching for better performance."""
    results = []
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i + batch_size]
        result = await self.execute(collection_name, operation, batch)
        results.append(result)
    return results
```

This adds value by:

  • Breaking large operations into manageable batches
  • Aggregating results from multiple batches
  • Preventing memory issues with large document sets
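
The batching suggestion can be exercised against a stub driver to show the slicing behavior. `FakeDriver` below is purely illustrative: its `execute` just records batch sizes, while `execute_many` follows the batching logic suggested above.

```python
import asyncio

class FakeDriver:
    """Stub standing in for the mongo driver; records each batch it receives."""
    def __init__(self):
        self.batches = []

    async def execute(self, collection_name, operation, batch):
        self.batches.append(len(batch))
        return len(batch)

    async def execute_many(self, collection_name, operation, documents, batch_size=1000):
        results = []
        for i in range(0, len(documents), batch_size):
            results.append(await self.execute(collection_name, operation, documents[i:i + batch_size]))
        return results

driver = FakeDriver()
docs = [{"n": i} for i in range(2500)]
out = asyncio.run(driver.execute_many("users", "insert_many", docs, batch_size=1000))
print(out)  # [1000, 1000, 500]
```

2,500 documents arrive as two full batches and one remainder, so peak memory per bulk call is bounded by `batch_size` rather than by the total document count.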

Comment on lines +188 to +193

```python
try:
    table = self._connection.get_table(table_ref)
except NotFound:
    raise DriverError(
        f"BigQuery: Table `{dataset_id}.{table_id}` does not exist."
    )
```

issue (code-quality): Explicitly raise from a previous error (raise-from-previous-error)

```python
except Exception as err:
    raise DriverError(f"No row to be returned {err}")
return await self.queryrow(collection_name, filter, *args, **kwargs)

fetchrow = fetch_one
fetchone = fetch_one

async def write(
```

issue (code-quality): We've found these issues:

  • Simplify conditional into switch-like form (switch)
  • Low code quality found in mongo.write - 24% (low-code-quality)


Explanation
The quality score for this function is below the quality threshold of 25%.
This score is a combination of the method length, cognitive complexity and working memory.

How can you solve this?

It might be worth refactoring this function to make it shorter and more readable.

  • Reduce the function length by extracting pieces of functionality out into
    their own functions. This is the most important thing you can do - ideally a
    function should be less than 10 lines.
  • Reduce nesting, perhaps by introducing guard clauses to return early.
  • Ensure that variables are tightly scoped, so that code using related concepts
    sits together within the function rather than being scattered.

Comment on lines +78 to +79

```python
df = pd.DataFrame(data)
return df
```

suggestion (code-quality): Inline variable that is immediately returned (inline-immediately-returned-variable)

Suggested change:

```diff
-df = pd.DataFrame(data)
-return df
+return pd.DataFrame(data)
```
