diff --git a/docs/source/quickstart.rst b/docs/source/quickstart.rst index 0ba575d..b07c35d 100644 --- a/docs/source/quickstart.rst +++ b/docs/source/quickstart.rst @@ -201,3 +201,44 @@ Example: ... print("Feed reader end") ... >>> db.changes_feed(MyReader()) + + +Pagination +---------- + +py-couchdb provides convenient pagination functionality for both CouchDB views and Mango queries. This eliminates the need to manually manage `skip` parameters and provides stable, cursor-based pagination. + +View Pagination +~~~~~~~~~~~~~~~ + +Use `view_pages()` for paginating through CouchDB view results: + +.. code-block:: python + + >>> # Paginate through view results + >>> for page in db.view_pages("design/view", page_size=10): + ... print(f"Page with {len(page)} rows") + ... for row in page: + ... print(f" {row['id']}: {row['key']}") + +Mango Query Pagination +~~~~~~~~~~~~~~~~~~~~~~ + +Use `mango_pages()` for paginating through Mango query results: + +.. code-block:: python + + >>> # Paginate through Mango query results + >>> selector = {"type": "user", "active": True} + >>> for page in db.mango_pages(selector, page_size=10): + ... print(f"Page with {len(page)} documents") + ... for doc in page: + ... print(f" {doc['_id']}: {doc['name']}") + +Key Benefits +~~~~~~~~~~~ + +- **Stable pagination**: No duplicate or missing results during concurrent updates +- **Automatic cursor management**: No manual `skip` parameter handling +- **Memory efficient**: Process large datasets page by page +- **Consistent API**: Same interface for both view and Mango pagination diff --git a/pycouchdb/client.py b/pycouchdb/client.py index 6960baa..85a621c 100644 --- a/pycouchdb/client.py +++ b/pycouchdb/client.py @@ -6,16 +6,17 @@ import copy import mimetypes import warnings -from typing import Any, Dict, List, Optional, Union, Iterator, Callable, TYPE_CHECKING +from typing import Any, Dict, List, Optional, Union, Iterator, Callable, TYPE_CHECKING, Tuple from . import utils from . import feedreader from . import exceptions as exp from .resource import Resource from .types import ( - Json, Document, Row, BulkItem, ServerInfo, DatabaseInfo, + Json, Document, Row, BulkItem, ServerInfo, DatabaseInfo, ChangeResult, ViewResult, Credentials, AuthMethod, DocId, Rev ) +from .pagination import view_pages, mango_pages, ViewRows, MangoDocs, PageSize # Type alias for feed reader parameter FeedReader = Union[Callable[[Dict[str, Any]], None], feedreader.BaseFeedReader] @@ -855,3 +856,79 @@ def changes_list(self, **kwargs): (resp, result) = self.resource("_changes").get(params=kwargs) return result['last_seq'], result['results'] + + def find(self, selector: Dict[str, Any], **kwargs: Any) -> Iterator[Document]: + """ + Execute a Mango query using the _find endpoint. + + :param selector: Mango query selector + :param kwargs: Additional query parameters (limit, bookmark, etc.) + :returns: Iterator of documents matching the selector + """ + params = copy.copy(kwargs) + params['selector'] = selector + + data = utils.force_bytes(json.dumps(params)) + (resp, result) = self.resource.post("_find", data=data) + + if result is None or 'docs' not in result: + return + + for doc in result['docs']: + yield doc + + def view_pages(self, design_and_view: str, page_size: PageSize, params: Optional[Dict[str, Any]] = None) -> Iterator[ViewRows]: + """ + Paginate through CouchDB view results with automatic cursor management. + + This method provides convenient pagination for view queries without manual + skip parameter management. It automatically handles startkey and startkey_docid + for stable pagination. + + :param design_and_view: View name (e.g., "design/view") + :param page_size: Number of rows per page + :param params: Additional query parameters + :returns: Iterator yielding lists of rows for each page + + .. versionadded:: 1.17 + """ + path = utils._path_from_name(design_and_view, '_view') + + def fetch_view(params_dict: Dict[str, Any]) -> Tuple[Any, Optional[Dict[str, Any]]]: + data = None + if "keys" in params_dict: + data_dict = {"keys": params_dict.pop('keys')} + data = utils.force_bytes(json.dumps(data_dict)) + + encoded_params = utils.encode_view_options(params_dict) + + if data: + (resp, result) = self.resource(*path).post(params=encoded_params, data=data) + else: + (resp, result) = self.resource(*path).get(params=encoded_params) + + return resp, result + + return view_pages(fetch_view, design_and_view, page_size, params) + + def mango_pages(self, selector: Dict[str, Any], page_size: PageSize, params: Optional[Dict[str, Any]] = None) -> Iterator[MangoDocs]: + """ + Paginate through Mango query results with automatic bookmark management. + + This method provides convenient pagination for Mango queries without manual + bookmark parameter management. It automatically handles the bookmark cursor + for stable pagination. + + :param selector: Mango query selector + :param page_size: Number of documents per page + :param params: Additional query parameters + :returns: Iterator yielding lists of documents for each page + + .. versionadded:: 1.17 + """ + def fetch_mango(params_dict: Dict[str, Any]) -> Tuple[Any, Optional[Dict[str, Any]]]: + data = utils.force_bytes(json.dumps(params_dict)) + (resp, result) = self.resource.post("_find", data=data) + return resp, result + + return mango_pages(fetch_mango, selector, page_size, params) diff --git a/pycouchdb/pagination.py b/pycouchdb/pagination.py new file mode 100644 index 0000000..8621041 --- /dev/null +++ b/pycouchdb/pagination.py @@ -0,0 +1,152 @@ +# -*- coding: utf-8 -*- + +""" +Pagination utilities for CouchDB views and Mango queries. + +This module provides convenient pagination functionality for both CouchDB views +and Mango queries, handling the complexity of cursor-based pagination internally. +""" + +from typing import Any, Dict, List, Iterator, Optional, Callable, Union, Tuple +import json +import copy + +from . import utils +from .types import Row, Document, Json, ViewRows, MangoDocs, PageSize + +__all__ = ['view_pages', 'mango_pages', 'ViewRows', 'MangoDocs', 'PageSize'] + + +def view_pages( + fetch: Callable[[Dict[str, Any]], Tuple[Any, Optional[Dict[str, Any]]]], + view: str, + page_size: PageSize, + params: Optional[Dict[str, Any]] = None +) -> Iterator[ViewRows]: + """ + Paginate through CouchDB view results using startkey/startkey_docid cursor. + + This function handles the complexity of CouchDB view pagination by automatically + managing startkey and startkey_docid parameters for stable pagination. + + .. warning:: + Pagination with grouped and reduced views (group=true, reduce=true) is + inefficient and unreliable. CouchDB must process all preceding groups + for skip operations, and total_rows/offset values are inconsistent with + reduced output. Consider fetching all results at once for reduced views. + + :param fetch: Function that makes the actual HTTP request and returns (response, result) + :param view: View name (e.g., "design/view") + :param page_size: Number of rows per page + :param params: Additional query parameters + :returns: Iterator yielding lists of rows for each page + """ + if params is None: + params = {} + + # Create a copy to avoid modifying the original + query_params = copy.deepcopy(params) + query_params['limit'] = page_size + 1 # Request one extra to detect if there are more pages + + # Track pagination state + startkey = None + startkey_docid = None + skip = 0 + + while True: + # Build current page parameters + current_params = copy.deepcopy(query_params) + + if startkey is not None: + current_params['startkey'] = startkey + current_params['startkey_docid'] = startkey_docid + current_params['skip'] = skip + + # Encode view parameters properly + current_params = _encode_view_params(current_params) + + # Make the request + response, result = fetch(current_params) + + if result is None or 'rows' not in result: + break + + rows = result['rows'] + + # If we got fewer rows than requested, this is the last page + if len(rows) <= page_size: + if rows: # Only yield if there are rows + yield rows + break + + # We got more rows than page_size, so there are more pages + # Yield current page (excluding the extra row) + current_page = rows[:page_size] + yield current_page + + # Set up for next page using the last row as cursor + last_row = rows[page_size - 1] + startkey = last_row['key'] + startkey_docid = last_row['id'] + skip = 1 # Skip the row used as the cursor to avoid returning it again (prevents duplicate results in cursor-based pagination) + + +def mango_pages( + fetch_find: Callable[[Dict[str, Any]], Tuple[Any, Optional[Dict[str, Any]]]], + selector: Dict[str, Any], + page_size: PageSize, + params: Optional[Dict[str, Any]] = None +) -> Iterator[MangoDocs]: + """ + Paginate through Mango query results using bookmark cursor. + + This function handles Mango query pagination by automatically managing + the bookmark parameter for stable pagination. + + :param fetch_find: Function that makes the actual HTTP request and returns (response, result) + :param selector: Mango query selector + :param page_size: Number of documents per page + :param params: Additional query parameters + :returns: Iterator yielding lists of documents for each page + """ + if params is None: + params = {} + + # Create a copy to avoid modifying the original + query_params = copy.deepcopy(params) + query_params['limit'] = page_size + query_params['selector'] = selector + + bookmark = None + + while True: + # Build current page parameters + current_params = copy.deepcopy(query_params) + + if bookmark is not None: + current_params['bookmark'] = bookmark + + # Make the request + response, result = fetch_find(current_params) + + if result is None or 'docs' not in result: + break + + docs = result['docs'] + + # If no documents, we're done + if not docs: + break + + # Yield current page + yield docs + + # Check if there are more pages + bookmark = result.get('bookmark') + if not bookmark: + break + + +def _encode_view_params(params: Dict[str, Any]) -> Dict[str, Any]: + """Encode view parameters using the same logic as the main client.""" + return utils.encode_view_options(params) diff --git a/pycouchdb/types.py b/pycouchdb/types.py index faa7d36..4dcba61 100644 --- a/pycouchdb/types.py +++ b/pycouchdb/types.py @@ -82,6 +82,11 @@ def on_heartbeat(self) -> None: ... DocId = str Rev = str +# Pagination type aliases +ViewRows = List[Row] +MangoDocs = List[Document] +PageSize = int + # Constants DEFAULT_BASE_URL: Final[str] = "http://localhost:5984/" DEFAULT_AUTH_METHOD: Final[str] = "basic" diff --git a/pycouchdb/utils.py b/pycouchdb/utils.py index 2947b83..88875ec 100644 --- a/pycouchdb/utils.py +++ b/pycouchdb/utils.py @@ -87,7 +87,12 @@ def urljoin(base: str, *path: str) -> str: def as_json(response: Any) -> Optional[Union[Dict[str, Any], List[Any], str]]: if "application/json" in response.headers['content-type']: - response_src = response.content.decode('utf-8') + try: + response_src = response.content.decode('utf-8') + except UnicodeDecodeError: + # Try with error handling for invalid UTF-8 + response_src = response.content.decode('utf-8', errors='replace') + if response.content != b'': return json.loads(response_src) else: diff --git a/test/integration/test_integration.py b/test/integration/test_integration.py index f512563..592ba68 100644 --- a/test/integration/test_integration.py +++ b/test/integration/test_integration.py @@ -578,7 +578,7 @@ def test_basic_auth_success(): def test_basic_auth_failure(): """Test basic authentication with invalid credentials.""" server = pycouchdb.Server('http://invalid:credentials@localhost:5984/') - + with pytest.raises(Exception): server.info() @@ -619,12 +619,12 @@ def test_concurrent_document_updates(db): """Test concurrent updates to the same document.""" import threading import time - + doc = db.save({'_id': 'concurrent_test', 'counter': 0}) - + results = [] errors = [] - + def update_document(): try: for i in range(5): @@ -636,16 +636,16 @@ def update_document(): time.sleep(0.01) except Exception as e: errors.append(e) - + threads = [] for i in range(3): thread = threading.Thread(target=update_document) threads.append(thread) thread.start() - + for thread in threads: thread.join() - + assert len(results) > 0 assert len(errors) >= 0 @@ -654,10 +654,10 @@ def test_concurrent_database_operations(server): """Test concurrent database creation and deletion.""" import threading import time - + results = [] errors = [] - + def create_and_delete_db(db_num): try: db_name = f'concurrent_db_{db_num}' @@ -667,18 +667,18 @@ def create_and_delete_db(db_num): results.append(f'success_{db_num}') except Exception as e: errors.append(f'error_{db_num}: {e}') - + # Start multiple threads threads = [] for i in range(5): thread = threading.Thread(target=create_and_delete_db, args=(i,)) threads.append(thread) thread.start() - + # Wait for all threads to complete for thread in threads: thread.join() - + # Check results assert len(results) > 0 # Some operations might fail due to timing, that's expected @@ -694,11 +694,11 @@ def test_large_document(db): 'data': large_data, 'size': len(large_data) } - + saved_doc = db.save(doc) assert saved_doc['_id'] == 'large_doc' assert saved_doc['size'] == len(large_data) - + # Retrieve and verify retrieved_doc = db.get('large_doc') assert retrieved_doc['size'] == len(large_data) @@ -715,11 +715,11 @@ def test_bulk_operations_large_dataset(db): 'index': i, 'data': f'content_{i}' * 100 # Make each doc reasonably sized }) - + # Save in bulk saved_docs = db.save_bulk(docs) assert len(saved_docs) == 1000 - + # Verify some documents for i in range(0, 1000, 100): doc = db.get(f'bulk_doc_{i}') @@ -731,23 +731,23 @@ def test_memory_efficient_streaming(db): """Test memory-efficient streaming operations.""" # Create a document with attachment doc = db.save({'_id': 'streaming_test', 'type': 'test'}) - + # Create a large attachment large_content = b'x' * (100 * 1024) # 100KB import io content_stream = io.BytesIO(large_content) - + # Put attachment doc_with_attachment = db.put_attachment(doc, content_stream, 'large_file.txt') - + # Get attachment with streaming stream_response = db.get_attachment(doc_with_attachment, 'large_file.txt', stream=True) - + # Read in chunks to test streaming chunks = [] for chunk in stream_response.iter_content(chunk_size=1024): chunks.append(chunk) - + # Verify content retrieved_content = b''.join(chunks) assert retrieved_content == large_content @@ -758,17 +758,17 @@ def test_changes_feed_error_handling(db): """Test changes feed with error scenarios.""" messages = [] errors = [] - + def error_prone_reader(message, db): messages.append(message) if len(messages) > 2: raise Exception("Simulated error in feed reader") - + try: db.changes_feed(error_prone_reader, limit=5) except Exception as e: errors.append(e) - + assert len(messages) > 0 @@ -776,19 +776,19 @@ def test_changes_feed_heartbeat_handling(db): """Test changes feed heartbeat handling.""" heartbeats = [] messages = [] - + class HeartbeatTestReader(pycouchdb.feedreader.BaseFeedReader): def on_message(self, message): messages.append(message) if len(messages) >= 2: raise pycouchdb.exceptions.FeedReaderExited() - + def on_heartbeat(self): heartbeats.append('heartbeat') - + reader = HeartbeatTestReader() db.changes_feed(reader, limit=5) - + assert len(heartbeats) >= 0 @@ -802,11 +802,11 @@ def test_unicode_document_ids(db): 'ドキュメント_テスト', 'тест_документ_123' ] - + for doc_id in unicode_ids: doc = db.save({'_id': doc_id, 'content': f'Content for {doc_id}'}) assert doc['_id'] == doc_id - + # Retrieve and verify retrieved_doc = db.get(doc_id) assert retrieved_doc['_id'] == doc_id @@ -822,9 +822,9 @@ def test_unicode_content(db): 'japanese': 'これは日本語のテキストです', 'emoji': '🚀📚💻🎉' } - + doc = db.save({'_id': 'unicode_test', **unicode_content}) - + retrieved_doc = db.get('unicode_test') for key, value in unicode_content.items(): assert retrieved_doc[key] == value @@ -837,12 +837,12 @@ def test_special_characters_in_database_names(server): 'test_db_123', # underscores and numbers (most basic) 'test-db-123', # dashes and numbers ] - + invalid_names = [ 'TestDB', '123test', ] - + for db_name in allowed_names: try: db = server.create(db_name) @@ -851,7 +851,7 @@ def test_special_characters_in_database_names(server): assert db_name not in server except Exception as e: pytest.skip(f"Database name '{db_name}' not allowed: {e}") - + for db_name in invalid_names: with pytest.raises(Exception): server.create(db_name) @@ -863,14 +863,14 @@ def test_special_characters_in_database_names(server): def test_bulk_operation_performance(db): """Test performance of bulk operations.""" import time - + # Test bulk save performance docs = [{'index': i, 'data': f'content_{i}'} for i in range(100)] - + start_time = time.time() saved_docs = db.save_bulk(docs) end_time = time.time() - + assert len(saved_docs) == 100 assert end_time - start_time < 10 @@ -881,11 +881,11 @@ def test_empty_database_operations(db): # Test querying empty database results = list(db.all()) assert len(results) == 0 - + # Test changes on empty database last_seq, changes = db.changes_list() assert len(changes) == 0 - + try: result = db.query('nonexistent/view') assert list(result) == [] @@ -899,13 +899,13 @@ def test_document_with_system_fields(db): '_id': 'system_fields_test', 'custom_field': 'value', } - + saved_doc = db.save(doc) assert saved_doc['_id'] == 'system_fields_test' assert saved_doc['custom_field'] == 'value' assert '_rev' in saved_doc assert saved_doc['_rev'].startswith('1-') - + saved_doc['custom_field'] = 'updated_value' updated_doc = db.save(saved_doc) assert updated_doc['custom_field'] == 'updated_value' @@ -915,26 +915,26 @@ def test_document_with_system_fields(db): def test_attachment_with_special_characters(db): """Test attachments with special characters in filenames.""" import io - + special_filenames = [ 'file_with_underscores.txt', 'file-with-dashes.txt', 'file.with.dots.txt', 'файл_с_кириллицей.txt' ] - + for i, filename in enumerate(special_filenames): try: doc = db.save({'_id': f'attachment_test_{i}', 'type': 'test'}) - + content = f'Content for {filename}'.encode('utf-8') content_stream = io.BytesIO(content) - + doc_with_attachment = db.put_attachment(doc, content_stream, filename) - + retrieved_content = db.get_attachment(doc_with_attachment, filename) assert retrieved_content.decode('utf-8') == f'Content for {filename}' - + except Exception as e: pytest.skip(f"Filename '{filename}' not allowed: {e}") @@ -967,11 +967,11 @@ def test_design_document_management(db): "items": "function(head, req) { var row; while (row = getRow()) { send(row.value); } }" } } - + # Save design document saved_design = db.save(design_doc) assert saved_design['_id'] == '_design/test_views' - + # Create some test documents test_docs = [ {'_id': 'doc1', 'name': 'Alice', 'type': 'user', 'status': 'active', 'created_at': '2023-01-01'}, @@ -979,21 +979,21 @@ def test_design_document_management(db): {'_id': 'doc3', 'name': 'Charlie', 'type': 'admin', 'status': 'active', 'created_at': '2023-01-03'}, ] db.save_bulk(test_docs) - + # Test different views by_name_results = list(db.query('test_views/by_name')) assert len(by_name_results) == 3 - + by_type_results = list(db.query('test_views/by_type', group=True)) assert len(by_type_results) == 2 # user and admin types - + # Test reduce function total_by_type = db.one('test_views/by_type', flat='value') assert total_by_type == 3 # Total count of all documents - + # Test date range query - date_results = list(db.query('test_views/by_date', - startkey='2023-01-01', + date_results = list(db.query('test_views/by_date', + startkey='2023-01-01', endkey='2023-01-02')) assert len(date_results) == 2 @@ -1010,15 +1010,15 @@ def test_view_compaction_and_cleanup(db): } } db.save(design_doc) - + # Add some documents to create view data for i in range(100): db.save({'_id': f'compaction_doc_{i}', 'id': i, 'value': f'value_{i}'}) - + # Test view compaction result = db.compact_view('compaction_test') assert result is not None - + # Test database cleanup cleanup_result = db.cleanup() assert cleanup_result is not None @@ -1029,7 +1029,7 @@ def test_replication_edge_cases(server): # Create source and target databases source_db = server.create('replication_source') target_db = server.create('replication_target') - + try: # Add documents to source source_docs = [ @@ -1038,18 +1038,18 @@ def test_replication_edge_cases(server): {'_id': 'doc3', 'content': 'source content 3'}, ] source_db.save_bulk(source_docs) - + # Test basic replication replicate_result = server.replicate( SERVER_URL + 'replication_source', SERVER_URL + 'replication_target' ) assert replicate_result is not None - + # Verify documents were replicated target_docs = list(target_db.all()) assert len(target_docs) >= 3 - + # Test replication with create_target=True replicate_with_create = server.replicate( SERVER_URL + 'replication_source', @@ -1057,13 +1057,13 @@ def test_replication_edge_cases(server): create_target=True ) assert replicate_with_create is not None - + # Verify target database was created assert 'replication_target_create' in server - + # Clean up created database server.delete('replication_target_create') - + finally: # Clean up server.delete('replication_source') @@ -1075,7 +1075,7 @@ def test_library_compaction_api_behavior(db): # Test that compact() method returns expected result compact_result = db.compact() assert compact_result is not None - + # Test that compact_view() works with valid design doc design_doc = { "_id": "_design/compact_test", @@ -1086,15 +1086,15 @@ def test_library_compaction_api_behavior(db): } } db.save(design_doc) - + # Add some documents to create view data for i in range(10): db.save({'_id': f'compact_doc_{i}', 'id': i, 'value': f'value_{i}'}) - + # Test view compaction API view_compact_result = db.compact_view('compact_test') assert view_compact_result is not None - + # Test cleanup API cleanup_result = db.cleanup() assert cleanup_result is not None @@ -1110,7 +1110,7 @@ def test_changes_feed_with_filters(db): } } db.save(design_doc) - + # Add documents of different types docs = [ {'_id': 'user1', 'type': 'user', 'name': 'Alice'}, @@ -1118,20 +1118,20 @@ def test_changes_feed_with_filters(db): {'_id': 'user2', 'type': 'user', 'name': 'Charlie'}, ] db.save_bulk(docs) - + # Test changes feed with filter messages = [] - + def filter_reader(message, db): messages.append(message) if len(messages) >= 2: raise pycouchdb.exceptions.FeedReaderExited() - + try: db.changes_feed(filter_reader, filter='filters/by_type', type='user', limit=10) except Exception: pass # May not be supported in all CouchDB versions - + # Should have received some messages assert len(messages) >= 0 @@ -1139,7 +1139,7 @@ def filter_reader(message, db): def test_attachment_metadata_and_content_types(db): """Test attachment handling with different content types and metadata.""" doc = db.save({'_id': 'attachment_metadata_test', 'type': 'test'}) - + # Test different content types content_types = [ ('text.txt', 'text/plain', b'Plain text content'), @@ -1147,27 +1147,27 @@ def test_attachment_metadata_and_content_types(db): ('image.png', 'image/png', b'fake_png_data'), ('document.pdf', 'application/pdf', b'fake_pdf_data'), ] - + for filename, content_type, content in content_types: import io content_stream = io.BytesIO(content) - + # Get fresh document for each attachment to avoid conflicts current_doc = db.get('attachment_metadata_test') - + # Put attachment with specific content type doc_with_attachment = db.put_attachment( current_doc, content_stream, filename, content_type=content_type ) - + # Verify attachment metadata assert '_attachments' in doc_with_attachment assert filename in doc_with_attachment['_attachments'] - + attachment_info = doc_with_attachment['_attachments'][filename] assert attachment_info['content_type'] == content_type assert attachment_info['length'] == len(content) - + # Retrieve and verify content retrieved_content = db.get_attachment(doc_with_attachment, filename) assert retrieved_content == content @@ -1177,30 +1177,30 @@ def test_document_conflicts_resolution(db): """Test document conflict resolution scenarios.""" # Create initial document doc1 = db.save({'_id': 'conflict_test', 'version': 1, 'data': 'initial'}) - + # Simulate concurrent updates by getting the same document twice doc2 = db.get('conflict_test') doc3 = db.get('conflict_test') - + # Update both copies doc2['version'] = 2 doc2['data'] = 'updated_by_client_1' doc3['version'] = 2 doc3['data'] = 'updated_by_client_2' - + # Save first update updated_doc2 = db.save(doc2) - + # Second update should conflict with pytest.raises(pycouchdb.exceptions.Conflict): db.save(doc3) - + # Resolve conflict by getting latest and updating latest_doc = db.get('conflict_test') latest_doc['version'] = 3 latest_doc['data'] = 'resolved_conflict' resolved_doc = db.save(latest_doc) - + assert resolved_doc['version'] == 3 assert resolved_doc['data'] == 'resolved_conflict' @@ -1214,19 +1214,19 @@ def test_bulk_operations_with_conflicts(db): {'_id': 'bulk_conflict_3', 'version': 1}, ] db.save_bulk(initial_docs) - + # Get documents for update docs_to_update = [db.get(f'bulk_conflict_{i}') for i in range(1, 4)] - + # Update all documents for i, doc in enumerate(docs_to_update): doc['version'] = 2 doc['updated_by'] = f'client_{i}' - + # Save in bulk - should succeed updated_docs = db.save_bulk(docs_to_update) assert len(updated_docs) == 3 - + # Try to update again with old revision - should conflict # We need to use the old revision numbers to create a conflict old_docs = [ @@ -1242,20 +1242,20 @@ def test_library_database_config_api(server): """Test pycouchdb library's database config API.""" # Create a test database test_db = server.create('config_test') - + try: # Test that config() method returns expected data structure db_info = test_db.config() assert isinstance(db_info, dict) assert 'update_seq' in db_info assert 'doc_count' in db_info - + # Test that we can access the database name assert test_db.name == 'config_test' - + # Test that database length works assert isinstance(len(test_db), int) - + finally: server.delete('config_test') @@ -1265,15 +1265,15 @@ def test_library_server_initialization(): # Test default initialization server1 = pycouchdb.Server() assert server1.base_url == 'http://localhost:5984/' - + # Test custom URL initialization server2 = pycouchdb.Server('http://custom:5984/') assert server2.base_url == 'http://custom:5984/' - + # Test with credentials server3 = pycouchdb.Server('http://user:pass@localhost:5984/') assert server3.base_url == 'http://localhost:5984/' - + # Test with verify parameter server4 = pycouchdb.Server(verify=True) assert server4.base_url == 'http://localhost:5984/' @@ -1283,7 +1283,7 @@ def test_custom_headers_and_parameters(db): """Test custom headers and parameters in requests.""" # Test with custom parameters in get request doc = db.save({'_id': 'custom_params_test', 'data': 'test'}) - + # Test getting document with custom parameters # Note: revs_info parameter should be passed to the underlying request retrieved_doc = db.get('custom_params_test', revs=True, revs_info=True) @@ -1302,16 +1302,474 @@ def test_library_database_length_and_config_api(db): assert isinstance(initial_config, dict) assert 'doc_count' in initial_config assert 'update_seq' in initial_config - + # Add some documents docs = [{'index': i, 'data': f'length_test_{i}'} for i in range(5)] db.save_bulk(docs) - + # Test that length reflects document count new_length = len(db) new_config = db.config() - + assert new_length >= initial_length + 5 assert new_config['doc_count'] >= initial_config['doc_count'] + 5 assert new_config['update_seq'] > initial_config['update_seq'] + +# Pagination Integration Tests +import json +from pycouchdb import utils + +def test_view_pages_integration_single_page(db): + """Test view_pages integration with single page of results.""" + from pycouchdb.pagination import view_pages + + # Create a design document with a view + design_doc = { + "_id": "_design/pagination_test", + "views": { + "by_name": { + "map": "function(doc) { if (doc.name) emit(doc.name, doc); }" + } + } + } + db.save(design_doc) + + # Create test documents + test_docs = [ + {'_id': 'doc1', 'name': 'Alice'}, + {'_id': 'doc2', 'name': 'Bob'}, + ] + db.save_bulk(test_docs) + + # Create a fetch function that uses the database's resource directly + def fetch_view(params): + path = ['_design', 'pagination_test', '_view', 'by_name'] + resource = db.resource(*path) + response, result = resource.get(params=params) + return response, result + + # Test pagination with page size larger than total documents + pages = list(view_pages(fetch_view, 'pagination_test/by_name', 10)) + + assert len(pages) == 1 + assert len(pages[0]) == 2 + assert pages[0][0]['id'] == 'doc1' + assert pages[0][1]['id'] == 'doc2' + + # Cleanup + db.delete('_design/pagination_test') + + +def test_view_pages_integration_multiple_pages(db): + """Test view_pages integration with multiple pages.""" + from pycouchdb.pagination import view_pages + + # Create a design document with a view + design_doc = { + "_id": "_design/pagination_test", + "views": { + "by_name": { + "map": "function(doc) { if (doc.name) emit(doc.name, doc); }" + } + } + } + db.save(design_doc) + + # Create test documents (more than page size) + test_docs = [ + {'_id': f'doc{i}', 'name': f'User{i}'} for i in range(1, 8) # 7 documents + ] + db.save_bulk(test_docs) + + # Create a fetch function that uses the database's resource directly + def fetch_view(params): + path = ['_design', 'pagination_test', '_view', 'by_name'] + resource = db.resource(*path) + response, result = resource.get(params=params) + return response, result + + # Test pagination with small page size + pages = list(view_pages(fetch_view, 'pagination_test/by_name', 3)) + + assert len(pages) == 3 # 3, 3, 1 documents + assert len(pages[0]) == 3 + assert len(pages[1]) == 3 + assert len(pages[2]) == 1 + + # Verify all documents are retrieved + all_docs = [] + for page in pages: + all_docs.extend(page) + + doc_ids = [doc['id'] for doc in all_docs] + expected_ids = [f'doc{i}' for i in range(1, 8)] + assert set(doc_ids) == set(expected_ids) + + # Cleanup + db.delete('_design/pagination_test') + + +def test_view_pages_integration_with_params(db): + """Test view_pages integration with additional parameters.""" + from pycouchdb.pagination import view_pages + + # Create a design document with a view + design_doc = { + "_id": "_design/pagination_test", + "views": { + "by_name": { + "map": "function(doc) { if (doc.name) emit(doc.name, doc); }" + } + } + } + db.save(design_doc) + + # Create test documents + test_docs = [ + {'_id': 'doc1', 'name': 'Alice', 'age': 25}, + {'_id': 'doc2', 'name': 'Bob', 'age': 30}, + {'_id': 'doc3', 'name': 'Charlie', 'age': 35}, + ] + db.save_bulk(test_docs) + + # Create a fetch function that uses the database's resource directly + def fetch_view(params): + path = ['_design', 'pagination_test', '_view', 'by_name'] + resource = db.resource(*path) + response, result = resource.get(params=params) + return response, result + + # Test pagination with include_docs parameter + params = {'include_docs': True} + pages = list(view_pages(fetch_view, 'pagination_test/by_name', 2, params)) + + assert len(pages) == 2 + assert len(pages[0]) == 2 + assert len(pages[1]) == 1 + + # Verify documents are included + for page in pages: + for doc in page: + assert 'doc' in doc + assert doc['doc']['name'] in ['Alice', 'Bob', 'Charlie'] + + # Cleanup + db.delete('_design/pagination_test') + + +def test_mango_pages_integration_single_page(db): + """Test mango_pages integration with single page of results.""" + from pycouchdb.pagination import mango_pages + + # Create test documents + test_docs = [ + {'_id': 'doc1', 'name': 'Alice', 'type': 'user'}, + {'_id': 'doc2', 'name': 'Bob', 'type': 'user'}, + ] + db.save_bulk(test_docs) + + # Create a fetch function that uses the database's resource directly + def fetch_find(params): + data = utils.force_bytes(json.dumps(params)) + response, result = db.resource.post('_find', data=data) + return response, result + + # Test pagination with page size larger than total documents + selector = {'type': 'user'} + pages = list(mango_pages(fetch_find, selector, 10)) + + assert len(pages) == 1 + assert len(pages[0]) == 2 + assert pages[0][0]['_id'] == 'doc1' + assert pages[0][1]['_id'] == 'doc2' + + +def test_mango_pages_integration_multiple_pages(db): + """Test mango_pages integration with multiple pages.""" + from pycouchdb.pagination import mango_pages + + # Create test documents (more than page size) + test_docs = [ + {'_id': f'doc{i}', 'name': f'User{i}', 'type': 'user', 'index': i} + for i in range(1, 8) # 7 documents + ] + db.save_bulk(test_docs) + + # Create a fetch function that uses the database's resource directly + def fetch_find(params): + data = utils.force_bytes(json.dumps(params)) + response, result = db.resource.post('_find', data=data) + return response, result + + # Test pagination with small page size + selector = {'type': 'user'} + pages = list(mango_pages(fetch_find, selector, 3)) + + assert len(pages) == 3 # 3, 3, 1 documents + assert len(pages[0]) == 3 + assert len(pages[1]) == 3 + assert len(pages[2]) == 1 + + # Verify all documents are retrieved + all_docs = [] + for page in pages: + all_docs.extend(page) + + doc_ids = [doc['_id'] for doc in all_docs] + expected_ids = [f'doc{i}' for i in range(1, 8)] + assert set(doc_ids) == set(expected_ids) + + +def test_mango_pages_integration_with_params(db): + """Test mango_pages integration with additional parameters.""" + from pycouchdb.pagination import mango_pages + + # Create test documents + test_docs = [ + {'_id': 'doc1', 'name': 'Alice', 'type': 'user', 'age': 25}, + {'_id': 'doc2', 'name': 'Bob', 'type': 'user', 'age': 30}, + {'_id': 'doc3', 'name': 'Charlie', 'type': 'user', 'age': 35}, + ] + db.save_bulk(test_docs) + + # Create index for the sort field + index_def = { + "index": { + "fields": ["type", "age"] + }, + "name": "test_index" + } + db.resource.post('_index', data=utils.force_bytes(json.dumps(index_def))) + + # Create a fetch function that uses the database's resource directly + def fetch_find(params): + data = utils.force_bytes(json.dumps(params)) + response, result = db.resource.post('_find', data=data) + return response, result + + # Test pagination with sort and fields parameters + selector = {'type': 'user'} + params = { + 'sort': [{'age': 'asc'}], + 'fields': ['_id', 'name', 'age'] + } + pages = list(mango_pages(fetch_find, selector, 2, params)) + + assert len(pages) == 2 + assert len(pages[0]) == 2 + assert len(pages[1]) == 1 + + # Verify documents are sorted by age + all_docs = [] + for page in pages: + all_docs.extend(page) + + ages = [doc['age'] for doc in all_docs] + assert ages == [25, 30, 35] # Should be sorted by age + + +def test_pagination_large_dataset(db): + """Test pagination with a large dataset to verify performance.""" + from pycouchdb.pagination import view_pages, mango_pages + + # Create a large number of documents + large_docs = [ + {'_id': f'large_doc_{i}', 'name': f'User{i}', 'type': 'user', 'index': i} + for i in range(1, 101) # 100 documents + ] + db.save_bulk(large_docs) + + # Create a design document for view pagination + design_doc = { + "_id": "_design/large_test", + "views": { + "by_index": { + "map": "function(doc) { if (doc.index) emit(doc.index, doc); }" + } + } + } + db.save(design_doc) + + # Test view pagination with large dataset + def fetch_view(params): + path = ['_design', 'large_test', '_view', 'by_index'] + resource = db.resource(*path) + response, result = resource.get(params=params) + return response, result + + pages = list(view_pages(fetch_view, 'large_test/by_index', 20)) + + # Should have 5 pages of 20 documents each + assert len(pages) == 5 + for page in pages: + assert len(page) == 20 + + # Test mango pagination with large dataset + def fetch_find(params): + data = utils.force_bytes(json.dumps(params)) + response, result = db.resource.post('_find', data=data) + return response, result + + selector = {'type': 'user'} + pages = list(mango_pages(fetch_find, selector, 25)) + + # Should have 4 pages of 25 documents each + assert len(pages) == 4 + for page in pages: + assert len(page) == 25 + + # Cleanup + db.delete('_design/large_test') + + +def test_pagination_empty_results(db): + """Test pagination with empty results.""" + from pycouchdb.pagination import view_pages, mango_pages + + # Create a design document + design_doc = { + "_id": "_design/empty_test", + "views": { + "by_name": { + "map": "function(doc) { if (doc.name) emit(doc.name, doc); }" + } + } + } + db.save(design_doc) + + # Test view pagination with no matching documents + def fetch_view(params): + path = ['_design', 'empty_test', '_view', 'by_name'] + resource = db.resource(*path) + response, result = resource.get(params=params) + return response, result + + pages = list(view_pages(fetch_view, 'empty_test/by_name', 10)) + assert len(pages) == 0 + + # Test mango pagination with no matching documents + def fetch_find(params): + data = utils.force_bytes(json.dumps(params)) + response, result = db.resource.post('_find', data=data) + return response, result + + selector = {'type': 'nonexistent'} + pages = list(mango_pages(fetch_find, selector, 10)) + assert len(pages) == 0 + + # Cleanup + db.delete('_design/empty_test') + + +def test_pagination_edge_cases(db): + """Test pagination edge cases.""" + from pycouchdb.pagination import view_pages, mango_pages + + # Create a design document + design_doc = { + "_id": "_design/edge_test", + "views": { + "by_name": { + "map": "function(doc) { if (doc.name) emit(doc.name, doc); }" + } + } + } + db.save(design_doc) + + # Create exactly one document + db.save({'_id': 'single_doc', 'name': 'Single'}) + + # Test view pagination with single document + def fetch_view(params): + path = ['_design', 'edge_test', '_view', 'by_name'] + resource = db.resource(*path) + response, result = resource.get(params=params) + return response, result + + pages = list(view_pages(fetch_view, 'edge_test/by_name', 1)) + assert len(pages) == 1 + assert len(pages[0]) == 1 + assert pages[0][0]['id'] == 'single_doc' + + # Test mango pagination with single document + def fetch_find(params): + data = utils.force_bytes(json.dumps(params)) + response, result = db.resource.post('_find', data=data) + return response, result + + selector = {'name': 'Single'} + pages = list(mango_pages(fetch_find, selector, 1)) + assert len(pages) == 1 + assert len(pages[0]) == 1 + assert pages[0][0]['_id'] == 'single_doc' + + # Cleanup + db.delete('_design/edge_test') + + +def test_pagination_different_page_sizes(db): + """Test pagination with different page sizes.""" + from pycouchdb.pagination import view_pages, mango_pages + + # Create test documents + test_docs = [ + {'_id': f'doc{i}', 'name': f'User{i}', 'type': 'user'} + for i in range(1, 11) # 10 documents + ] + db.save_bulk(test_docs) + + # Create a design document + design_doc = { + "_id": "_design/size_test", + "views": { + "by_name": { + "map": "function(doc) { if (doc.name) emit(doc.name, doc); }" + } + } + } + db.save(design_doc) + + # Test different page sizes for view pagination + def fetch_view(params): + path = ['_design', 'size_test', '_view', 'by_name'] + resource = db.resource(*path) + response, result = resource.get(params=params) + return response, result + + # Test with page size 1 + pages = list(view_pages(fetch_view, 'size_test/by_name', 1)) + assert len(pages) == 10 + for page in pages: + assert len(page) == 1 + + # Test with page size 3 + pages = list(view_pages(fetch_view, 'size_test/by_name', 3)) + assert len(pages) == 4 # 3, 3, 3, 1 + assert len(pages[0]) == 3 + assert len(pages[1]) == 3 + assert len(pages[2]) == 3 + assert len(pages[3]) == 1 + + # Test different page sizes for mango pagination + def fetch_find(params): + data = utils.force_bytes(json.dumps(params)) + response, result = db.resource.post('_find', data=data) + return response, result + + selector = {'type': 'user'} + + # Test with page size 2 + pages = list(mango_pages(fetch_find, selector, 2)) + assert len(pages) == 5 # 2, 2, 2, 2, 2 + for page in pages: + assert len(page) == 2 + + # Test with page size 7 + pages = list(mango_pages(fetch_find, selector, 7)) + assert len(pages) == 2 # 7, 3 + assert len(pages[0]) == 7 + assert len(pages[1]) == 3 + + # Cleanup + db.delete('_design/size_test') diff --git a/test/integration/test_pagination_integration.py b/test/integration/test_pagination_integration.py new file mode 100644 index 0000000..3a967b3 --- /dev/null +++ b/test/integration/test_pagination_integration.py @@ -0,0 +1,595 @@ +# -*- coding: utf-8 -*- + +""" +Integration tests for pycouchdb.pagination module. + +These tests require a running CouchDB instance and test the pagination +functionality with real database operations. +""" + +import pytest +import json +from pycouchdb.pagination import view_pages, mango_pages +from pycouchdb import utils + + +class TestViewPagesIntegration: + """Integration tests for view_pages function.""" + + def test_view_pages_single_page(self, db): + """Test view_pages integration with single page of results.""" + # Create a design document with a view + design_doc = { + "_id": "_design/pagination_test", + "views": { + "by_name": { + "map": "function(doc) { if (doc.name) emit(doc.name, doc); }" + } + } + } + db.save(design_doc) + + # Create test documents + test_docs = [ + {'_id': 'doc1', 'name': 'Alice'}, + {'_id': 'doc2', 'name': 'Bob'}, + ] + db.save_bulk(test_docs) + + # Create a fetch function that uses the database's resource directly + def fetch_view(params): + path = ['_design', 'pagination_test', '_view', 'by_name'] + resource = db.resource(*path) + response, result = resource.get(params=params) + return response, result + + # Test pagination with page size larger than total documents + pages = list(view_pages(fetch_view, 'pagination_test/by_name', 10)) + + assert len(pages) == 1 + assert len(pages[0]) == 2 + assert pages[0][0]['id'] == 'doc1' + assert pages[0][1]['id'] == 'doc2' + + # Cleanup + db.delete('_design/pagination_test') + + def test_view_pages_multiple_pages(self, db): + """Test view_pages integration with multiple pages.""" + # Create a design document with a view + design_doc = { + "_id": "_design/pagination_test", + "views": { + "by_name": { + "map": "function(doc) { if (doc.name) emit(doc.name, doc); }" + } + } + } + db.save(design_doc) + + # Create test documents (more than page size) + test_docs = [ + {'_id': f'doc{i}', 'name': f'User{i}'} for i in range(1, 8) # 7 documents + ] + db.save_bulk(test_docs) + + # Create a fetch function that uses the database's resource directly + def fetch_view(params): + path = ['_design', 'pagination_test', '_view', 'by_name'] + resource = db.resource(*path) + response, result = resource.get(params=params) + return response, result + + # Test pagination with small page size + pages = list(view_pages(fetch_view, 'pagination_test/by_name', 3)) + + assert len(pages) == 3 # 3, 3, 1 documents + assert len(pages[0]) == 3 + assert len(pages[1]) == 3 + assert len(pages[2]) == 1 + + # Verify all documents are retrieved + all_docs = [] + for page in pages: + all_docs.extend(page) + + doc_ids = [doc['id'] for doc in all_docs] + expected_ids = [f'doc{i}' for i in range(1, 8)] + assert set(doc_ids) == set(expected_ids) + + # Cleanup + db.delete('_design/pagination_test') + + def test_view_pages_with_params(self, db): + """Test view_pages integration with additional parameters.""" + # Create a design document with a view + design_doc = { + "_id": "_design/pagination_test", + "views": { + "by_name": { + "map": "function(doc) { if (doc.name) emit(doc.name, doc); }" + } + } + } + db.save(design_doc) + + # Create test documents + test_docs = [ + {'_id': 'doc1', 'name': 'Alice', 'age': 25}, + {'_id': 'doc2', 'name': 'Bob', 'age': 30}, + {'_id': 'doc3', 'name': 'Charlie', 'age': 35}, + ] + db.save_bulk(test_docs) + + # Create a fetch function that uses the database's resource directly + def fetch_view(params): + path = ['_design', 'pagination_test', '_view', 'by_name'] + resource = db.resource(*path) + response, result = resource.get(params=params) + return response, result + + # Test pagination with include_docs parameter + params = {'include_docs': True} + pages = list(view_pages(fetch_view, 'pagination_test/by_name', 2, params)) + + assert len(pages) == 2 + assert len(pages[0]) == 2 + assert len(pages[1]) == 1 + + # Verify documents are included + for page in pages: + for doc in page: + assert 'doc' in doc + assert doc['doc']['name'] in ['Alice', 'Bob', 'Charlie'] + + # Cleanup + db.delete('_design/pagination_test') + + def test_view_pages_large_dataset(self, db): + """Test view_pages with a large dataset.""" + # Create a large number of documents + large_docs = [ + {'_id': f'large_doc_{i}', 'name': f'User{i}', 'type': 'user', 'index': i} + for i in range(1, 51) # 50 documents + ] + db.save_bulk(large_docs) + + # Create a design document for view pagination + design_doc = { + "_id": "_design/large_test", + "views": { + "by_index": { + "map": "function(doc) { if (doc.index) emit(doc.index, doc); }" + } + } + } + db.save(design_doc) + + # Test view pagination with large dataset + def fetch_view(params): + path = ['_design', 'large_test', '_view', 'by_index'] + resource = db.resource(*path) + response, result = resource.get(params=params) + return response, result + + pages = list(view_pages(fetch_view, 'large_test/by_index', 10)) + + # Should have 5 pages of 10 documents each + assert len(pages) == 5 + for page in pages: + assert len(page) == 10 + + # Cleanup + db.delete('_design/large_test') + + def test_view_pages_empty_results(self, db): + """Test view_pages with empty results.""" + # Create a design document + design_doc = { + "_id": "_design/empty_test", + "views": { + "by_name": { + "map": "function(doc) { if (doc.name) emit(doc.name, doc); }" + } + } + } + db.save(design_doc) + + # Test view pagination with no matching documents + def fetch_view(params): + path = ['_design', 'empty_test', '_view', 'by_name'] + resource = db.resource(*path) + response, result = resource.get(params=params) + return response, result + + pages = list(view_pages(fetch_view, 'empty_test/by_name', 10)) + assert len(pages) == 0 + + # Cleanup + db.delete('_design/empty_test') + + def test_view_pages_edge_cases(self, db): + """Test view_pages edge cases.""" + # Create a design document + design_doc = { + "_id": "_design/edge_test", + "views": { + "by_name": { + "map": "function(doc) { if (doc.name) emit(doc.name, doc); }" + } + } + } + db.save(design_doc) + + # Create exactly one document + db.save({'_id': 'single_doc', 'name': 'Single'}) + + # Test view pagination with single document + def fetch_view(params): + path = ['_design', 'edge_test', '_view', 'by_name'] + resource = db.resource(*path) + response, result = resource.get(params=params) + return response, result + + pages = list(view_pages(fetch_view, 'edge_test/by_name', 1)) + assert len(pages) == 1 + assert len(pages[0]) == 1 + assert pages[0][0]['id'] == 'single_doc' + + # Cleanup + db.delete('_design/edge_test') + + +class TestMangoPagesIntegration: + """Integration tests for mango_pages function.""" + + def test_mango_pages_single_page(self, db): + """Test mango_pages integration with single page of results.""" + # Create test documents + test_docs = [ + {'_id': 'doc1', 'name': 'Alice', 'type': 'user'}, + {'_id': 'doc2', 'name': 'Bob', 'type': 'user'}, + ] + db.save_bulk(test_docs) + + # Create a fetch function that uses the database's resource directly + def fetch_find(params): + data = utils.force_bytes(json.dumps(params)) + response, result = db.resource.post('_find', data=data) + return response, result + + # Test pagination with page size larger than total documents + selector = {'type': 'user'} + pages = list(mango_pages(fetch_find, selector, 10)) + + assert len(pages) == 1 + assert len(pages[0]) == 2 + assert pages[0][0]['_id'] == 'doc1' + assert pages[0][1]['_id'] == 'doc2' + + def test_mango_pages_multiple_pages(self, db): + """Test mango_pages integration with multiple pages.""" + # Create test documents (more than page size) + test_docs = [ + {'_id': f'doc{i}', 'name': f'User{i}', 'type': 'user', 'index': i} + for i in range(1, 8) # 7 documents + ] + db.save_bulk(test_docs) + + # Create a fetch function that uses the database's resource directly + def fetch_find(params): + data = utils.force_bytes(json.dumps(params)) + response, result = db.resource.post('_find', data=data) + return response, result + + # Test pagination with small page size + selector = {'type': 'user'} + pages = list(mango_pages(fetch_find, selector, 3)) + + assert len(pages) == 3 # 3, 3, 1 documents + assert len(pages[0]) == 3 + assert len(pages[1]) == 3 + assert len(pages[2]) == 1 + + # Verify all documents are retrieved + all_docs = [] + for page in pages: + all_docs.extend(page) + + doc_ids = [doc['_id'] for doc in all_docs] + expected_ids = [f'doc{i}' for i in range(1, 8)] + assert set(doc_ids) == set(expected_ids) + + def test_mango_pages_with_params(self, db): + """Test mango_pages integration with additional parameters.""" + # Create test documents + test_docs = [ + {'_id': 'doc1', 'name': 'Alice', 'type': 'user', 'age': 25}, + {'_id': 'doc2', 'name': 'Bob', 'type': 'user', 'age': 30}, + {'_id': 'doc3', 'name': 'Charlie', 'type': 'user', 'age': 35}, + ] + db.save_bulk(test_docs) + + # Create index for the sort field + index_def = { + "index": { + "fields": ["type", "age"] + }, + "name": "test_index" + } + db.resource.post('_index', data=utils.force_bytes(json.dumps(index_def))) + + # Create a fetch function that uses the database's resource directly + def fetch_find(params): + data = utils.force_bytes(json.dumps(params)) + response, result = db.resource.post('_find', data=data) + return response, result + + # Test pagination with sort and fields parameters + selector = {'type': 'user'} + params = { + 'sort': [{'age': 'asc'}], + 'fields': ['_id', 'name', 'age'] + } + pages = list(mango_pages(fetch_find, selector, 2, params)) + + assert len(pages) == 2 + assert len(pages[0]) == 2 + assert len(pages[1]) == 1 + + # Verify documents are sorted by age + all_docs = [] + for page in pages: + all_docs.extend(page) + + ages = [doc['age'] for doc in all_docs] + assert ages == [25, 30, 35] # Should be sorted by age + + def test_mango_pages_large_dataset(self, db): + """Test mango_pages with a large dataset.""" + # Create a large number of documents + large_docs = [ + {'_id': f'large_doc_{i}', 'name': f'User{i}', 'type': 'user', 'index': i} + for i in range(1, 101) # 100 documents + ] + db.save_bulk(large_docs) + + # Test mango pagination with large dataset + def fetch_find(params): + data = utils.force_bytes(json.dumps(params)) + response, result = db.resource.post('_find', data=data) + return response, result + + selector = {'type': 'user'} + pages = list(mango_pages(fetch_find, selector, 25)) + + # Should have 4 pages of 25 documents each + assert len(pages) == 4 + for page in pages: + assert len(page) == 25 + + def test_mango_pages_empty_results(self, db): + """Test mango_pages with empty results.""" + # Test mango pagination with no matching documents + def fetch_find(params): + data = utils.force_bytes(json.dumps(params)) + response, result = db.resource.post('_find', data=data) + return response, result + + selector = {'type': 'nonexistent'} + pages = list(mango_pages(fetch_find, selector, 10)) + assert len(pages) == 0 + + def test_mango_pages_edge_cases(self, db): + """Test mango_pages edge cases.""" + # Create exactly one document + db.save({'_id': 'single_doc', 'name': 'Single', 'type': 'user'}) + + # Test mango pagination with single document + def fetch_find(params): + data = utils.force_bytes(json.dumps(params)) + response, result = db.resource.post('_find', data=data) + return response, result + + selector = {'name': 'Single'} + pages = list(mango_pages(fetch_find, selector, 1)) + assert len(pages) == 1 + assert len(pages[0]) == 1 + assert pages[0][0]['_id'] == 'single_doc' + + +class TestPaginationIntegration: + """General pagination integration tests.""" + + def test_pagination_different_page_sizes(self, db): + """Test pagination with different page sizes.""" + # Create test documents + test_docs = [ + {'_id': f'doc{i}', 'name': f'User{i}', 'type': 'user'} + for i in range(1, 11) # 10 documents + ] + db.save_bulk(test_docs) + + # Create a design document + design_doc = { + "_id": "_design/size_test", + "views": { + "by_name": { + "map": "function(doc) { if (doc.name) emit(doc.name, doc); }" + } + } + } + db.save(design_doc) + + # Test different page sizes for view pagination + def fetch_view(params): + path = ['_design', 'size_test', '_view', 'by_name'] + resource = db.resource(*path) + response, result = resource.get(params=params) + return response, result + + # Test with page size 1 + pages = list(view_pages(fetch_view, 'size_test/by_name', 1)) + assert len(pages) == 10 + for page in pages: + assert len(page) == 1 + + # Test with page size 3 + pages = list(view_pages(fetch_view, 'size_test/by_name', 3)) + assert len(pages) == 4 # 3, 3, 3, 1 + assert len(pages[0]) == 3 + assert len(pages[1]) == 3 + assert len(pages[2]) == 3 + assert len(pages[3]) == 1 + + # Test different page sizes for mango pagination + def fetch_find(params): + data = utils.force_bytes(json.dumps(params)) + response, result = db.resource.post('_find', data=data) + return response, result + + selector = {'type': 'user'} + + # Test with page size 2 + pages = list(mango_pages(fetch_find, selector, 2)) + assert len(pages) == 5 # 2, 2, 2, 2, 2 + for page in pages: + assert len(page) == 2 + + # Test with page size 7 + pages = list(mango_pages(fetch_find, selector, 7)) + assert len(pages) == 2 # 7, 3 + assert len(pages[0]) == 7 + assert len(pages[1]) == 3 + + # Cleanup + db.delete('_design/size_test') + + def test_pagination_performance(self, db): + """Test pagination performance with medium-sized dataset.""" + import time + + # Create a medium number of documents + medium_docs = [ + {'_id': f'perf_doc_{i}', 'name': f'User{i}', 'type': 'user', 'index': i} + for i in range(1, 201) # 200 documents + ] + db.save_bulk(medium_docs) + + # Create a design document + design_doc = { + "_id": "_design/perf_test", + "views": { + "by_index": { + "map": "function(doc) { if (doc.index) emit(doc.index, doc); }" + } + } + } + db.save(design_doc) + + # Test view pagination performance + def fetch_view(params): + path = ['_design', 'perf_test', '_view', 'by_index'] + resource = db.resource(*path) + response, result = resource.get(params=params) + return response, result + + start_time = time.time() + pages = list(view_pages(fetch_view, 'perf_test/by_index', 20)) + end_time = time.time() + + # Should have 10 pages of 20 documents each + assert len(pages) == 10 + for page in pages: + assert len(page) == 20 + + # Performance should be reasonable (less than 10 seconds for 200 docs) + assert end_time - start_time < 10 + + # Test mango pagination performance + def fetch_find(params): + data = utils.force_bytes(json.dumps(params)) + response, result = db.resource.post('_find', data=data) + return response, result + + start_time = time.time() + selector = {'type': 'user'} + pages = list(mango_pages(fetch_find, selector, 25)) + end_time = time.time() + + # Should have 8 pages of 25 documents each + assert len(pages) == 8 + for page in pages: + assert len(page) == 25 + + # Performance should be reasonable + assert end_time - start_time < 10 + + # Cleanup + db.delete('_design/perf_test') + + def test_pagination_consistency(self, db): + """Test that pagination returns consistent results.""" + # Create test documents with predictable ordering + test_docs = [ + {'_id': f'doc{i:03d}', 'name': f'User{i:03d}', 'type': 'user', 'order': i} + for i in range(1, 21) # 20 documents with zero-padded IDs + ] + db.save_bulk(test_docs) + + # Create a design document + design_doc = { + "_id": "_design/consistency_test", + "views": { + "by_order": { + "map": "function(doc) { if (doc.order) emit(doc.order, doc); }" + } + } + } + db.save(design_doc) + + # Test view pagination consistency + def fetch_view(params): + path = ['_design', 'consistency_test', '_view', 'by_order'] + resource = db.resource(*path) + response, result = resource.get(params=params) + return response, result + + # Run pagination multiple times and verify consistency + for _ in range(3): + pages = list(view_pages(fetch_view, 'consistency_test/by_order', 5)) + assert len(pages) == 4 # 5, 5, 5, 5 documents + + # Verify all documents are retrieved + all_docs = [] + for page in pages: + all_docs.extend(page) + + doc_ids = [doc['id'] for doc in all_docs] + expected_ids = [f'doc{i:03d}' for i in range(1, 21)] + assert set(doc_ids) == set(expected_ids) + + # Test mango pagination consistency + def fetch_find(params): + data = utils.force_bytes(json.dumps(params)) + response, result = db.resource.post('_find', data=data) + return response, result + + selector = {'type': 'user'} + + # Run pagination multiple times and verify consistency + for _ in range(3): + pages = list(mango_pages(fetch_find, selector, 7)) + assert len(pages) == 3 # 7, 7, 6 documents + + # Verify all documents are retrieved + all_docs = [] + for page in pages: + all_docs.extend(page) + + doc_ids = [doc['_id'] for doc in all_docs] + expected_ids = [f'doc{i:03d}' for i in range(1, 21)] + assert set(doc_ids) == set(expected_ids) + + # Cleanup + db.delete('_design/consistency_test') diff --git a/test/test_database.py b/test/test_database.py index d6797ac..d6a67a9 100644 --- a/test/test_database.py +++ b/test/test_database.py @@ -16,7 +16,7 @@ def test_database_initialization(self): """Test Database initialization.""" mock_resource = Mock() db = client.Database(mock_resource, "testdb") - + assert db.resource == mock_resource assert db.name == "testdb" @@ -24,7 +24,7 @@ def test_database_repr(self): """Test Database __repr__ method.""" mock_resource = Mock() db = client.Database(mock_resource, "testdb") - + repr_str = repr(db) assert "CouchDB Database" in repr_str assert "testdb" in repr_str @@ -33,10 +33,10 @@ def test_database_contains_true(self): """Test Database __contains__ method when document exists.""" mock_resource = Mock() mock_resource.head.return_value = (Mock(status_code=200), None) - + db = client.Database(mock_resource, "testdb") result = "doc123" in db - + assert result is True mock_resource.head.assert_called_once_with(["doc123"]) @@ -44,10 +44,10 @@ def test_database_contains_false(self): """Test Database __contains__ method when document doesn't exist.""" mock_resource = Mock() mock_resource.head.side_effect = exceptions.NotFound() - + db = client.Database(mock_resource, "testdb") result = "doc123" in db - + assert result is False def test_database_config(self): @@ -57,10 +57,10 @@ def test_database_config(self): mock_response.status_code = 200 mock_response.json.return_value = {"doc_count": 100, "update_seq": 200} mock_resource.get.return_value = (mock_response, {"doc_count": 100, "update_seq": 200}) - + db = client.Database(mock_resource, "testdb") result = db.config() - + assert result == {"doc_count": 100, "update_seq": 200} mock_resource.get.assert_called_once_with() @@ -70,16 +70,16 @@ def test_database_nonzero_true(self): mock_response = Mock() mock_response.status_code = 200 mock_resource.head.return_value = (mock_response, None) - + # Mock the config method that gets called by __len__ (which is called by bool() in Python 3) mock_config_response = Mock() mock_config_response.status_code = 200 mock_config_response.json.return_value = {"doc_count": 100} mock_resource.get.return_value = (mock_config_response, {"doc_count": 100}) - + db = client.Database(mock_resource, "testdb") result = bool(db) - + assert result is True # bool() calls __len__ which calls config(), not __nonzero__ which calls head() mock_resource.get.assert_called_once_with() @@ -90,17 +90,17 @@ def test_database_nonzero_false(self): mock_response = Mock() mock_response.status_code = 404 mock_resource.head.return_value = (mock_response, None) - + # Mock the config method that gets called by __len__ (which is called by bool() in Python 3) # For a database to be "false", it needs to have 0 documents mock_config_response = Mock() mock_config_response.status_code = 200 mock_config_response.json.return_value = {"doc_count": 0} mock_resource.get.return_value = (mock_config_response, {"doc_count": 0}) - + db = client.Database(mock_resource, "testdb") result = bool(db) - + assert result is False def test_database_len(self): @@ -110,10 +110,10 @@ def test_database_len(self): mock_response.status_code = 200 mock_response.json.return_value = {"doc_count": 150} mock_resource.get.return_value = (mock_response, {"doc_count": 150}) - + db = client.Database(mock_resource, "testdb") result = len(db) - + assert result == 150 def test_database_get_success(self): @@ -123,10 +123,10 @@ def test_database_get_success(self): mock_response.status_code = 200 mock_response.json.return_value = {"_id": "doc123", "_rev": "1-abc", "name": "test"} mock_resource.return_value.get.return_value = (mock_response, {"_id": "doc123", "_rev": "1-abc", "name": "test"}) - + db = client.Database(mock_resource, "testdb") result = db.get("doc123") - + assert result == {"_id": "doc123", "_rev": "1-abc", "name": "test"} mock_resource.assert_called_once_with("doc123") @@ -137,10 +137,10 @@ def test_database_get_with_params(self): mock_response.status_code = 200 mock_response.json.return_value = {"_id": "doc123", "_rev": "1-abc", "name": "test"} mock_resource.return_value.get.return_value = (mock_response, {"_id": "doc123", "_rev": "1-abc", "name": "test"}) - + db = client.Database(mock_resource, "testdb") result = db.get("doc123", revs=True, conflicts=True) - + assert result == {"_id": "doc123", "_rev": "1-abc", "name": "test"} mock_resource.assert_called_once_with("doc123") mock_resource.return_value.get.assert_called_once_with(params={"revs": True, "conflicts": True}) @@ -152,21 +152,21 @@ def test_database_get_with_deprecated_params(self): mock_response.status_code = 200 mock_response.json.return_value = {"_id": "doc123", "_rev": "1-abc", "name": "test"} mock_resource.return_value.get.return_value = (mock_response, {"_id": "doc123", "_rev": "1-abc", "name": "test"}) - + db = client.Database(mock_resource, "testdb") - + with pytest.warns(DeprecationWarning): result = db.get("doc123", params={"revs": True}) - + assert result == {"_id": "doc123", "_rev": "1-abc", "name": "test"} def test_database_get_not_found(self): """Test Database get method with not found.""" mock_resource = Mock() mock_resource.return_value.get.side_effect = exceptions.NotFound("Document not found") - + db = client.Database(mock_resource, "testdb") - + with pytest.raises(exceptions.NotFound, match="Document not found"): db.get("doc123") @@ -177,11 +177,11 @@ def test_database_save_new_document(self): mock_response.status_code = 201 mock_response.json.return_value = {"ok": True, "id": "doc123", "rev": "1-abc"} mock_resource.return_value.put.return_value = (mock_response, {"ok": True, "id": "doc123", "rev": "1-abc"}) - + db = client.Database(mock_resource, "testdb") doc = {"name": "test"} result = db.save(doc) - + assert result["_id"] is not None assert result["_rev"] == "1-abc" assert result["name"] == "test" @@ -195,11 +195,11 @@ def test_database_save_existing_document(self): mock_response.status_code = 201 mock_response.json.return_value = {"ok": True, "id": "doc123", "rev": "2-def"} mock_resource.return_value.put.return_value = (mock_response, {"ok": True, "id": "doc123", "rev": "2-def"}) - + db = client.Database(mock_resource, "testdb") doc = {"_id": "doc123", "_rev": "1-abc", "name": "test"} result = db.save(doc) - + assert result["_id"] == "doc123" assert result["_rev"] == "2-def" assert result["name"] == "test" @@ -211,11 +211,11 @@ def test_database_save_batch_mode(self): mock_response.status_code = 202 mock_response.json.return_value = {"ok": True, "id": "doc123", "rev": "1-abc"} mock_resource.return_value.put.return_value = (mock_response, {"ok": True, "id": "doc123", "rev": "1-abc"}) - + db = client.Database(mock_resource, "testdb") doc = {"name": "test"} result = db.save(doc, batch=True) - + assert result["_id"] is not None assert result["_rev"] == "1-abc" mock_resource.return_value.put.assert_called_once() @@ -229,10 +229,10 @@ def test_database_save_conflict(self): mock_response.status_code = 409 mock_response.json.return_value = {"error": "conflict", "reason": "Document conflict"} mock_resource.return_value.put.return_value = (mock_response, {"error": "conflict", "reason": "Document conflict"}) - + db = client.Database(mock_resource, "testdb") doc = {"_id": "doc123", "_rev": "1-abc", "name": "test"} - + with pytest.raises(exceptions.Conflict, match="Document conflict"): db.save(doc) @@ -249,11 +249,11 @@ def test_database_save_bulk_success(self): {"ok": True, "id": "doc1", "rev": "1-abc"}, {"ok": True, "id": "doc2", "rev": "1-def"} ]) - + db = client.Database(mock_resource, "testdb") docs = [{"name": "doc1"}, {"name": "doc2"}] result = db.save_bulk(docs) - + assert len(result) == 2 assert result[0]["_id"] is not None assert result[0]["_rev"] == "1-abc" @@ -261,7 +261,7 @@ def test_database_save_bulk_success(self): assert result[1]["_rev"] == "1-def" # The method sends docs without _rev, then adds _rev from response expected_docs = [{"name": "doc1", "_id": result[0]["_id"]}, {"name": "doc2", "_id": result[1]["_id"]}] - mock_resource.post.assert_called_once_with("_bulk_docs", + mock_resource.post.assert_called_once_with("_bulk_docs", data=json.dumps({"docs": expected_docs}).encode(), params={"all_or_nothing": "true"}) @@ -278,11 +278,11 @@ def test_database_save_bulk_with_existing_ids(self): {"ok": True, "id": "doc1", "rev": "1-abc"}, {"ok": True, "id": "doc2", "rev": "1-def"} ]) - + db = client.Database(mock_resource, "testdb") docs = [{"_id": "doc1", "name": "doc1"}, {"_id": "doc2", "name": "doc2"}] result = db.save_bulk(docs, try_setting_ids=False) - + assert len(result) == 2 assert result[0]["_id"] == "doc1" assert result[1]["_id"] == "doc2" @@ -294,14 +294,14 @@ def test_database_save_bulk_without_transaction(self): mock_response.status_code = 201 mock_response.json.return_value = [{"ok": True, "id": "doc1", "rev": "1-abc"}] mock_resource.post.return_value = (mock_response, [{"ok": True, "id": "doc1", "rev": "1-abc"}]) - + db = client.Database(mock_resource, "testdb") docs = [{"name": "doc1"}] result = db.save_bulk(docs, transaction=False) - + # The method sends docs without _rev, then adds _rev from response expected_docs = [{"name": "doc1", "_id": result[0]["_id"]}] - mock_resource.post.assert_called_once_with("_bulk_docs", + mock_resource.post.assert_called_once_with("_bulk_docs", data=json.dumps({"docs": expected_docs}).encode(), params={"all_or_nothing": "false"}) @@ -312,15 +312,15 @@ def test_database_delete_by_id(self): mock_head_response.status_code = 200 mock_head_response.headers = {"etag": '"1-abc"'} mock_resource.return_value.head.return_value = (mock_head_response, None) - + mock_delete_response = Mock() mock_delete_response.status_code = 200 mock_delete_response.json.return_value = {"ok": True, "id": "doc123", "rev": "2-def"} mock_resource.return_value.delete.return_value = (mock_delete_response, {"ok": True, "id": "doc123", "rev": "2-def"}) - + db = client.Database(mock_resource, "testdb") result = db.delete("doc123") - + assert result is None # delete method doesn't return anything mock_resource.assert_called_once_with("doc123") mock_resource.return_value.head.assert_called_once() @@ -333,16 +333,16 @@ def test_database_delete_by_document(self): mock_head_response.status_code = 200 mock_head_response.headers = {"etag": '"1-abc"'} mock_resource.return_value.head.return_value = (mock_head_response, None) - + mock_delete_response = Mock() mock_delete_response.status_code = 200 mock_delete_response.json.return_value = {"ok": True, "id": "doc123", "rev": "2-def"} mock_resource.return_value.delete.return_value = (mock_delete_response, {"ok": True, "id": "doc123", "rev": "2-def"}) - + db = client.Database(mock_resource, "testdb") doc = {"_id": "doc123", "_rev": "1-abc", "name": "test"} result = db.delete(doc) - + assert result is None # delete method doesn't return anything mock_resource.assert_called_once_with("doc123") @@ -350,7 +350,7 @@ def test_database_delete_invalid_document(self): """Test Database delete method with invalid document.""" db = client.Database(Mock(), "testdb") doc = {"name": "test"} # Missing _id - + with pytest.raises(ValueError, match="Invalid document, missing _id attr"): db.delete(doc) @@ -358,9 +358,9 @@ def test_database_delete_not_found(self): """Test Database delete method with not found.""" mock_resource = Mock() mock_resource.return_value.head.side_effect = exceptions.NotFound("Document not found") - + db = client.Database(mock_resource, "testdb") - + with pytest.raises(exceptions.NotFound, match="Document not found"): db.delete("doc123") @@ -377,24 +377,24 @@ def test_database_delete_bulk_success(self): {"ok": True, "id": "doc1", "rev": "2-abc"}, {"ok": True, "id": "doc2", "rev": "2-def"} ]) - + db = client.Database(mock_resource, "testdb") docs = [ {"_id": "doc1", "_rev": "1-abc", "name": "doc1"}, {"_id": "doc2", "_rev": "1-def", "name": "doc2"} ] result = db.delete_bulk(docs) - + assert len(result) == 2 assert result[0]["ok"] is True assert result[1]["ok"] is True - + # Check that _deleted flag was added expected_docs = [ {"_id": "doc1", "_rev": "1-abc", "name": "doc1", "_deleted": True}, {"_id": "doc2", "_rev": "1-def", "name": "doc2", "_deleted": True} ] - mock_resource.post.assert_called_once_with("_bulk_docs", + mock_resource.post.assert_called_once_with("_bulk_docs", data=json.dumps({"docs": expected_docs}).encode(), params={"all_or_nothing": "true"}) @@ -405,14 +405,14 @@ def test_database_delete_bulk_without_transaction(self): mock_response.status_code = 201 mock_response.json.return_value = [{"ok": True, "id": "doc1", "rev": "2-abc"}] mock_resource.post.return_value = (mock_response, [{"ok": True, "id": "doc1", "rev": "2-abc"}]) - + db = client.Database(mock_resource, "testdb") docs = [{"_id": "doc1", "_rev": "1-abc", "name": "doc1"}] result = db.delete_bulk(docs, transaction=False) - + # The method sends docs with _rev included expected_docs = [{"_id": "doc1", "_rev": "1-abc", "name": "doc1", "_deleted": True}] - mock_resource.post.assert_called_once_with("_bulk_docs", + mock_resource.post.assert_called_once_with("_bulk_docs", data=json.dumps({"docs": expected_docs}).encode(), params={"all_or_nothing": "false"}) @@ -429,13 +429,13 @@ def test_database_delete_bulk_conflict(self): {"ok": True, "id": "doc1", "rev": "2-abc"}, {"error": "conflict", "reason": "Document conflict"} ]) - + db = client.Database(mock_resource, "testdb") docs = [ {"_id": "doc1", "_rev": "1-abc", "name": "doc1"}, {"_id": "doc2", "_rev": "1-def", "name": "doc2"} ] - + with pytest.raises(exceptions.Conflict, match="one or more docs are not saved"): db.delete_bulk(docs) @@ -456,10 +456,10 @@ def test_database_all_success(self): {"id": "doc2", "key": "doc2", "value": {"rev": "1-def"}, "doc": {"_id": "doc2", "name": "doc2"}} ] }) - + db = client.Database(mock_resource, "testdb") result = list(db.all()) - + assert len(result) == 2 assert result[0]["id"] == "doc1" assert result[1]["id"] == "doc2" @@ -472,12 +472,12 @@ def test_database_all_with_keys(self): mock_response.status_code = 200 mock_response.json.return_value = {"rows": []} mock_resource.post.return_value = (mock_response, {"rows": []}) - + db = client.Database(mock_resource, "testdb") result = list(db.all(keys=["doc1", "doc2"])) - + assert result == [] - mock_resource.post.assert_called_once_with("_all_docs", + mock_resource.post.assert_called_once_with("_all_docs", params={"include_docs": "true"}, data=json.dumps({"keys": ["doc1", "doc2"]}).encode()) @@ -498,10 +498,10 @@ def test_database_all_with_flat(self): {"id": "doc2", "key": "doc2", "value": {"rev": "1-def"}} ] }) - + db = client.Database(mock_resource, "testdb") result = list(db.all(flat="id")) - + assert result == ["doc1", "doc2"] def test_database_all_as_list(self): @@ -519,10 +519,10 @@ def test_database_all_as_list(self): {"id": "doc1", "key": "doc1", "value": {"rev": "1-abc"}} ] }) - + db = client.Database(mock_resource, "testdb") result = db.all(as_list=True) - + assert isinstance(result, list) assert len(result) == 1 assert result[0]["id"] == "doc1" @@ -534,10 +534,10 @@ def test_database_cleanup(self): mock_response.status_code = 200 mock_response.json.return_value = {"ok": True} mock_resource.return_value.post.return_value = (mock_response, {"ok": True}) - + db = client.Database(mock_resource, "testdb") result = db.cleanup() - + assert result == {"ok": True} mock_resource.assert_called_once_with("_view_cleanup") @@ -548,10 +548,10 @@ def test_database_commit(self): mock_response.status_code = 200 mock_response.json.return_value = {"ok": True} mock_resource.post.return_value = (mock_response, {"ok": True}) - + db = client.Database(mock_resource, "testdb") result = db.commit() - + assert result == {"ok": True} mock_resource.post.assert_called_once_with("_ensure_full_commit") @@ -562,10 +562,10 @@ def test_database_compact(self): mock_response.status_code = 200 mock_response.json.return_value = {"ok": True} mock_resource.return_value.post.return_value = (mock_response, {"ok": True}) - + db = client.Database(mock_resource, "testdb") result = db.compact() - + assert result == {"ok": True} mock_resource.assert_called_once_with("_compact") @@ -576,10 +576,10 @@ def test_database_compact_view_success(self): mock_response.status_code = 200 mock_response.json.return_value = {"ok": True} mock_resource.return_value.post.return_value = (mock_response, {"ok": True}) - + db = client.Database(mock_resource, "testdb") result = db.compact_view("test_design") - + assert result == {"ok": True} mock_resource.assert_called_once_with("_compact", "test_design") @@ -587,9 +587,9 @@ def test_database_compact_view_not_found(self): """Test Database compact_view method with not found.""" mock_resource = Mock() mock_resource.return_value.post.side_effect = exceptions.NotFound("Design document not found") - + db = client.Database(mock_resource, "testdb") - + with pytest.raises(exceptions.NotFound, match="Design document not found"): db.compact_view("nonexistent_design") @@ -617,7 +617,7 @@ def test_database_revisions_success(self): {"rev": "1-abc", "status": "available"} ] }) - + # Mock the subsequent calls to get each revision def mock_get_side_effect(*args, **kwargs): if 'rev' in kwargs.get('params', {}): @@ -638,12 +638,12 @@ def mock_get_side_effect(*args, **kwargs): {"rev": "1-abc", "status": "available"} ] }) - + mock_resource.return_value.get.side_effect = mock_get_side_effect - + db = client.Database(mock_resource, "testdb") result = list(db.revisions("doc123")) - + assert len(result) == 3 assert result[0]["_id"] == "doc123" assert result[0]["_rev"] == "3-ghi" @@ -656,9 +656,9 @@ def test_database_revisions_not_found(self): """Test Database revisions method with not found.""" mock_resource = Mock() mock_resource.return_value.get.side_effect = exceptions.NotFound("Document not found") - + db = client.Database(mock_resource, "testdb") - + with pytest.raises(exceptions.NotFound, match="Document not found"): list(db.revisions("doc123")) @@ -682,7 +682,7 @@ def test_database_revisions_with_params(self): {"rev": "1-abc", "status": "available"} ] }) - + # Mock the subsequent call to get the revision def mock_get_side_effect(*args, **kwargs): if 'rev' in kwargs.get('params', {}): @@ -697,12 +697,12 @@ def mock_get_side_effect(*args, **kwargs): {"rev": "1-abc", "status": "available"} ] }) - + mock_resource.return_value.get.side_effect = mock_get_side_effect - + db = client.Database(mock_resource, "testdb") result = list(db.revisions("doc123", status="available", limit=10)) - + assert len(result) == 1 # The method calls get twice: once for _revs_info, once for the actual revision assert mock_resource.return_value.get.call_count == 2 @@ -714,20 +714,20 @@ def test_database_put_attachment_success(self): mock_response.status_code = 201 mock_response.json.return_value = {"ok": True, "id": "doc123", "rev": "2-def"} mock_resource.return_value.put.return_value = (mock_response, {"ok": True, "id": "doc123", "rev": "2-def"}) - + # Mock the get call that happens at the end of put_attachment mock_get_response = Mock() mock_get_response.status_code = 200 mock_get_response.json.return_value = {"_id": "doc123", "_rev": "2-def", "name": "test", "_attachments": {"test.txt": {"content_type": "text/plain"}}} mock_resource.return_value.get.return_value = (mock_get_response, {"_id": "doc123", "_rev": "2-def", "name": "test", "_attachments": {"test.txt": {"content_type": "text/plain"}}}) - + db = client.Database(mock_resource, "testdb") doc = {"_id": "doc123", "_rev": "1-abc", "name": "test"} content = b"Hello, World!" - + with patch('pycouchdb.client.mimetypes.guess_type', return_value=('text/plain', None)): result = db.put_attachment(doc, content, "test.txt") - + assert result["_id"] == "doc123" assert result["_rev"] == "2-def" assert "_attachments" in result @@ -744,19 +744,19 @@ def test_database_put_attachment_with_content_type(self): mock_response.status_code = 201 mock_response.json.return_value = {"ok": True, "id": "doc123", "rev": "2-def"} mock_resource.return_value.put.return_value = (mock_response, {"ok": True, "id": "doc123", "rev": "2-def"}) - + # Mock the get call that happens at the end of put_attachment mock_get_response = Mock() mock_get_response.status_code = 200 mock_get_response.json.return_value = {"_id": "doc123", "_rev": "2-def", "name": "test", "_attachments": {"test.txt": {"content_type": "text/plain"}}} mock_resource.return_value.get.return_value = (mock_get_response, {"_id": "doc123", "_rev": "2-def", "name": "test", "_attachments": {"test.txt": {"content_type": "text/plain"}}}) - + db = client.Database(mock_resource, "testdb") doc = {"_id": "doc123", "_rev": "1-abc", "name": "test"} content = b"Hello, World!" - + result = db.put_attachment(doc, content, "test.txt", content_type="text/plain") - + assert result["_attachments"]["test.txt"]["content_type"] == "text/plain" def test_database_get_attachment_success(self): @@ -766,12 +766,12 @@ def test_database_get_attachment_success(self): mock_response.status_code = 200 mock_response.content = b"Hello, World!" mock_resource.return_value.get.return_value = (mock_response, None) - + db = client.Database(mock_resource, "testdb") doc = {"_id": "doc123", "_rev": "1-abc", "_attachments": {"test.txt": {"content_type": "text/plain"}}} - + result = db.get_attachment(doc, "test.txt") - + assert result == b"Hello, World!" mock_resource.assert_called_once_with("doc123") mock_resource.return_value.get.assert_called_once_with("test.txt", stream=False, params={"rev": "1-abc"}) @@ -783,12 +783,12 @@ def test_database_get_attachment_stream(self): mock_response.status_code = 200 mock_response.content = b"Hello, World!" mock_resource.return_value.get.return_value = (mock_response, None) - + db = client.Database(mock_resource, "testdb") doc = {"_id": "doc123", "_rev": "1-abc", "_attachments": {"test.txt": {"content_type": "text/plain"}}} - + result = db.get_attachment(doc, "test.txt", stream=True) - + assert hasattr(result, 'iter_content') assert hasattr(result, 'iter_lines') assert hasattr(result, 'raw') @@ -798,10 +798,10 @@ def test_database_get_attachment_not_found(self): """Test Database get_attachment method with not found.""" mock_resource = Mock() mock_resource.return_value.get.side_effect = exceptions.NotFound("Attachment not found") - + db = client.Database(mock_resource, "testdb") doc = {"_id": "doc123", "_rev": "1-abc", "_attachments": {"test.txt": {"content_type": "text/plain"}}} - + with pytest.raises(exceptions.NotFound, match="Attachment not found"): db.get_attachment(doc, "test.txt") @@ -812,16 +812,16 @@ def test_database_delete_attachment_success(self): mock_response.status_code = 200 mock_response.json.return_value = {"ok": True, "id": "doc123", "rev": "2-def"} mock_resource.return_value.delete.return_value = (mock_response, {"ok": True, "id": "doc123", "rev": "2-def"}) - + db = client.Database(mock_resource, "testdb") doc = { - "_id": "doc123", - "_rev": "1-abc", + "_id": "doc123", + "_rev": "1-abc", "_attachments": {"test.txt": {"content_type": "text/plain"}} } - + result = db.delete_attachment(doc, "test.txt") - + assert result["_id"] == "doc123" assert result["_rev"] == "2-def" assert "_attachments" not in result @@ -832,10 +832,10 @@ def test_database_delete_attachment_not_found(self): """Test Database delete_attachment method with not found.""" mock_resource = Mock() mock_resource.return_value.delete.side_effect = exceptions.NotFound("Attachment not found") - + db = client.Database(mock_resource, "testdb") doc = {"_id": "doc123", "_rev": "1-abc"} - + with pytest.raises(exceptions.NotFound, match="Attachment not found"): db.delete_attachment(doc, "test.txt") @@ -854,10 +854,10 @@ def test_database_one_success(self): {"id": "doc1", "key": "doc1", "value": 1} ] }) - + db = client.Database(mock_resource, "testdb") result = db.one("test/view") - + assert result == {"id": "doc1", "key": "doc1", "value": 1} mock_resource.assert_called_once_with("_design", "test", "_view", "view") @@ -868,10 +868,10 @@ def test_database_one_not_found(self): mock_response.status_code = 200 mock_response.json.return_value = {"rows": []} mock_resource.return_value.get.return_value = (mock_response, {"rows": []}) - + db = client.Database(mock_resource, "testdb") result = db.one("test/view") - + assert result is None def test_database_one_with_flat(self): @@ -889,10 +889,10 @@ def test_database_one_with_flat(self): {"id": "doc1", "key": "doc1", "value": 1} ] }) - + db = client.Database(mock_resource, "testdb") result = db.one("test/view", flat="value") - + assert result == 1 def test_database_query_success(self): @@ -912,10 +912,10 @@ def test_database_query_success(self): {"id": "doc2", "key": "doc2", "value": 2} ] }) - + db = client.Database(mock_resource, "testdb") result = list(db.query("test/view")) - + assert len(result) == 2 assert result[0]["id"] == "doc1" assert result[1]["id"] == "doc2" @@ -936,20 +936,20 @@ def test_database_query_with_pagination(self): {"id": "doc1", "key": "doc1", "value": 1} ] }) - + db = client.Database(mock_resource, "testdb") result = list(db.query("test/view", pagesize=1)) - + assert len(result) == 1 assert result[0]["id"] == "doc1" def test_database_query_invalid_pagesize(self): """Test Database query method with invalid pagesize.""" db = client.Database(Mock(), "testdb") - + with pytest.raises(AssertionError, match="pagesize should be a positive integer"): list(db.query("test/view", pagesize="invalid")) - + with pytest.raises(AssertionError, match="pagesize should be a positive integer"): list(db.query("test/view", pagesize=0)) @@ -968,10 +968,10 @@ def test_database_query_with_limit(self): {"id": "doc1", "key": "doc1", "value": 1} ] }) - + db = client.Database(mock_resource, "testdb") result = list(db.query("test/view", pagesize=10, limit=1)) - + assert len(result) == 1 assert result[0]["id"] == "doc1" @@ -994,10 +994,10 @@ def test_database_changes_list_success(self): {"seq": 2, "id": "doc2", "changes": [{"rev": "1-def"}]} ] }) - + db = client.Database(mock_resource, "testdb") last_seq, changes = db.changes_list() - + assert last_seq == 100 assert len(changes) == 2 assert changes[0]["id"] == "doc1" @@ -1011,10 +1011,10 @@ def test_database_changes_list_with_params(self): mock_response.status_code = 200 mock_response.json.return_value = {"last_seq": 100, "results": []} mock_resource.return_value.get.return_value = (mock_response, {"last_seq": 100, "results": []}) - + db = client.Database(mock_resource, "testdb") last_seq, changes = db.changes_list(since=50, limit=10) - + assert last_seq == 100 assert changes == [] mock_resource.assert_called_once_with("_changes") @@ -1031,15 +1031,15 @@ def test_database_changes_feed(self): b'' # Empty line (heartbeat) ] mock_resource.post.return_value = (mock_response, None) - + db = client.Database(mock_resource, "testdb") messages_received = [] - + def mock_feed_reader(message, db): messages_received.append(message) if len(messages_received) >= 2: raise exceptions.FeedReaderExited() - + with patch('pycouchdb.client._listen_feed') as mock_listen: db.changes_feed(mock_feed_reader) mock_listen.assert_called_once() @@ -1051,20 +1051,173 @@ def test_database_changes_feed_with_options(self): mock_response.status_code = 200 mock_response.iter_lines.return_value = [] mock_resource.post.return_value = (mock_response, None) - + db = client.Database(mock_resource, "testdb") - + def mock_feed_reader(message, db): pass - + with patch('pycouchdb.client._listen_feed') as mock_listen: - db.changes_feed(mock_feed_reader, + db.changes_feed(mock_feed_reader, feed="longpoll", since=100, limit=50) - + mock_listen.assert_called_once() call_args = mock_listen.call_args assert call_args[1]['feed'] == "longpoll" assert call_args[1]['since'] == 100 - assert call_args[1]['limit'] == 50 \ No newline at end of file + assert call_args[1]['limit'] == 50 + + def test_database_find_method(self): + """Test the find method for Mango queries.""" + mock_resource = Mock() + mock_response = Mock() + mock_result = { + 'docs': [ + {'_id': 'doc1', 'name': 'Alice'}, + {'_id': 'doc2', 'name': 'Bob'} + ] + } + mock_resource.post.return_value = (mock_response, mock_result) + + db = client.Database(mock_resource, "testdb") + selector = {'name': {'$exists': True}} + + docs = list(db.find(selector, limit=10)) + + assert len(docs) == 2 + assert docs[0]['_id'] == 'doc1' + assert docs[1]['_id'] == 'doc2' + + # Check that post was called with correct data + mock_resource.post.assert_called_once() + call_args = mock_resource.post.call_args + assert call_args[0][0] == "_find" + + # Check the data payload + import json + data = json.loads(call_args[1]['data']) + assert data['selector'] == selector + assert data['limit'] == 10 + + def test_database_find_method_empty_result(self): + """Test find method with empty result.""" + mock_resource = Mock() + mock_response = Mock() + mock_result = {'docs': []} + mock_resource.post.return_value = (mock_response, mock_result) + + db = client.Database(mock_resource, "testdb") + selector = {'name': {'$exists': True}} + + docs = list(db.find(selector)) + assert len(docs) == 0 + + def test_database_find_method_none_result(self): + """Test find method with None result.""" + mock_resource = Mock() + mock_response = Mock() + mock_resource.post.return_value = (mock_response, None) + + db = client.Database(mock_resource, "testdb") + selector = {'name': {'$exists': True}} + + docs = list(db.find(selector)) + assert len(docs) == 0 + + @patch('pycouchdb.client.view_pages') + def test_database_view_pages_method(self, mock_view_pages): + """Test the view_pages method.""" + mock_resource = Mock() + mock_pages = [ + [{'id': 'doc1', 'key': 'key1', 'value': 'value1'}], + [{'id': 'doc2', 'key': 'key2', 'value': 'value2'}] + ] + mock_view_pages.return_value = iter(mock_pages) + + db = client.Database(mock_resource, "testdb") + + pages = list(db.view_pages("test/view", 2, {'include_docs': True})) + + assert len(pages) == 2 + mock_view_pages.assert_called_once() + + # Check that the fetch function was called correctly + call_args = mock_view_pages.call_args + assert call_args[0][1] == "test/view" # view parameter + assert call_args[0][2] == 2 # page_size parameter + assert call_args[0][3] == {'include_docs': True} # params parameter + + @patch('pycouchdb.client.mango_pages') + def test_database_mango_pages_method(self, mock_mango_pages): + """Test the mango_pages method.""" + mock_resource = Mock() + mock_pages = [ + [{'_id': 'doc1', 'name': 'Alice'}], + [{'_id': 'doc2', 'name': 'Bob'}] + ] + mock_mango_pages.return_value = iter(mock_pages) + + db = client.Database(mock_resource, "testdb") + selector = {'name': {'$exists': True}} + + pages = list(db.mango_pages(selector, 2, {'sort': [{'name': 'asc'}]})) + + assert len(pages) == 2 + mock_mango_pages.assert_called_once() + + # Check that the fetch function was called correctly + call_args = mock_mango_pages.call_args + assert call_args[0][1] == selector # selector parameter + assert call_args[0][2] == 2 # page_size parameter + assert call_args[0][3] == {'sort': [{'name': 'asc'}]} # params parameter + + def test_database_view_pages_fetch_function(self): + """Test that view_pages creates correct fetch function.""" + mock_resource = Mock() + mock_response = Mock() + mock_result = {'rows': [{'id': 'doc1', 'key': 'key1', 'value': 'value1'}]} + + # Mock the resource call chain + mock_view_resource = Mock() + mock_view_resource.get.return_value = (mock_response, mock_result) + mock_resource.return_value = mock_view_resource + + db = client.Database(mock_resource, "testdb") + + # This will call the internal fetch function + pages = list(db.view_pages("test/view", 2)) + + # Check that the resource was called with correct path + mock_resource.assert_called_with("_design", "test", "_view", "view") + + # Check that get was called with encoded parameters + mock_view_resource.get.assert_called_once() + call_args = mock_view_resource.get.call_args + params = call_args[1]['params'] + assert params['limit'] == 3 # page_size + 1 + + def test_database_mango_pages_fetch_function(self): + """Test that mango_pages creates correct fetch function.""" + mock_resource = Mock() + mock_response = Mock() + mock_result = {'docs': [{'_id': 'doc1', 'name': 'Alice'}]} + mock_resource.post.return_value = (mock_response, mock_result) + + db = client.Database(mock_resource, "testdb") + selector = {'name': {'$exists': True}} + + # This will call the internal fetch function + pages = list(db.mango_pages(selector, 2)) + + # Check that post was called with correct endpoint + mock_resource.post.assert_called_once() + call_args = mock_resource.post.call_args + assert call_args[0][0] == "_find" + + # Check the data payload + import json + data = json.loads(call_args[1]['data']) + assert data['selector'] == selector + assert data['limit'] == 2 diff --git a/test/test_pagination.py b/test/test_pagination.py new file mode 100644 index 0000000..1ed2129 --- /dev/null +++ b/test/test_pagination.py @@ -0,0 +1,244 @@ +""" +Unit tests for pycouchdb.pagination module. +""" + +import pytest +from unittest.mock import Mock, MagicMock +from pycouchdb.pagination import view_pages, mango_pages + + +class TestViewPages: + """Test view_pages function.""" + + def test_view_pages_single_page(self): + """Test pagination with single page of results.""" + # Mock response with single page + mock_response = Mock() + mock_result = { + 'rows': [ + {'id': 'doc1', 'key': 'key1', 'value': 'value1'}, + {'id': 'doc2', 'key': 'key2', 'value': 'value2'} + ] + } + + fetch_mock = Mock(return_value=(mock_response, mock_result)) + + pages = list(view_pages(fetch_mock, "test/view", 2)) + + assert len(pages) == 1 + assert len(pages[0]) == 2 + assert pages[0][0]['id'] == 'doc1' + assert pages[0][1]['id'] == 'doc2' + + # Should have been called once with limit=3 (page_size + 1) + fetch_mock.assert_called_once() + call_args = fetch_mock.call_args[0][0] + assert call_args['limit'] == 3 + + def test_view_pages_multiple_pages(self): + """Test pagination with multiple pages.""" + # First page - has more results + page1_response = Mock() + page1_result = { + 'rows': [ + {'id': 'doc1', 'key': 'key1', 'value': 'value1'}, + {'id': 'doc2', 'key': 'key2', 'value': 'value2'}, + {'id': 'doc3', 'key': 'key3', 'value': 'value3'} # Extra row indicates more pages + ] + } + + # Second page - final page + page2_response = Mock() + page2_result = { + 'rows': [ + {'id': 'doc4', 'key': 'key4', 'value': 'value4'} + ] + } + + fetch_mock = Mock(side_effect=[ + (page1_response, page1_result), + (page2_response, page2_result) + ]) + + pages = list(view_pages(fetch_mock, "test/view", 2)) + + assert len(pages) == 2 + assert len(pages[0]) == 2 # First page without extra row + assert len(pages[1]) == 1 # Second page + + # Check that second call has correct pagination parameters + assert fetch_mock.call_count == 2 + second_call_args = fetch_mock.call_args_list[1][0][0] + assert second_call_args['startkey'] == '"key2"' # Last key from first page (JSON-encoded) + assert second_call_args['startkey_docid'] == 'doc2' # Last doc id from first page + assert second_call_args['skip'] == 1 + + def test_view_pages_empty_result(self): + """Test pagination with empty result.""" + mock_response = Mock() + mock_result = {'rows': []} + + fetch_mock = Mock(return_value=(mock_response, mock_result)) + + pages = list(view_pages(fetch_mock, "test/view", 2)) + + assert len(pages) == 0 + fetch_mock.assert_called_once() + + def test_view_pages_none_result(self): + """Test pagination with None result.""" + mock_response = Mock() + + fetch_mock = Mock(return_value=(mock_response, None)) + + pages = list(view_pages(fetch_mock, "test/view", 2)) + + assert len(pages) == 0 + fetch_mock.assert_called_once() + + def test_view_pages_with_params(self): + """Test pagination with additional parameters.""" + mock_response = Mock() + mock_result = {'rows': []} + + fetch_mock = Mock(return_value=(mock_response, mock_result)) + + params = {'include_docs': True, 'descending': True} + pages = list(view_pages(fetch_mock, "test/view", 2, params)) + + fetch_mock.assert_called_once() + call_args = fetch_mock.call_args[0][0] + assert call_args['include_docs'] is True + assert call_args['descending'] is True + assert call_args['limit'] == 3 + + +class TestMangoPages: + """Test mango_pages function.""" + + def test_mango_pages_single_page(self): + """Test pagination with single page of results.""" + mock_response = Mock() + mock_result = { + 'docs': [ + {'_id': 'doc1', 'name': 'Alice'}, + {'_id': 'doc2', 'name': 'Bob'} + ], + 'bookmark': None # No more pages + } + + fetch_mock = Mock(return_value=(mock_response, mock_result)) + + selector = {'name': {'$exists': True}} + pages = list(mango_pages(fetch_mock, selector, 2)) + + assert len(pages) == 1 + assert len(pages[0]) == 2 + assert pages[0][0]['_id'] == 'doc1' + assert pages[0][1]['_id'] == 'doc2' + + fetch_mock.assert_called_once() + call_args = fetch_mock.call_args[0][0] + assert call_args['selector'] == selector + assert call_args['limit'] == 2 + + def test_mango_pages_multiple_pages(self): + """Test pagination with multiple pages.""" + # First page + page1_response = Mock() + page1_result = { + 'docs': [ + {'_id': 'doc1', 'name': 'Alice'}, + {'_id': 'doc2', 'name': 'Bob'} + ], + 'bookmark': 'bookmark123' # More pages available + } + + # Second page + page2_response = Mock() + page2_result = { + 'docs': [ + {'_id': 'doc3', 'name': 'Charlie'} + ], + 'bookmark': None # No more pages + } + + fetch_mock = Mock(side_effect=[ + (page1_response, page1_result), + (page2_response, page2_result) + ]) + + selector = {'name': {'$exists': True}} + pages = list(mango_pages(fetch_mock, selector, 2)) + + assert len(pages) == 2 + assert len(pages[0]) == 2 + assert len(pages[1]) == 1 + + # Check that second call has bookmark + assert fetch_mock.call_count == 2 + second_call_args = fetch_mock.call_args_list[1][0][0] + assert second_call_args['bookmark'] == 'bookmark123' + + def test_mango_pages_empty_result(self): + """Test pagination with empty result.""" + mock_response = Mock() + mock_result = {'docs': []} + + fetch_mock = Mock(return_value=(mock_response, mock_result)) + + selector = {'name': {'$exists': True}} + pages = list(mango_pages(fetch_mock, selector, 2)) + + assert len(pages) == 0 + fetch_mock.assert_called_once() + + def test_mango_pages_none_result(self): + """Test pagination with None result.""" + mock_response = Mock() + + fetch_mock = Mock(return_value=(mock_response, None)) + + selector = {'name': {'$exists': True}} + pages = list(mango_pages(fetch_mock, selector, 2)) + + assert len(pages) == 0 + fetch_mock.assert_called_once() + + def test_mango_pages_with_params(self): + """Test pagination with additional parameters.""" + mock_response = Mock() + mock_result = {'docs': []} + + fetch_mock = Mock(return_value=(mock_response, mock_result)) + + selector = {'name': {'$exists': True}} + params = {'sort': [{'name': 'asc'}], 'fields': ['_id', 'name']} + pages = list(mango_pages(fetch_mock, selector, 2, params)) + + fetch_mock.assert_called_once() + call_args = fetch_mock.call_args[0][0] + assert call_args['selector'] == selector + assert call_args['sort'] == [{'name': 'asc'}] + assert call_args['fields'] == ['_id', 'name'] + assert call_args['limit'] == 2 + + def test_mango_pages_stops_on_empty_bookmark(self): + """Test that pagination stops when bookmark is empty string.""" + # First page with empty bookmark + page1_response = Mock() + page1_result = { + 'docs': [ + {'_id': 'doc1', 'name': 'Alice'} + ], + 'bookmark': '' # Empty bookmark should stop pagination + } + + fetch_mock = Mock(return_value=(page1_response, page1_result)) + + selector = {'name': {'$exists': True}} + pages = list(mango_pages(fetch_mock, selector, 2)) + + assert len(pages) == 1 + assert len(pages[0]) == 1 + fetch_mock.assert_called_once() # Should not make second call diff --git a/test/test_utils.py b/test/test_utils.py index c775e2e..726ef08 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -14,7 +14,7 @@ def test_extract_credentials_no_auth(self): """Test extracting credentials from URL without authentication.""" url = 'http://localhost:5984/_config/' clean_url, credentials = utils.extract_credentials(url) - + assert clean_url == 'http://localhost:5984/_config/' assert credentials is None @@ -22,7 +22,7 @@ def test_extract_credentials_basic_auth(self): """Test extracting credentials from URL with basic authentication.""" url = 'http://joe:secret@localhost:5984/_config/' clean_url, credentials = utils.extract_credentials(url) - + assert clean_url == 'http://localhost:5984/_config/' assert credentials == ('joe', 'secret') @@ -30,7 +30,7 @@ def test_extract_credentials_encoded_auth(self): """Test extracting credentials from URL with encoded authentication.""" url = 'http://joe%40example.com:secret@localhost:5984/_config/' clean_url, credentials = utils.extract_credentials(url) - + assert clean_url == 'http://localhost:5984/_config/' assert credentials == ('joe@example.com', 'secret') @@ -38,7 +38,7 @@ def test_extract_credentials_password_with_colon(self): """Test extracting credentials from URL with password containing colons.""" url = 'http://user:pass:word@localhost:5984/_config/' clean_url, credentials = utils.extract_credentials(url) - + assert clean_url == 'http://localhost:5984/_config/' assert credentials == ('user', 'pass:word') @@ -46,7 +46,7 @@ def test_extract_credentials_password_with_multiple_colons(self): """Test extracting credentials from URL with password containing multiple colons.""" url = 'http://user:pass:word:with:colons@localhost:5984/_config/' clean_url, credentials = utils.extract_credentials(url) - + assert clean_url == 'http://localhost:5984/_config/' assert credentials == ('user', 'pass:word:with:colons') @@ -54,7 +54,7 @@ def test_extract_credentials_encoded_password_with_colon(self): """Test extracting credentials from URL with encoded password containing colons.""" url = 'http://user:pass%3Aword@localhost:5984/_config/' clean_url, credentials = utils.extract_credentials(url) - + assert clean_url == 'http://localhost:5984/_config/' assert credentials == ('user', 'pass:word') @@ -62,7 +62,7 @@ def test_extract_credentials_invalid_format(self): """Test extracting credentials from URL with invalid credential format.""" url = 'http://user@localhost:5984/_config/' clean_url, credentials = utils.extract_credentials(url) - + assert clean_url == 'http://localhost:5984/_config/' assert credentials is None @@ -123,7 +123,7 @@ class MockResponse: def __init__(self): self.headers = {'content-type': 'application/json'} self.content = b'{"key": "value"}' - + response = MockResponse() result = utils.as_json(response) assert result == {'key': 'value'} @@ -134,12 +134,38 @@ class MockResponse: def __init__(self): self.headers = {'content-type': 'application/json'} self.content = b'invalid json' - + response = MockResponse() # The function doesn't handle JSON decode errors, so it raises an exception with pytest.raises(json.JSONDecodeError): # json.loads raises JSONDecodeError for invalid JSON utils.as_json(response) + def test_as_json_invalid_utf8(self): + """Test as_json with invalid UTF-8 content.""" + class MockResponse: + def __init__(self): + self.headers = {'content-type': 'application/json'} + # Valid JSON but with invalid UTF-8 sequence in the middle + self.content = b'{"key": "value", "invalid": "\xff\xfe"}' + + response = MockResponse() + result = utils.as_json(response) + # Should handle UTF-8 decode error gracefully and still parse JSON + assert result == {'key': 'value', 'invalid': '\ufffd\ufffd'} + + def test_as_json_invalid_utf8_with_replacement(self): + """Test as_json with invalid UTF-8 content that gets replaced.""" + class MockResponse: + def __init__(self): + self.headers = {'content-type': 'application/json'} + # Valid JSON with invalid UTF-8 that will be replaced with replacement character + self.content = b'{"key": "value", "invalid": "\xff\xfe\x80"}' + + response = MockResponse() + result = utils.as_json(response) + # Should handle UTF-8 decode error and parse JSON with replacement characters + assert result == {'key': 'value', 'invalid': '\ufffd\ufffd\ufffd'} + def test_encode_view_options(self): """Test encoding view options.""" options = { @@ -151,9 +177,9 @@ def test_encode_view_options(self): 'descending': True, 'include_docs': True } - + result = utils.encode_view_options(options) - + assert result['key'] == '"value"' assert result['startkey'] == '"start"' assert result['endkey'] == '"end"' @@ -167,7 +193,7 @@ def test_encode_view_options_with_list(self): options = { 'keys': ['key1', 'key2', 'key3'] } - + result = utils.encode_view_options(options) assert result['keys'] == ['key1', 'key2', 'key3'] # 'keys' is not in the special list, so it's not converted @@ -176,7 +202,7 @@ def test_encode_view_options_with_dict(self): options = { 'key': {'nested': 'value'} } - + result = utils.encode_view_options(options) assert result['key'] == '{"nested": "value"}' @@ -187,7 +213,7 @@ def test_encode_view_options_boolean_values(self): 'include_docs': True, 'reduce': False } - + result = utils.encode_view_options(options) assert result['descending'] == False # Boolean values are not converted to strings assert result['include_docs'] == True @@ -200,8 +226,8 @@ def test_encode_view_options_numeric_values(self): 'skip': 0, 'group_level': 2 } - + result = utils.encode_view_options(options) assert result['limit'] == 100 assert result['skip'] == 0 - assert result['group_level'] == 2 \ No newline at end of file + assert result['group_level'] == 2