
Commit 38edbaf

Merge pull request #423 from alephdata/release/4.0.0
4.0.0 release
2 parents 7bf441d + 3850970 commit 38edbaf

18 files changed: 228 additions, 65 deletions

.bumpversion.cfg

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 3.22.0
+current_version = 4.0.0-rc28
 tag_name = {new_version}
 commit = True
 tag = True

.github/workflows/build.yml

Lines changed: 3 additions & 1 deletion
@@ -15,7 +15,7 @@ jobs:
       - name: Set up
         run: |
           docker --version
-          docker-compose --version
+          docker compose --version
           echo "${GITHUB_REF}"
       - name: Start services
         run: |
@@ -25,6 +25,8 @@ jobs:
           make cached-build
       - name: Install development dependencies
         run: make dev
+        env:
+          PIP_BREAK_SYSTEM_PACKAGES: 1
       - name: Check formatting
         run: make format-check
       - name: Run linter (ruff)
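
Note: PIP_BREAK_SYSTEM_PACKAGES=1 tells pip to bypass PEP 668's "externally managed environment" guard, which recent Debian/Ubuntu runner images enforce; without it, the pip install inside "make dev" would abort with an externally-managed-environment error.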

.github/workflows/daily.yml

Lines changed: 1 addition & 1 deletion
@@ -13,7 +13,7 @@ jobs:
       - name: Set up
         run: |
           docker --version
-          docker-compose --version
+          docker compose --version
       - name: Build docker cache
         run: |
           make fresh-cache

.github/workflows/main.yml

Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
+name: Auto Assign to Project
+
+on:
+  issues:
+    types: [opened, labeled]
+  pull_request_target:
+    types: [opened, labeled]
+
+jobs:
+  assign_one_project:
+    runs-on: ubuntu-latest
+    name: Assign to One Project
+    steps:
+      - uses: actions/[email protected]
+        with:
+          # You can target a repository in a different organization
+          # to the issue
+          project-url: https://github.com/orgs/alephdata/projects/10
+          github-token: ${{ secrets.ALEPH_GITHUB_TOKEN }}
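
Note: this workflow files every newly opened or labeled issue and pull request onto the organization's project board 10. It authenticates with a dedicated secret (ALEPH_GITHUB_TOKEN) because the default GITHUB_TOKEN generally lacks permission to modify organization-level projects.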

Dockerfile

Lines changed: 4 additions & 3 deletions
@@ -9,6 +9,8 @@ LABEL org.opencontainers.image.source https://github.com/alephdata/ingest-file
 # RUN echo "deb http://http.us.debian.org/debian stretch non-free" >/etc/apt/sources.list.d/nonfree.list
 RUN apt-get -qq -y update \
     && apt-get -qq -y install build-essential locales ca-certificates \
+    # git
+    git \
     # python deps (mostly to install their dependencies)
     python3-pip python3-dev python3-pil \
     # tesseract
@@ -121,8 +123,6 @@ RUN groupadd -g 1000 -r app \
 RUN mkdir /models/ && \
     curl -o "/models/model_type_prediction.ftz" "https://public.data.occrp.org/develop/models/types/type-08012020-7a69d1b.ftz"
 
-# Having updated pip/setuptools seems to break the test run for some reason (12/01/2022)
-# RUN pip3 install --no-cache-dir -U pip setuptools
 COPY requirements.txt /tmp/
 RUN pip3 install --no-cache-dir --prefer-binary --upgrade pip
 RUN pip3 install --no-cache-dir --prefer-binary --upgrade setuptools wheel
@@ -155,7 +155,8 @@ ENV ARCHIVE_TYPE=file \
     ARCHIVE_PATH=/data \
     FTM_STORE_URI=postgresql://aleph:aleph@postgres/aleph \
     REDIS_URL=redis://redis:6379/0 \
-    TESSDATA_PREFIX=/usr/share/tesseract-ocr/4.00/tessdata
+    TESSDATA_PREFIX=/usr/share/tesseract-ocr/4.00/tessdata \
+    LD_PRELOAD=/usr/lib/aarch64-linux-gnu/libgomp.so.1
 
 # USER app
 CMD ingestors process
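
Note: installing git lets pip resolve any dependencies pinned to git URLs during the image build. The LD_PRELOAD of libgomp looks like a workaround for OpenMP initialization crashes in native Python wheels; the /usr/lib/aarch64-linux-gnu path implies an arm64 base image.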

Makefile

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 INGEST=ghcr.io/alephdata/ingest-file
-COMPOSE=docker-compose
+COMPOSE=docker compose
 DOCKER=$(COMPOSE) run --rm ingest-file
 
 .PHONY: build
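
Note: this completes the migration from the standalone docker-compose v1 binary to the Compose v2 Docker CLI plugin, matching the CI changes above. The subcommand syntax is otherwise identical, so existing targets such as "make build" keep working unchanged.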

docker-compose.yml

Lines changed: 5 additions & 1 deletion
@@ -12,6 +12,9 @@ services:
     image: redis:alpine
     command: ["redis-server", "--save", "3600", "10"]
 
+  rabbitmq:
+    image: rabbitmq:3.9-management-alpine
+
   ingest-file:
     build:
       context: .
@@ -22,7 +25,7 @@ services:
       - /data:mode=777
     environment:
       FTM_STORE_URI: postgresql://ingest:ingest@postgres/ingest
-      LOG_FORMAT: TEXT # TEXT or JSON
+      LOG_FORMAT: TEXT  # TEXT or JSON
     volumes:
       - "./ingestors:/ingestors/ingestors"
       - "./tests:/ingestors/tests"
@@ -33,3 +36,4 @@ services:
     depends_on:
       - postgres
       - redis
+      - rabbitmq
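
Note: the new rabbitmq service backs the task queue this release introduces (see ingestors/cli.py and ingestors/manager.py below). The 3.9-management-alpine image also bundles the RabbitMQ management UI on port 15672, though no ports are published here, and ingest-file now waits for the broker via depends_on.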

ingestors/__init__.py

Lines changed: 2 additions & 1 deletion
@@ -2,10 +2,11 @@
 
 import logging
 
-__version__ = "3.22.0"
+__version__ = "4.0.0-rc28"
 
 logging.getLogger("chardet").setLevel(logging.INFO)
 logging.getLogger("PIL").setLevel(logging.INFO)
 logging.getLogger("google.auth").setLevel(logging.INFO)
 logging.getLogger("urllib3").setLevel(logging.WARNING)
 logging.getLogger("msglite").setLevel(logging.WARNING)
+logging.getLogger("pika").setLevel(logging.WARNING)

ingestors/cli.py

Lines changed: 38 additions & 18 deletions
@@ -1,23 +1,26 @@
 import sys
 import click
 import logging
+import uuid
 from pprint import pprint
+from random import randrange
+
 from ftmstore import get_dataset
-from servicelayer.cache import get_redis, get_fakeredis
+from servicelayer.cache import get_redis
 from servicelayer.logs import configure_logging
-from servicelayer.jobs import Job, Dataset
+from servicelayer.taskqueue import Dataset, Task
 from servicelayer import settings as sl_settings
 from servicelayer.archive.util import ensure_path
+from servicelayer import settings as sls
 from servicelayer.tags import Tags
 
 from ingestors import settings
 from ingestors.manager import Manager
 from ingestors.directory import DirectoryIngestor
 from ingestors.analysis import Analyzer
-from ingestors.worker import IngestWorker, OP_ANALYZE, OP_INGEST
+from ingestors.worker import get_worker
 
 log = logging.getLogger(__name__)
-STAGES = [OP_ANALYZE, OP_INGEST]
 
 
 @click.group()
@@ -30,7 +33,7 @@ def cli():
 def process(sync):
     """Start the queue and process tasks as they come. Blocks while waiting"""
     num_threads = None if sync else sl_settings.WORKER_THREADS
-    worker = IngestWorker(stages=STAGES, num_threads=num_threads)
+    worker = get_worker(num_threads=num_threads)
     code = worker.run()
     sys.exit(code)
 
@@ -50,11 +53,22 @@ def killthekitten():
     conn.flushall()
 
 
-def _ingest_path(db, conn, dataset, path, languages=[]):
+def _ingest_path(db, dataset, path, languages=[]):
     context = {"languages": languages}
-    job = Job.create(conn, dataset)
-    stage = job.get_stage(OP_INGEST)
-    manager = Manager(db, stage, context)
+
+    priority = randrange(1, sls.RABBITMQ_MAX_PRIORITY + 1)
+
+    task = Task(
+        task_id=uuid.uuid4().hex,
+        job_id=uuid.uuid4().hex,
+        collection_id=dataset,
+        delivery_tag="",
+        operation=settings.STAGE_INGEST,
+        priority=priority,
+        context=context,
+        payload={},
+    )
+    manager = Manager(db, task)
     path = ensure_path(path)
     if path is not None:
         if path.is_file():
@@ -76,15 +90,14 @@ def _ingest_path(db, dataset, path, languages=[]):
 @click.argument("path", type=click.Path(exists=True))
 def ingest(path, dataset, languages=None):
     """Queue a set of files for ingest."""
-    conn = get_redis()
-    db = get_dataset(dataset, OP_INGEST)
-    _ingest_path(db, conn, dataset, path, languages=languages)
+    db = get_dataset(dataset, settings.STAGE_INGEST)
+    _ingest_path(db, dataset, path, languages=languages)
 
 
 @cli.command()
 @click.option("--dataset", required=True, help="Name of the dataset")
 def analyze(dataset):
-    db = get_dataset(dataset, OP_ANALYZE)
+    db = get_dataset(dataset, settings.STAGE_ANALYZE)
     analyzer = None
     for entity in db.partials():
         if analyzer is None or analyzer.entity.id != entity.id:
@@ -102,13 +115,20 @@ def analyze(dataset):
 @click.argument("path", type=click.Path(exists=True))
 def debug(path, languages=None):
     """Debug the ingest for the given path."""
-    conn = get_fakeredis()
     settings.fts.DATABASE_URI = "sqlite:////tmp/debug.sqlite3"
-    db = get_dataset("debug", origin=OP_INGEST, database_uri=settings.fts.DATABASE_URI)
+
+    # collection ID that is meant for testing purposes only
+    debug_dataset_id = 100
+
+    db = get_dataset(
+        debug_dataset_id,
+        origin=settings.STAGE_INGEST,
+        database_uri=settings.fts.DATABASE_URI,
+    )
     db.delete()
-    _ingest_path(db, conn, "debug", path, languages=languages)
-    worker = IngestWorker(conn=conn, stages=STAGES)
-    worker.sync()
+    _ingest_path(db, debug_dataset_id, path, languages=languages)
+    worker = get_worker()
+    worker.process(blocking=False)
    for entity in db.iterate():
         pprint(entity.to_dict())
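
The CLI now constructs a servicelayer Task up front instead of creating a Redis-backed Job and Stage, and hands it to the Manager. A minimal sketch of the new flow, using only names that appear in this diff (the dataset name "my_dataset" is illustrative):

    import uuid
    from random import randrange

    from ftmstore import get_dataset
    from servicelayer import settings as sls
    from servicelayer.taskqueue import Task

    from ingestors import settings
    from ingestors.manager import Manager

    # Illustrative dataset name; tasks are queued with a random priority.
    db = get_dataset("my_dataset", settings.STAGE_INGEST)
    root_task = Task(
        task_id=uuid.uuid4().hex,
        job_id=uuid.uuid4().hex,
        collection_id="my_dataset",
        delivery_tag="",
        operation=settings.STAGE_INGEST,
        priority=randrange(1, sls.RABBITMQ_MAX_PRIORITY + 1),
        context={"languages": []},
        payload={},
    )
    manager = Manager(db, root_task)  # replaces Manager(db, stage, context)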

ingestors/documents/pdf.py

Lines changed: 27 additions & 0 deletions
@@ -19,6 +19,33 @@ class PDFIngestor(Ingestor, PDFSupport):
     EXTENSIONS = ["pdf"]
     SCORE = 6
 
+    def extract_xmp_metadata(self, pdf, entity):
+        try:
+            xmp = pdf.xmp_metadata
+            if xmp is None:
+                return
+            entity.add("messageId", xmp["xmpmm"].get("documentid"))
+            entity.add("title", xmp["dc"].get("title"))
+            entity.add("generator", xmp["pdf"].get("producer"))
+            entity.add("language", xmp["dc"].get("language"))
+            entity.add("authoredAt", xmp["xmp"].get("createdate"))
+            entity.add("modifiedAt", xmp["xmp"].get("modifydate"))
+        except Exception as ex:
+            log.warning("Error reading XMP: %r", ex)
+
+    def extract_metadata(self, pdf, entity):
+        meta = pdf.metadata
+        if meta is not None:
+            entity.add("title", meta.get("title"))
+            entity.add("author", meta.get("author"))
+            entity.add("generator", meta.get("creator"))
+            entity.add("generator", meta.get("producer"))
+            entity.add("keywords", meta.get("subject"))
+            if "creationdate" in meta:
+                entity.add("authoredAt", meta.get("creationdate"))
+            if "moddate" in meta:
+                entity.add("modifiedAt", meta.get("moddate"))
+
     def ingest(self, file_path, entity):
         """Ingestor implementation."""
         try:
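
Both helpers map PDF metadata onto FollowTheMoney entity properties. The XMP branch assumes pdf.xmp_metadata returns a mapping keyed by XMP namespace prefix; a hypothetical value that extract_xmp_metadata above would consume (every field is illustrative):

    xmp = {
        "xmpmm": {"documentid": "uuid:0000-1111-2222"},
        "dc": {"title": "Example report", "language": "en"},
        "pdf": {"producer": "LibreOffice 7.4"},
        "xmp": {
            "createdate": "2023-05-01T10:00:00Z",
            "modifydate": "2023-05-02T12:30:00Z",
        },
    }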

ingestors/manager.py

Lines changed: 17 additions & 5 deletions
@@ -14,6 +14,8 @@
 from servicelayer.archive.util import ensure_path
 from servicelayer.extensions import get_extensions
 from sentry_sdk import capture_exception
+from servicelayer.cache import get_redis
+from servicelayer.taskqueue import queue_task, get_rabbitmq_channel
 from followthemoney.helpers import entity_filename
 from followthemoney.namespace import Namespace
 from prometheus_client import Counter, Histogram
@@ -75,11 +77,13 @@ class Manager(object):
 
     MAGIC = magic.Magic(mime=True)
 
-    def __init__(self, dataset, stage, context):
+    def __init__(self, dataset, root_task):
+        self.conn = get_redis()
         self.dataset = dataset
         self.writer = dataset.bulk()
-        self.stage = stage
-        self.context = context
+        self.root_task = root_task
+        self.collection_id = root_task.collection_id
+        self.context = root_task.context
         self.ns = Namespace(self.context.get("namespace"))
         self.work_path = ensure_path(mkdtemp(prefix="ingestor-"))
         self.emitted = set()
@@ -92,7 +96,7 @@ def archive(self):
 
     def make_entity(self, schema, parent=None):
         schema = model.get(schema)
-        entity = model.make_entity(schema, key_prefix=self.stage.job.dataset.name)
+        entity = model.make_entity(schema, key_prefix=self.collection_id)
         self.make_child(parent, entity)
         return entity
 
@@ -150,7 +154,15 @@ def auction(self, file_path, entity):
 
     def queue_entity(self, entity):
         log.debug("Queue: %r", entity)
-        self.stage.queue(entity.to_dict(), self.context)
+        queue_task(
+            get_rabbitmq_channel(),
+            get_redis(),
+            self.collection_id,
+            settings.STAGE_INGEST,
+            self.root_task.job_id,
+            self.context,
+            **entity.to_dict(),
+        )
 
     def store(self, file_path, mime_type=None):
         file_path = ensure_path(file_path)
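
queue_entity now publishes follow-up work to RabbitMQ instead of pushing onto a Redis stage queue; the entity dict is splatted into queue_task as the task payload, and the root task's job_id keeps the whole crawl grouped as one job. A minimal usage sketch, assuming db and root_task as built in cli.py's _ingest_path:

    manager = Manager(db, root_task)
    child = manager.make_entity("Document")  # keyed by the collection ID
    manager.queue_entity(child)  # re-enters the queue as an "ingest" task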

ingestors/settings.py

Lines changed: 9 additions & 0 deletions
@@ -51,6 +51,15 @@
 # Also store cached values in the SQL database
 sls.TAGS_DATABASE_URI = fts.DATABASE_URI
 
+RABBITMQ_URL = env.get("ALEPH_RABBITMQ_URL", "rabbitmq")
+# Prefetch count values
+# This is the number of tasks the IngestWorker will grab at any given time
+RABBITMQ_QOS_INGEST_QUEUE = 1
+RABBITMQ_QOS_ANALYZE_QUEUE = 1
+
+STAGE_INGEST = "ingest"
+STAGE_ANALYZE = "analyze"
+
 # ProcessingException is thrown whenever something goes wrong with
 # parsing a file. Enable this with care, it can easily eat up the
 # Sentry quota of events.
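
Note: the broker hostname is read from ALEPH_RABBITMQ_URL and defaults to "rabbitmq", the compose service name above, so it can be overridden per deployment (for example ALEPH_RABBITMQ_URL=rabbitmq.internal ingestors process). With both QoS values at 1, a worker prefetches a single task per queue at a time, so one slow document does not hold back tasks that other workers could take.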

ingestors/support/cache.py

Lines changed: 3 additions & 3 deletions
@@ -19,8 +19,8 @@ def cache_key(self, *parts):
         return make_key(*parts)
 
     def get_cache_set(self, key):
-        return ensure_list(self.manager.stage.conn.smembers(key))
+        return ensure_list(self.manager.conn.smembers(key))
 
     def add_cache_set(self, key, value):
-        self.manager.stage.conn.sadd(key, value)
-        self.manager.stage.conn.expire(key, REDIS_LONG)
+        self.manager.conn.sadd(key, value)
+        self.manager.conn.expire(key, REDIS_LONG)
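
Note: with stage objects gone, the cache helpers use the Redis connection the Manager now opens itself (self.conn = get_redis() in its constructor); the caching behaviour is otherwise unchanged.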

ingestors/support/email.py

Lines changed: 1 addition & 1 deletion
@@ -139,7 +139,7 @@ def parse_references(self, references, in_reply_to):
 
     def resolve_message_ids(self, entity):
         # https://cr.yp.to/immhf/thread.html
-        ctx = self.manager.stage.job.dataset.name
+        ctx = self.manager.collection_id
 
         for message_id in entity.get("messageId"):
             key = self.cache_key("mid-ent", ctx, message_id)
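
Note: message-ID threading keys were previously namespaced by the dataset name reached through stage.job; they are now namespaced by the collection ID carried on the root task, matching the Manager's new attributes.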
