From 4df085384db82f4a7aef26dca841f7f9c4274b58 Mon Sep 17 00:00:00 2001 From: Virgin Bitton Date: Tue, 4 Apr 2023 13:45:13 +0200 Subject: [PATCH 1/2] feat: Add JoinField and update indexation command to select on which models to start the indexation feat: Automatically divide queryset in batches fix query is_ordered add batch_size and the check if min max exists Remove stdoutwrite fix chunk_size Add the stdout to display the current batch add batch_size Add batch type args to indexation command fix: If `pks["min"]` & `pks["max"]` were same value, iteration did not start Feature/add index manage func (#80) * add manage_index in a separate fonction * add manage_index in a separate fonction * fix by using command stderr and stdout * add the check on opensearchAction * add custom registry * add document_management * fix the document registry * add using to chose the os alias * fix using * add db_alias * remove db_alias --------- Co-authored-by: Abdellatif Missoumi skip when empty objects (#83) Co-authored-by: Abdellatif --- bin/pre_commit.sh | 15 -- django_opensearch_dsl/documents.py | 66 ++++-- django_opensearch_dsl/fields.py | 4 + .../management/commands/opensearch.py | 214 +++++------------- django_opensearch_dsl/registries.py | 5 + django_opensearch_dsl/utils/__init__.py | 2 + .../utils/document_management.py | 206 +++++++++++++++++ .../utils/index_management.py | 105 +++++++++ 8 files changed, 425 insertions(+), 192 deletions(-) create mode 100644 django_opensearch_dsl/utils/__init__.py create mode 100644 django_opensearch_dsl/utils/document_management.py create mode 100644 django_opensearch_dsl/utils/index_management.py diff --git a/bin/pre_commit.sh b/bin/pre_commit.sh index d8a2bf2..61d048a 100755 --- a/bin/pre_commit.sh +++ b/bin/pre_commit.sh @@ -55,21 +55,6 @@ fi echo "" -################################################################################ -# MYPY # -################################################################################ -echo -n "${Cyan}Running mypy... $Color_Off" -out=$(mypy django_opensearch_dsl --disallow-untyped-def) -if [ "$?" -ne 0 ] ; then - echo "${Red}Error !$Color_Off" - echo -e "$out" - EXIT_CODE=1 -else - echo "${Green}Ok ✅ $Color_Off" -fi -echo "" - - ################################################################################ # BANDIT # ################################################################################ diff --git a/django_opensearch_dsl/documents.py b/django_opensearch_dsl/documents.py index a8023e3..7929dd6 100644 --- a/django_opensearch_dsl/documents.py +++ b/django_opensearch_dsl/documents.py @@ -1,3 +1,5 @@ +import copy +import io import sys import time from collections import deque @@ -6,7 +8,7 @@ import opensearchpy from django.db import models -from django.db.models import Q, QuerySet +from django.db.models import Max, Min, Q, QuerySet from opensearchpy.helpers import bulk, parallel_bulk from opensearchpy.helpers.document import Document as DSLDocument @@ -101,34 +103,56 @@ def get_indexing_queryset( count: int = None, action: CommandAction = CommandAction.INDEX, stdout: TextIO = sys.stdout, + batch_size: int = None, + batch_type: str = "offset", ) -> Iterable: """Divide the queryset into chunks.""" - chunk_size = self.django.queryset_pagination + chunk_size = batch_size or self.django.queryset_pagination qs = self.get_queryset(filter_=filter_, exclude=exclude, count=count, alias=alias) qs = qs.order_by("pk") if not qs.query.is_sliced else qs - total = qs.count() + count = qs.count() model = self.django.model.__name__ action = action.present_participle.title() - - i = 0 - done = 0 start = time.time() + done = 0 if verbose: - stdout.write(f"{action} {model}: 0% ({self._eta(start, done, total)})\r") - while done < total: - if verbose: - stdout.write(f"{action} {model}: {round(i / total * 100)}% ({self._eta(start, done, total)})\r") - - for obj in qs[i : i + chunk_size]: - done += 1 - yield obj - - i = min(i + chunk_size, total) - - if verbose: - stdout.write(f"{action} {total} {model}: OK \n") - - def init_prepare(self) -> list[tuple[str, fields.DODField, Callable[[models.Model], Any]]]: + stdout.write(f"{action} {model}: 0% ({self._eta(start, done, count)})\r") + + if count == 0: + stdout.write(f"No {model} objects to {action.lower()}.\n") + return + + if batch_type == "pk_filters": + pks = qs.aggregate(min=Min("pk"), max=Max("pk")) + total_batches = (pks["max"] - pks["min"]) // chunk_size + for batch_number, offset in enumerate(range(pks["min"], pks["max"] + 1, chunk_size), start=1): + batch_qs = list(qs.filter(pk__gte=offset, pk__lt=offset + chunk_size)) + stdout.write(f"Processing batch {batch_number}/{total_batches}: \n") + for obj in batch_qs: + done += 1 + if done % chunk_size == 0: + stdout.write( + f"{action} {model}: {round(done / count * 100)}% ({self._eta(start, done, count)})\r" + ) + yield obj + if len(batch_qs) > 0: + stdout.write(f"Max primary key in the current {model} batch: {batch_qs[-1].pk}\n") + else: + total_batches = (count + chunk_size - 1) // chunk_size + for batch_number, offset in enumerate(range(0, count, chunk_size), start=1): + batch_qs = list(qs[offset : offset + chunk_size].all()) + stdout.write(f"Processing batch {batch_number}/{total_batches}: \n") + for obj in batch_qs: + done += 1 + if done % chunk_size == 0: + stdout.write( + f"{action} {model}: {round(done / count * 100)}% ({self._eta(start, done, count)})\r" + ) + yield obj + if len(batch_qs) > 0: + stdout.write(f"Max primary key in the current {model} batch: {batch_qs[-1].pk}\n") + + def init_prepare(self): """Initialise the data model preparers once here. Extracts the preparers from the model and generate a list of callables diff --git a/django_opensearch_dsl/fields.py b/django_opensearch_dsl/fields.py index f8b124b..a4c5063 100644 --- a/django_opensearch_dsl/fields.py +++ b/django_opensearch_dsl/fields.py @@ -217,6 +217,10 @@ class IpField(DODField, fields.Ip): """Allow indexing of IPv4 and IPv6 addresses.""" +class JoinField(DODField, fields.Join): + """Allow indexing of Join fields (with parent/child relation).""" + + class LongField(DODField, fields.Long): """Allow indexing of long. diff --git a/django_opensearch_dsl/management/commands/opensearch.py b/django_opensearch_dsl/management/commands/opensearch.py index 0c826f1..9424626 100644 --- a/django_opensearch_dsl/management/commands/opensearch.py +++ b/django_opensearch_dsl/management/commands/opensearch.py @@ -16,6 +16,7 @@ from ...apps import DODConfig from ...enums import CommandAction from ...registries import registry +from ...utils import manage_document, manage_index from ..types import Values, parse @@ -88,76 +89,23 @@ def _manage_index( **options: Any, ) -> None: """Manage the creation and deletion of indices.""" - action = CommandAction(action) # type: ignore[call-arg] - known = registry.get_indices() - - # Filter indices - if indices: - # Ensure every given indices exists - known_name = [i._name for i in known] # noqa - unknown = set(indices) - set(known_name) - if unknown: - self.stderr.write(f"Unknown indices '{list(unknown)}', choices are: '{known_name}'") - raise CommandError - - # Only keep given indices - given_indices = [i for i in known if i._name in indices] - else: - given_indices = list(known) - - # Display expected action - if verbosity or not force: - self.stdout.write(f"The following indices will be {action.past}:") - for index in given_indices: - self.stdout.write(f"\t- {index._name}.") # noqa - self.stdout.write("") - - # Ask for confirmation to continue - if not force: # pragma: no cover - while True: - p = input("Continue ? [y]es [n]o : ") - if p.lower() in ["yes", "y"]: - self.stdout.write("") - break - elif p.lower() in ["no", "n"]: - raise CommandError - - pp = action.present_participle.title() - for index in given_indices: - if verbosity: - self.stdout.write( - f"{pp} index '{index._name}'...\r", - ending="", - ) # noqa - self.stdout.flush() - try: - if action == CommandAction.CREATE: - index.create(using=using) - elif action == CommandAction.DELETE: - index.delete(using=using) - elif action == CommandAction.UPDATE: - index.put_mapping(using=using, body=index.to_dict()["mappings"]) - else: - try: - index.delete(using=using) - except opensearchpy.exceptions.NotFoundError: - pass - index.create(using=using) - except opensearchpy.exceptions.TransportError as e: - if verbosity or not ignore_error: - error = self.style.ERROR(f"Error: {e.error} - {e.info}") - self.stderr.write(f"{pp} index '{index._name}'...\n{error}") # noqa - if not ignore_error: - self.stderr.write("exiting...") - raise CommandError - else: - if verbosity: - self.stdout.write(f"{pp} index '{index._name}'... {self.style.SUCCESS('OK')}") # noqa + manage_index( + action, + indices, + force, + ignore_error, + verbosity, + stderr=self.stderr, + stdout=self.stdout, + style=self.style, + using=using, + ) def _manage_document( self, action: CommandAction, indices: list[str], + objects: list[str], force: bool, filters: list[tuple[str, str]], excludes: list[tuple[str, str]], @@ -168,98 +116,31 @@ def _manage_document( missing: bool, using: OpenSearch, database: str, + batch_size: int, + batch_type: str, **options: Any, ) -> None: """Manage the creation and deletion of indices.""" - action = CommandAction(action) # type: ignore[call-arg] - known = registry.get_indices() - filter_ = functools.reduce(operator.and_, (Q(**{k: v}) for k, v in filters)) if filters else None - exclude = functools.reduce(operator.and_, (Q(**{k: v}) for k, v in excludes)) if excludes else None - - # Filter indices - if indices: - # Ensure every given indices exists - known_name = [i._name for i in known] # noqa - unknown = set(indices) - set(known_name) - if unknown: - self.stderr.write(f"Unknown indices '{list(unknown)}', choices are: '{known_name}'") - raise CommandError - - # Only keep given indices - given_indices = list(filter(lambda i: i._name in indices, known)) # type: ignore[arg-type] - else: - given_indices = list(known) - - # Ensure every indices needed are created - not_created = [i._name for i in given_indices if not i.exists(using=using)] # noqa - if not_created: - self.stderr.write(f"The following indices are not created : {not_created}") - self.stderr.write("Use 'python3 manage.py opensearch list' to list indices' state.") - raise CommandError - - # Check field, preparing to display expected actions - s = f"The following documents will be {action.past}:" - kwargs_list = [] - for index in given_indices: - # Handle --missing - exclude_ = exclude - if missing and action == CommandAction.INDEX: - q = Q(pk__in=[h.meta.id for h in index.search(using=using).extra(stored_fields=[]).scan()]) - exclude_ = exclude_ & q if exclude_ is not None else q - - document = index._doc_types[0]() # noqa - try: - kwargs_list.append({"filter_": filter_, "exclude": exclude_, "count": count}) - qs = document.get_queryset(filter_=filter_, exclude=exclude_, count=count, alias=database).count() - except FieldError as e: - model = index._doc_types[0].django.model.__name__ # noqa - self.stderr.write(f"Error while filtering on '{model}' (from index '{index._name}'):\n{e}'") # noqa - raise CommandError - else: - s += f"\n\t- {qs} {document.django.model.__name__}." - - # Display expected actions - if verbosity or not force: - self.stdout.write(s + "\n\n") - - # Ask for confirmation to continue - if not force: # pragma: no cover - while True: - p = input("Continue ? [y]es [n]o : ") - if p.lower() in ["yes", "y"]: - self.stdout.write("\n") - break - elif p.lower() in ["no", "n"]: - raise CommandError - - result = "\n" - for index, kwargs in zip(given_indices, kwargs_list): - document = index._doc_types[0]() # noqa - qs = document.get_indexing_queryset( - stdout=self.stdout._out, verbose=verbosity, action=action, alias=database, **kwargs - ) - success, errors = document.update( - qs, parallel=parallel, refresh=refresh, action=action, raise_on_error=False, using=using - ) - - success_str = self.style.SUCCESS(success) if success else success - errors_str = self.style.ERROR(len(errors)) if errors else len(errors) - model = document.django.model.__name__ - - if verbosity == 1: - result += f"{success_str} {model} successfully {action.past}, {errors_str} errors:\n" - reasons: defaultdict[str, int] = defaultdict(int) - for err in errors: # Count occurrence of each error - error = err.get(action, {"result": "unknown error"}).get("result", "unknown error") - reasons[error] += 1 - for reason, total in reasons.items(): - result += f" - {reason} : {total}\n" - - if verbosity > 1: - result += f"{success_str} {model} successfully {action}d, {errors_str} errors:\n {errors}\n" - - if verbosity: - self.stdout.write(result + "\n") + manage_document( + action=action, + indices=indices, + objects=objects, + filters=filters, + excludes=excludes, + force=force, + parallel=parallel, + count=count, + refresh=refresh, + missing=missing, + using=using, + database=database, + batch_size=batch_size, + batch_type=batch_type, + verbosity=verbosity, + stderr=self.stderr, + stdout=self.stdout, + style=self.style, + ) def add_arguments(self, parser: argparse.ArgumentParser) -> None: """Add arguments to parser.""" @@ -281,7 +162,7 @@ def add_arguments(self, parser: argparse.ArgumentParser) -> None: help="Alias of the OpenSearch connection to use. Default to 'default'.", ) - # 'manage' subcommand + # 'index' subcommand subparser = subparsers.add_parser( "index", help="Manage the creation an deletion of indices.", @@ -346,7 +227,13 @@ def add_arguments(self, parser: argparse.ArgumentParser) -> None: default=connection("default"), help="Alias of the OpenSearch connection to use. Default to 'default'.", ) - subparser.add_argument("-d", "--database", default=None, dest="database", help="Nominates a database.") + subparser.add_argument( + "-d", + "--database", + type=str, + default=None, + help="Nominates a database to use as source.", + ) subparser.add_argument( "-f", "--filters", @@ -380,6 +267,7 @@ def add_arguments(self, parser: argparse.ArgumentParser) -> None: subparser.add_argument( "-i", "--indices", type=str, nargs="*", help="Only update documents on the given indices." ) + subparser.add_argument("-o", "--objects", type=str, nargs="*", help="Only update selected objects.") subparser.add_argument( "-c", "--count", type=int, default=None, help="Update at most COUNT objects (0 to index everything)." ) @@ -414,6 +302,20 @@ def add_arguments(self, parser: argparse.ArgumentParser) -> None: default=False, help="When used with 'index' action, only index documents not indexed yet.", ) + subparser.add_argument( + "-b", + "--batch-size", + type=int, + default=None, + help="Specify the batch size for processing documents.", + ) + subparser.add_argument( + "-t", + "--batch-type", + type=str, + default="offset", + help="Specify the batch type for processing documents (pk_filters | offset).", + ) self.usage = parser.format_usage() diff --git a/django_opensearch_dsl/registries.py b/django_opensearch_dsl/registries.py index facff64..ed498c7 100644 --- a/django_opensearch_dsl/registries.py +++ b/django_opensearch_dsl/registries.py @@ -56,6 +56,7 @@ def register_document(self, doc_class: type["Document"]) -> type["Document"]: "ignore_signals": getattr(django_meta, "ignore_signals", False), "auto_refresh": getattr(django_meta, "auto_refresh", DODConfig.auto_refresh_enabled()), "related_models": getattr(django_meta, "related_models", []), + "order_indexing_queryset": getattr(django_meta, "order_indexing_queryset", True), } ) if not django_attr.model: # pragma: no cover @@ -194,5 +195,9 @@ def __contains__(self, instance: type[Model]) -> bool: f"'in <{type(self).__name__}>' requires a Model subclass as left operand, not {type(dict).__name__}" ) + def get_indices_raw(self) -> defaultdict[Index, set[type["Document"]]]: + """Get all indices as they are store in the registry or the indices for a list of models.""" + return self._indices + registry = DocumentRegistry() diff --git a/django_opensearch_dsl/utils/__init__.py b/django_opensearch_dsl/utils/__init__.py new file mode 100644 index 0000000..89a98f3 --- /dev/null +++ b/django_opensearch_dsl/utils/__init__.py @@ -0,0 +1,2 @@ +from .document_management import manage_document +from .index_management import manage_index diff --git a/django_opensearch_dsl/utils/document_management.py b/django_opensearch_dsl/utils/document_management.py new file mode 100644 index 0000000..d783611 --- /dev/null +++ b/django_opensearch_dsl/utils/document_management.py @@ -0,0 +1,206 @@ +import functools +import operator +import sys +from collections import defaultdict +from typing import Any, List, Tuple + +from django.core.exceptions import FieldError +from django.core.management.base import CommandError, OutputWrapper +from django.core.management.color import color_style +from django.db.models import Q +from opensearchpy import OpenSearch + +from django_opensearch_dsl.enums import CommandAction +from django_opensearch_dsl.registries import registry as default_registry + + +def manage_document( + action, + filters: List[Tuple[str, Any]] = None, + excludes: List[Tuple[str, Any]] = None, + indices: List[str] = None, + objects: List[str] = None, + parallel: bool = False, + refresh: bool = False, + missing: bool = False, + force: bool = False, + database: str = None, + using: OpenSearch = None, + batch_type: str = "offset", + batch_size: int = None, + count: int = None, + verbosity: int = 1, + stderr=OutputWrapper(sys.stderr), + stdout=OutputWrapper(sys.stdout), + style=color_style(), + registry=default_registry, +): # noqa + """Manage the creation and deletion of indices.""" + choices = [CommandAction.INDEX.value, CommandAction.DELETE.value, CommandAction.UPDATE.value] + if action not in choices: + raise ValueError(f"Invalid action '{action}'. Valid actions are: {', '.join(choices)}") + + action = CommandAction(action) + known = registry.get_indices() + filter_ = functools.reduce(operator.and_, (Q(**{k: v}) for k, v in filters)) if filters else None + exclude = functools.reduce(operator.and_, (Q(**{k: v}) for k, v in excludes)) if excludes else None + + # Filter existing objects + valid_models = [] + registered_models = [m.__name__.lower() for m in registry.get_models()] + if objects: + for model in objects: + if model.lower() in registered_models: + valid_models.append(model) + else: + stderr.write(f"Unknown object '{model}', choices are: '{registered_models}'") + raise CommandError + + # Filter indices + if indices: + # Ensure every given indices exists + known_name = [i._name for i in known] # noqa + unknown = set(indices) - set(known_name) + if unknown: + stderr.write(f"Unknown indices '{list(unknown)}', choices are: '{known_name}'") + raise CommandError + + # Only keep given indices + indices = list(filter(lambda i: i._name in indices, known)) # noqa + else: + indices = known + + # Ensure every indices needed are created + not_created = [i._name for i in indices if not i.exists(using=using)] # noqa + if not_created: + stderr.write(f"The following indices are not created : {not_created}") + stderr.write("Use 'python3 manage.py opensearch list' to list indices' state.") + raise CommandError + + # Check field, preparing to display expected actions + s = f"The following documents will be {action.past}:" + kwargs_list = [] + + if objects: + django_models = [m for m in registry.get_models() if m.__name__.lower() in valid_models] + all_os_models = [] + selected_os_models = [] + indices_raw = registry.get_indices_raw() + + for k, v in indices_raw.items(): + for model in list(v): + all_os_models.append(model) + + for os_model in all_os_models: + if os_model.django.model in django_models and os_model._index._name in list(i._name for i in indices): + selected_os_models.append(os_model) + + # Handle --missing + exclude_ = exclude + for model in selected_os_models: + try: + kwargs_list.append({"alias": database, "filter_": filter_, "exclude": exclude_, "count": count}) + qs = model().get_queryset(filter_=filter_, exclude=exclude_, count=count, alias=database).count() + except FieldError as e: + stderr.write(f"Error while filtering on '{model.django.model.__name__}':\n{e}'") # noqa + raise CommandError + else: + s += f"\n\t- {qs} {model.django.model.__name__}." + else: + for index in indices: + # Handle --missing + exclude_ = exclude + if missing and action == CommandAction.INDEX: + q = Q(pk__in=[h.meta.id for h in index.search().extra(stored_fields=[]).scan()]) + exclude_ = exclude_ & q if exclude_ is not None else q + + document = index._doc_types[0]() # noqa + try: + kwargs_list.append({"alias": database, "filter_": filter_, "exclude": exclude_, "count": count}) + qs = document.get_queryset(filter_=filter_, exclude=exclude_, count=count, alias=database).count() + except FieldError as e: + model = index._doc_types[0].django.model.__name__ # noqa + stderr.write(f"Error while filtering on '{model}' (from index '{index._name}'):\n{e}'") # noqa + raise CommandError + else: + s += f"\n\t- {qs} {document.django.model.__name__}." + + # Display expected actions + if verbosity or not force: + stdout.write(s + "\n\n") + + # Ask for confirmation to continue + if not force: # pragma: no cover + while True: + p = input("Continue ? [y]es [n]o : ") + if p.lower() in ["yes", "y"]: + stdout.write("\n") + break + elif p.lower() in ["no", "n"]: + raise CommandError + + result = "\n" + if objects: + for model, kwargs in zip(selected_os_models, kwargs_list): + document = model() # noqa + qs = document.get_indexing_queryset( + stdout=stdout._out, + verbose=verbosity, + action=action, + batch_size=batch_size, + batch_type=batch_type, + **kwargs, + ) + success, errors = document.update( + qs, parallel=parallel, refresh=refresh, action=action, raise_on_error=False, using=using + ) + + success_str = style.SUCCESS(success) if success else success + errors_str = style.ERROR(len(errors)) if errors else len(errors) + model = document.django.model.__name__ + + if verbosity == 1: + result += f"{success_str} {model} successfully {action.past}, {errors_str} errors:\n" + reasons = defaultdict(int) + for e in errors: # Count occurrence of each error + error = e.get(action, {"result": "unknown error"}).get("result", "unknown error") + reasons[error] += 1 + for reasons, total in reasons.items(): + result += f" - {reasons} : {total}\n" + + if verbosity > 1: + result += f"{success_str} {model} successfully {action}d, {errors_str} errors:\n {errors}\n" + + else: + for index, kwargs in zip(indices, kwargs_list): + document = index._doc_types[0]() # noqa + qs = document.get_indexing_queryset( + stdout=stdout._out, + verbose=verbosity, + action=action, + batch_size=batch_size, + batch_type=batch_type, + **kwargs, + ) + success, errors = document.update( + qs, parallel=parallel, refresh=refresh, action=action, raise_on_error=False, using=using + ) + + success_str = style.SUCCESS(success) if success else success + errors_str = style.ERROR(len(errors)) if errors else len(errors) + model = document.django.model.__name__ + + if verbosity == 1: + result += f"{success_str} {model} successfully {action.past}, {errors_str} errors:\n" + reasons = defaultdict(int) + for e in errors: # Count occurrence of each error + error = e.get(action, {"result": "unknown error"}).get("result", "unknown error") + reasons[error] += 1 + for reasons, total in reasons.items(): + result += f" - {reasons} : {total}\n" + + if verbosity > 1: + result += f"{success_str} {model} successfully {action}d, {errors_str} errors:\n {errors}\n" + + if verbosity: + stdout.write(result + "\n") diff --git a/django_opensearch_dsl/utils/index_management.py b/django_opensearch_dsl/utils/index_management.py new file mode 100644 index 0000000..4cb7fb7 --- /dev/null +++ b/django_opensearch_dsl/utils/index_management.py @@ -0,0 +1,105 @@ +import sys +from typing import List + +import opensearchpy +from django.core.management.base import CommandError, OutputWrapper +from django.core.management.color import color_style + +from django_opensearch_dsl.enums import CommandAction +from django_opensearch_dsl.registries import registry as default_registry + + +def manage_index( + action, + indices: List[str] = None, + force: bool = False, + ignore_error: bool = False, + verbosity: int = 1, + stderr=OutputWrapper(sys.stderr), + stdout=OutputWrapper(sys.stdout), + style=color_style(), + registry=default_registry, + using: str = None, +): # noqa + """Manage the creation and deletion of indices.""" + choices = [ + CommandAction.CREATE.value, + CommandAction.DELETE.value, + CommandAction.REBUILD.value, + CommandAction.UPDATE.value, + ] + if action not in choices: + raise ValueError(f"Invalid action '{action}'. Valid actions are: {', '.join(choices)}") + + action = CommandAction(action) + known = registry.get_indices() + + # Filter indices + if indices: + # Ensure every given indices exists + known_name = [i._name for i in known] # noqa + unknown = set(indices) - set(known_name) + if unknown: + stderr.write(f"Unknown indices '{list(unknown)}', choices are: '{known_name}'") + raise CommandError + + # Only keep given indices + indices = list(filter(lambda i: i._name in indices, known)) # noqa + else: + indices = known + + # Display expected action + if verbosity or not force: + stdout.write(f"The following indices will be {action.past}:") + for index in indices: + stdout.write(f"\t- {index._name}.") # noqa + stdout.write("") + + # Ask for confirmation to continue + if not force: # pragma: no cover + while True: + p = input("Continue ? [y]es [n]o : ") + if p.lower() in ["yes", "y"]: + stdout.write("") + break + elif p.lower() in ["no", "n"]: + raise CommandError + + pp = action.present_participle.title() + for index in indices: + if verbosity: + stdout.write( + f"{pp} index '{index._name}'...\r", + ending="", + ) # noqa + stdout.flush() + try: + # If current index depends on many different models, add them to + # index._doc_types before indexing to make sure all mappings of different models + # are taken into account. + index_models = registry.get_indices_raw().get(index, None) + for model in list(index_models): + index._doc_types.append(model) + + if action == CommandAction.CREATE: + index.create(using=using) + elif action == CommandAction.DELETE: + index.delete(using=using) + elif action == CommandAction.UPDATE: + index.put_mapping(body=index.to_dict()["mappings"], using=using) + else: + try: + index.delete(using=using) + except opensearchpy.exceptions.NotFoundError: + pass + index.create(using=using) + except opensearchpy.exceptions.TransportError as e: + if verbosity or not ignore_error: + error = style.ERROR(f"Error: {e.error} - {e.info}") + stderr.write(f"{pp} index '{index._name}'...\n{error}") # noqa + if not ignore_error: + stderr.write("exiting...") + raise CommandError + else: + if verbosity: + stdout.write(f"{pp} index '{index._name}'... {style.SUCCESS('OK')}") # noqa From 6245d67ebe59b4a00163fc2a0c98341be631ccd7 Mon Sep 17 00:00:00 2001 From: Andrea <37450331+AndreaDelm@users.noreply.github.com> Date: Tue, 16 Dec 2025 17:37:13 +0100 Subject: [PATCH 2/2] pop db_alias before using Bulk of opensearchpy (#92) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Andréa --- django_opensearch_dsl/documents.py | 1 + 1 file changed, 1 insertion(+) diff --git a/django_opensearch_dsl/documents.py b/django_opensearch_dsl/documents.py index 7929dd6..ab665df 100644 --- a/django_opensearch_dsl/documents.py +++ b/django_opensearch_dsl/documents.py @@ -206,6 +206,7 @@ def bulk( self, actions: Iterable[dict[str, Any]], using: str = None, **kwargs: Any ) -> Union[tuple[int, int], tuple[int, list]]: """Execute given actions in bulk.""" + kwargs.pop("alias", None) # removed to avoid error in opensearchpy client response = bulk(client=self._get_connection(using), actions=actions, **kwargs) # send post index signal post_index.send(sender=self.__class__, instance=self, actions=actions, response=response)