-
Notifications
You must be signed in to change notification settings - Fork 3
Bulk Upsert (note that the first part was accidentally already merged in main before) #187
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
74233c1
ec63540
461f188
6a56e1c
7abe4d7
6435595
2593b40
bf1beff
66cf0c1
c4cc7ea
46d4f41
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,15 +1,18 @@ | ||
| x-app-env: &default-app-env | ||
| DJANGO_EMAIL_URL: ${DJANGO_EMAIL_URL:?} | ||
| DJANGO_SECURE_SSL_REDIRECT: ${DJANGO_SECURE_SSL_REDIRECT:-true} | ||
| DJANGO_SETTINGS_MODULE: radis.settings.production | ||
| DJANGO_STATIC_ROOT: /var/www/web/static/ | ||
| POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:?} | ||
|
|
||
| x-app: &default-app | ||
| image: ghcr.io/openradx/radis:latest | ||
| volumes: | ||
| - web_data:/var/www/web | ||
| - ${SSL_SERVER_CERT_FILE:?}:/etc/web/ssl/cert.pem | ||
| - ${SSL_SERVER_KEY_FILE:?}:/etc/web/ssl/key.pem | ||
| environment: | ||
| DJANGO_EMAIL_URL: ${DJANGO_EMAIL_URL:?} | ||
| DJANGO_SECURE_SSL_REDIRECT: ${DJANGO_SECURE_SSL_REDIRECT:-true} | ||
| DJANGO_SETTINGS_MODULE: radis.settings.production | ||
| DJANGO_STATIC_ROOT: /var/www/web/static/ | ||
| POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:?} | ||
| <<: *default-app-env | ||
|
|
||
| x-deploy: &deploy | ||
| replicas: 1 | ||
|
|
@@ -69,6 +72,15 @@ services: | |
|
|
||
| llm_worker: | ||
| <<: *default-app | ||
| volumes: | ||
| - web_data:/var/www/web | ||
| - ${SSL_SERVER_CERT_FILE:?}:/etc/web/ssl/cert.pem | ||
| - ${SSL_SERVER_KEY_FILE:?}:/etc/web/ssl/key.pem | ||
| - ${RADIS_LLM_CA_BUNDLE:-/etc/ssl/certs/ca-certificates.crt}:/etc/ssl/certs/radis-ca-bundle.pem:ro | ||
| environment: | ||
| <<: *default-app-env | ||
| SSL_CERT_FILE: /etc/ssl/certs/radis-ca-bundle.pem | ||
| REQUESTS_CA_BUNDLE: /etc/ssl/certs/radis-ca-bundle.pem | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does all the SSL / CA bundle stuff have something to do with this PR? |
||
| command: > | ||
| bash -c " | ||
| wait-for-it -s postgres.local:5432 -t 60 && | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -73,6 +73,9 @@ SSL_IP_ADDRESSES=127.0.0.1 | |
| SSL_SERVER_CERT_FILE="./cert.pem" | ||
| SSL_SERVER_KEY_FILE="./key.pem" | ||
| SSL_SERVER_CHAIN_FILE="./chain.pem" | ||
| # Optional: custom CA bundle for outbound HTTPS (e.g., private LLM endpoints). | ||
| # Defaults to host system CA bundle if not set. | ||
| # RADIS_LLM_CA_BUNDLE="/etc/ssl/certs/ca-certificates.crt" | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this have something to do with this PR? |
||
|
|
||
| # The timezone used by the server. | ||
| TIME_ZONE="Europe/Berlin" | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| import logging | ||
|
|
||
| from procrastinate.contrib.django import app | ||
| from procrastinate.types import JSONValue | ||
|
|
||
| from .utils.indexing import bulk_upsert_report_search_vectors | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
@app.task
def bulk_index_reports(report_ids: list[int]) -> None:
    """Procrastinate task: rebuild the search vectors for the given reports.

    Delegates the actual work to ``bulk_upsert_report_search_vectors``;
    a no-op when *report_ids* is empty.
    """
    if report_ids:
        logger.info("Indexing %s reports in bulk.", len(report_ids))
        bulk_upsert_report_search_vectors(report_ids)
|
|
||
|
|
||
| def enqueue_bulk_index_reports(report_ids: list[int]) -> int | None: | ||
| if not report_ids: | ||
| return None | ||
| try: | ||
| payload: list[JSONValue] = [int(report_id) for report_id in report_ids] | ||
| except (TypeError, ValueError) as exc: | ||
| logger.error("Invalid report_id in bulk index request: %s", exc) | ||
| return None | ||
|
Comment on lines
+19
to
+26
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don’t drop valid IDs when one entry is invalid. ✅ Suggested fix (skip invalid IDs, keep valid ones) def enqueue_bulk_index_reports(report_ids: list[int]) -> int | None:
if not report_ids:
return None
- try:
- payload: list[int] = [int(report_id) for report_id in report_ids]
- except (TypeError, ValueError) as exc:
- logger.error("Invalid report_id in bulk index request: %s", exc)
- return None
+ payload: list[int] = []
+ for report_id in report_ids:
+ try:
+ payload.append(int(report_id))
+ except (TypeError, ValueError):
+ logger.exception(
+ "Invalid report_id in bulk index request: %r",
+ report_id,
+ )
+ if not payload:
+ return None
return app.configure_task(
"radis.pgsearch.tasks.bulk_index_reports",
allow_unknown=False,
).defer(report_ids=payload)🧰 Tools🪛 Ruff (0.14.14)24-24: Use Replace with (TRY400) 🤖 Prompt for AI Agents |
||
| return app.configure_task( | ||
| "radis.pgsearch.tasks.bulk_index_reports", | ||
| allow_unknown=False, | ||
| ).defer(report_ids=payload) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| import pytest | ||
|
|
||
| from radis.pgsearch.models import ReportSearchVector | ||
| from radis.pgsearch.utils.indexing import bulk_upsert_report_search_vectors | ||
| from radis.reports.models import Language, Report | ||
|
|
||
|
|
||
@pytest.mark.django_db
def test_bulk_index_matches_signal_vector() -> None:
    """The bulk indexing path must write the exact same search vector as the
    vector created automatically when a Report is saved (signal path)."""
    en = Language.objects.create(code="en")
    report = Report.objects.create(
        **{
            "document_id": "DOC-INDEX",
            "pacs_aet": "PACS",
            "pacs_name": "PACS",
            "pacs_link": "",
            "patient_id": "P1",
            "patient_birth_date": "1980-01-01",
            "patient_sex": "M",
            "study_description": "Study",
            "study_datetime": "2024-01-01T00:00:00Z",
            "study_instance_uid": "1.2.3.4",
            "accession_number": "ACC1",
            "body": "Findings: No acute abnormality.",
            "language": en,
        }
    )

    # Capture the vector produced as a side effect of creating the report,
    # then remove it so the bulk path has to recreate it from scratch.
    expected = ReportSearchVector.objects.get(report=report).search_vector
    ReportSearchVector.objects.filter(report=report).delete()

    bulk_upsert_report_search_vectors([report.pk])
    actual = ReportSearchVector.objects.get(report=report).search_vector

    assert expected == actual
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import logging | ||
| from collections.abc import Iterable | ||
|
|
||
| from django.conf import settings | ||
| from django.db import connection | ||
|
|
||
| from radis.reports.models import Report | ||
|
|
||
| from ..models import ReportSearchVector | ||
| from .language_utils import code_to_language | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| def _chunked(items: list[int], size: int) -> Iterable[list[int]]: | ||
| for index in range(0, len(items), size): | ||
| yield items[index : index + size] | ||
|
|
||
|
|
||
def bulk_upsert_report_search_vectors(
    report_ids: Iterable[int],
    chunk_size: int | None = None,
) -> None:
    """Create/refresh ``ReportSearchVector`` rows for the given reports.

    Args:
        report_ids: IDs of reports to (re)index; ``None`` entries and
            duplicates are ignored.
        chunk_size: Number of reports processed per round trip. Defaults to
            ``settings.PGSEARCH_BULK_INDEX_CHUNK_SIZE`` when not given.
    """
    # Normalize: drop Nones, dedupe, coerce to int, and sort for stable
    # chunking / deterministic log output.
    ids = sorted({int(report_id) for report_id in report_ids if report_id is not None})
    if not ids:
        return
    resolved_chunk_size = (
        settings.PGSEARCH_BULK_INDEX_CHUNK_SIZE if chunk_size is None else chunk_size
    )

    for chunk in _chunked(ids, resolved_chunk_size):
        # Fetch only what is needed to pick the text-search config per report.
        reports = (
            Report.objects.filter(id__in=chunk)
            .select_related("language")
            .only("id", "language__code")
        )
        report_ids_found: set[int] = set()
        # Maps a Postgres text-search config name to the report IDs using it.
        config_to_ids: dict[str, list[int]] = {}
        # Avoids re-resolving the same language code repeatedly.
        config_cache: dict[str, str] = {}
        for report in reports:
            report_ids_found.add(report.pk)
            language_code = report.language.code
            config = config_cache.get(language_code)
            if config is None:
                config = code_to_language(language_code)
                config_cache[language_code] = config
            config_to_ids.setdefault(config, []).append(report.pk)
        # Requested IDs that no longer exist are logged (first 10 only) and
        # skipped rather than raising.
        missing_ids = set(chunk) - report_ids_found
        if missing_ids:
            logger.warning(
                "Skipping %s missing reports during bulk index (ids=%s).",
                len(missing_ids),
                sorted(missing_ids)[:10],
            )

        for config, config_ids in config_to_ids.items():
            # Upsert step 1: ensure a row exists per report; conflicts mean
            # the row is already there and only the UPDATE below is needed.
            ReportSearchVector.objects.bulk_create(
                [ReportSearchVector(report_id=report_id) for report_id in config_ids],
                ignore_conflicts=True,
                batch_size=settings.PGSEARCH_BULK_INSERT_BATCH_SIZE,
            )

            # Upsert step 2: compute the tsvector in-database for all rows of
            # this config in one statement. NOTE(review): the two steps are
            # not wrapped in an explicit transaction here — presumably the
            # caller/task runs under one; confirm, or a crash between them
            # leaves rows with NULL search_vector.
            with connection.cursor() as cursor:
                cursor.execute(
                    """
                    UPDATE pgsearch_reportsearchvector v
                    SET search_vector = to_tsvector(%s::regconfig, r.body)
                    FROM reports_report r
                    WHERE v.report_id = r.id AND r.id = ANY(%s)
                    """,
                    [config, config_ids],
                )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why was it necessary to have &default-app-env and &default-app? llm_worker already uses &default-app. And SSL and CA Bundle stuff has nothing to do with this PR, right?