15 changes: 0 additions & 15 deletions bin/pre_commit.sh
@@ -55,21 +55,6 @@ fi
echo ""


################################################################################
# MYPY #
################################################################################
echo -n "${Cyan}Running mypy... $Color_Off"
out=$(mypy django_opensearch_dsl --disallow-untyped-def)
if [ "$?" -ne 0 ] ; then
echo "${Red}Error !$Color_Off"
echo -e "$out"
EXIT_CODE=1
else
echo "${Green}Ok ✅ $Color_Off"
fi
echo ""


################################################################################
# BANDIT #
################################################################################
67 changes: 46 additions & 21 deletions django_opensearch_dsl/documents.py
@@ -1,3 +1,5 @@
import copy
import io
import sys
import time
from collections import deque
@@ -6,7 +8,7 @@

import opensearchpy
from django.db import models
from django.db.models import Q, QuerySet
from django.db.models import Max, Min, Q, QuerySet
from opensearchpy.helpers import bulk, parallel_bulk
from opensearchpy.helpers.document import Document as DSLDocument

@@ -101,34 +103,56 @@ def get_indexing_queryset(
count: int = None,
action: CommandAction = CommandAction.INDEX,
stdout: TextIO = sys.stdout,
batch_size: int = None,
batch_type: str = "offset",
) -> Iterable:
"""Divide the queryset into chunks."""
chunk_size = self.django.queryset_pagination
chunk_size = batch_size or self.django.queryset_pagination
qs = self.get_queryset(filter_=filter_, exclude=exclude, count=count, alias=alias)
qs = qs.order_by("pk") if not qs.query.is_sliced else qs
total = qs.count()
count = qs.count()
model = self.django.model.__name__
action = action.present_participle.title()

i = 0
done = 0
start = time.time()
done = 0
if verbose:
stdout.write(f"{action} {model}: 0% ({self._eta(start, done, total)})\r")
while done < total:
if verbose:
stdout.write(f"{action} {model}: {round(i / total * 100)}% ({self._eta(start, done, total)})\r")

for obj in qs[i : i + chunk_size]:
done += 1
yield obj

i = min(i + chunk_size, total)

if verbose:
stdout.write(f"{action} {total} {model}: OK \n")

def init_prepare(self) -> list[tuple[str, fields.DODField, Callable[[models.Model], Any]]]:
stdout.write(f"{action} {model}: 0% ({self._eta(start, done, count)})\r")

if count == 0:
stdout.write(f"No {model} objects to {action.lower()}.\n")
return

if batch_type == "pk_filters":
pks = qs.aggregate(min=Min("pk"), max=Max("pk"))
total_batches = (pks["max"] - pks["min"]) // chunk_size
for batch_number, offset in enumerate(range(pks["min"], pks["max"] + 1, chunk_size), start=1):
batch_qs = list(qs.filter(pk__gte=offset, pk__lt=offset + chunk_size))
stdout.write(f"Processing batch {batch_number}/{total_batches}: \n")
for obj in batch_qs:
done += 1
if done % chunk_size == 0:
stdout.write(
f"{action} {model}: {round(done / count * 100)}% ({self._eta(start, done, count)})\r"
)
yield obj
if len(batch_qs) > 0:
stdout.write(f"Max primary key in the current {model} batch: {batch_qs[-1].pk}\n")
else:
total_batches = (count + chunk_size - 1) // chunk_size
for batch_number, offset in enumerate(range(0, count, chunk_size), start=1):
batch_qs = list(qs[offset : offset + chunk_size].all())
stdout.write(f"Processing batch {batch_number}/{total_batches}: \n")
for obj in batch_qs:
done += 1
if done % chunk_size == 0:
stdout.write(
f"{action} {model}: {round(done / count * 100)}% ({self._eta(start, done, count)})\r"
)
yield obj
if len(batch_qs) > 0:
stdout.write(f"Max primary key in the current {model} batch: {batch_qs[-1].pk}\n")

def init_prepare(self):
"""Initialise the data model preparers once here.

Extracts the preparers from the model and generate a list of callables
@@ -182,6 +206,7 @@ def bulk(
self, actions: Iterable[dict[str, Any]], using: str = None, **kwargs: Any
) -> Union[tuple[int, int], tuple[int, list]]:
"""Execute given actions in bulk."""
kwargs.pop("alias", None) # removed to avoid error in opensearchpy client
response = bulk(client=self._get_connection(using), actions=actions, **kwargs)
# send post index signal
post_index.send(sender=self.__class__, instance=self, actions=actions, response=response)
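For context, a minimal sketch of how the new `batch_size` and `batch_type` arguments to `get_indexing_queryset` might be exercised. The `ArticleDocument`, `Article` model, and index name are hypothetical, and the method's remaining arguments are assumed to keep their defaults:

```python
# Illustrative only: model, document, and index names are hypothetical.
from django_opensearch_dsl import Document
from django_opensearch_dsl.registries import registry

from myapp.models import Article  # hypothetical model


@registry.register_document
class ArticleDocument(Document):
    class Index:
        name = "articles"  # hypothetical index name

    class Django:
        model = Article
        fields = ["title", "published_at"]
        queryset_pagination = 1000  # chunk size used when batch_size is not passed


doc = ArticleDocument()

# batch_type="pk_filters" walks the table in primary-key ranges instead of
# OFFSET/LIMIT slices, which stays efficient on large tables where deep
# offsets degrade; batch_size overrides queryset_pagination for this run.
indexed = 0
for obj in doc.get_indexing_queryset(batch_size=500, batch_type="pk_filters"):
    indexed += 1
print(f"{indexed} objects yielded")
```

Note that the `pk_filters` strategy computes batch boundaries from `Min("pk")`/`Max("pk")` arithmetic, so it assumes numeric primary keys; the default `"offset"` strategy keeps the previous slicing behaviour.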
4 changes: 4 additions & 0 deletions django_opensearch_dsl/fields.py
@@ -217,6 +217,10 @@ class IpField(DODField, fields.Ip):
"""Allow indexing of IPv4 and IPv6 addresses."""


class JoinField(DODField, fields.Join):
"""Allow indexing of Join fields (with parent/child relation)."""


class LongField(DODField, fields.Long):
"""Allow indexing of long.

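And a minimal sketch of declaring the new `JoinField` on a document; the relation names, model, and index name are hypothetical, and the field is assumed to forward its keyword arguments (such as `relations`) to the underlying `opensearchpy` `Join` field:

```python
# Illustrative only: model, document, relation, and index names are hypothetical.
from django_opensearch_dsl import Document, fields
from django_opensearch_dsl.registries import registry

from myapp.models import Thread  # hypothetical model


@registry.register_document
class ThreadDocument(Document):
    # Parent/child relation stored in a single index: "question" documents
    # can have "answer" children (names are illustrative).
    thread_join = fields.JoinField(relations={"question": "answer"})

    class Index:
        name = "threads"  # hypothetical index name

    class Django:
        model = Thread
        fields = ["title"]
```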