Skip to content

Commit 6cb1833

Browse files
authored
Arrow IPC Streaming Cache Implementation (#35)
* Add Arrow IPC streaming cache implementation and make it the default
* Fix stream unit test to reflect the last_synced_at bug fix
* Small build and stability fixes
1 parent 4831b79 commit 6cb1833

20 files changed

Lines changed: 1831 additions & 101 deletions

api/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,7 @@ RUN apt-get update \
2222

2323
# Copy and install the transfer library wheel
2424
COPY --from=transfer-builder /transfer-build/dist/*.whl /api/dist/
25-
RUN pip install --no-cache-dir /api/dist/pontoon-*.whl
25+
RUN pip install --only-binary=:all: --no-cache-dir /api/dist/pontoon-*.whl
2626

2727
# Build the API app
2828
COPY ./api/pyproject.toml /api/pyproject.toml

api/app/models/transfer_run.py

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -66,7 +66,11 @@ def get_latest_transfer_run(session, transfer_id:uuid.UUID, status:str = None) -
6666
if status != None:
6767
stmt = stmt.where(TransferRun.Model.status == status)
6868

69-
return session.exec(stmt).first()
69+
result = session.exec(stmt).all()
70+
if len(result) == 1:
71+
return result[0]
72+
else:
73+
return None
7074

7175

7276
@staticmethod

api/app/routers/internal.py

Lines changed: 9 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,5 @@
11
import uuid
2+
from typing import Optional
23
from fastapi import HTTPException, Depends, Query, APIRouter
34

45
from app.dependencies import get_session
@@ -36,7 +37,7 @@ def read_destination(destination_id: uuid.UUID, session=Depends(get_session)):
3637
return get_destination_by_id(session, destination_id)
3738

3839

39-
@router.get("/runs/{transfer_id}", response_model=TransferRun.Model)
40+
@router.get("/runs/{transfer_id}", response_model=Optional[TransferRun.Model])
4041
def get_transfer_run(transfer_id: uuid.UUID, session=Depends(get_session)):
4142
return TransferRun.get_latest_transfer_run(session, transfer_id)
4243

@@ -62,7 +63,13 @@ def update_transfer_run(transfer_run_id:uuid.UUID, transfer_run:TransferRun.Upda
6263

6364
# Get the destination vendor type
6465
transfer_id = transfer_run.transfer_id
65-
destination_id = Transfer.get(session, transfer_id).destination_id
66+
transfer = Transfer.get(session, transfer_id)
67+
68+
# Ad-hoc transfer runs don't have a parent transfer
69+
if transfer is None:
70+
return transfer_run
71+
72+
destination_id = transfer.destination_id
6673
destination_vendor_type = Destination.get(session, destination_id).connection_info['vendor_type']
6774

6875
# Get the source vendor types from the models transferred

data-transfer/pontoon/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,7 @@ RUN apt-get update \
1313
COPY pyproject.toml ./pyproject.toml
1414

1515
# Install the pontoon package and its dependencies
16-
RUN pip install --no-cache-dir .
16+
RUN pip install --only-binary=:all: --no-cache-dir .
1717

1818
# Copy the pontoon source code
1919
COPY pontoon/ ./pontoon/

data-transfer/pontoon/pontoon/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020
from pontoon.base import Progress
2121
from pontoon.cache.memory_cache import MemoryCache
2222
from pontoon.cache.sqlite_cache import SqliteCache
23+
from pontoon.cache.arrow_ipc_cache import ArrowIpcCache
2324
from pontoon.base import Namespace, Stream, Record, Dataset, Cache, Mode, Source, Destination
2425
from pontoon.base import SourceConnectionFailed, SourceStreamDoesNotExist, SourceStreamInvalidSchema
2526
from pontoon.base import DestinationConnectionFailed, DestinationStreamInvalidSchema

data-transfer/pontoon/pontoon/base.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -136,7 +136,7 @@ def with_batch_id(self, batch_id:str, field_name:str='pontoon__batch_id') -> 'St
136136

137137

138138
def with_last_synced_at(self, sync_dt:datetime, field_name:str='pontoon__last_synced_at') -> 'Stream':
139-
return self.with_field(field_name, pa.timestamp('us', tz='UTC'), sync_dt.isoformat())
139+
return self.with_field(field_name, pa.timestamp('us', tz='UTC'), sync_dt)
140140

141141

142142
def with_version(self, version:str, field_name='pontoon__version') -> 'Stream':

0 commit comments

Comments
 (0)