|
| 1 | +import os |
| 2 | +import mimetypes |
| 3 | +from pathlib import Path |
| 4 | +from tempfile import NamedTemporaryFile |
| 5 | +from urllib.parse import urlparse |
| 6 | + |
| 7 | +import boto3 |
| 8 | +import pytest |
| 9 | +from sqlmodel import Session, select |
| 10 | +from moto import mock_aws |
| 11 | + |
| 12 | +from app.core.cloud import AmazonCloudStorageClient |
| 13 | +from app.core.config import settings |
| 14 | +from app.models import Document |
| 15 | +from app.tests.utils.document import ( |
| 16 | + Route, |
| 17 | + WebCrawler, |
| 18 | + crawler, |
| 19 | +) |
| 20 | + |
| 21 | +def upload(route: Route, scratch: Path, crawler: WebCrawler): |
| 22 | + (mtype, _) = mimetypes.guess_type(str(scratch)) |
| 23 | + with scratch.open('rb') as fp: |
| 24 | + return crawler.client.post( |
| 25 | + str(route), |
| 26 | + headers=crawler.superuser_token_headers, |
| 27 | + files={ |
| 28 | + 'src': (str(scratch), fp, mtype), |
| 29 | + }, |
| 30 | + ) |
| 31 | + |
| 32 | +@pytest.fixture |
| 33 | +def scratch(): |
| 34 | + with NamedTemporaryFile(mode='w', suffix='.txt') as fp: |
| 35 | + print('Hello World', file=fp, flush=True) |
| 36 | + yield Path(fp.name) |
| 37 | + |
| 38 | +@pytest.fixture |
| 39 | +def route(): |
| 40 | + return Route('cp') |
| 41 | + |
| 42 | +@pytest.fixture(scope='class') |
| 43 | +def aws_setup(): |
| 44 | + os.environ['AWS_ACCESS_KEY_ID'] = 'testing' |
| 45 | + os.environ['AWS_SECRET_ACCESS_KEY'] = 'testing' |
| 46 | + os.environ['AWS_SECURITY_TOKEN'] = 'testing' |
| 47 | + os.environ['AWS_SESSION_TOKEN'] = 'testing' |
| 48 | + os.environ['AWS_DEFAULT_REGION'] = settings.AWS_DEFAULT_REGION |
| 49 | + |
| 50 | +@mock_aws |
| 51 | +@pytest.mark.usefixtures('aws_setup') |
| 52 | +class TestDocumentRouteUpload: |
| 53 | + def test_adds_to_database( |
| 54 | + self, |
| 55 | + db: Session, |
| 56 | + route: Route, |
| 57 | + scratch: Path, |
| 58 | + crawler: WebCrawler, |
| 59 | + ): |
| 60 | + aws = AmazonCloudStorageClient() |
| 61 | + aws.create() |
| 62 | + |
| 63 | + response = upload(route, scratch, crawler) |
| 64 | + doc_id = (response |
| 65 | + .json() |
| 66 | + .get('id')) |
| 67 | + statement = ( |
| 68 | + select(Document) |
| 69 | + .where(Document.id == doc_id) |
| 70 | + ) |
| 71 | + result = db.exec(statement).one() |
| 72 | + |
| 73 | + assert result.fname == str(scratch) |
| 74 | + |
| 75 | + def test_adds_to_S3( |
| 76 | + self, |
| 77 | + route: Route, |
| 78 | + scratch: Path, |
| 79 | + crawler: Route, |
| 80 | + ): |
| 81 | + aws = AmazonCloudStorageClient() |
| 82 | + aws.create() |
| 83 | + |
| 84 | + response = upload(route, scratch, crawler) |
| 85 | + url = urlparse(response.json().get('object_store_url')) |
| 86 | + key = Path(url.path) |
| 87 | + key = key.relative_to(key.root) |
| 88 | + |
| 89 | + client = boto3.client('s3') |
| 90 | + assert client.head_object(Bucket=url.netloc, Key=str(key)) |
0 commit comments