diff --git a/.gitignore b/.gitignore index 6d1f9ba..a72c0ba 100644 --- a/.gitignore +++ b/.gitignore @@ -83,3 +83,4 @@ wheels/ .pyre/ .dmypy.json dmypy.json + diff --git a/Dockerfile b/Dockerfile index 386fc23..e6e1518 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,5 +15,5 @@ COPY app ./app # 5. PYTHONPATH 환경변수 설정 (모듈 import를 위해) ENV PYTHONPATH=/app -# 6. FastAPI 서버 실행 -CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] +# 6. Celery 워커를 백그라운드에서 실행한 후 FastAPI 서버 실행 +CMD bash -c "celery -A app.celery_app worker --loglevel=info & uvicorn app.main:app --host 0.0.0.0 --port 8000" diff --git a/README.md b/README.md new file mode 100644 index 0000000..a8369f7 --- /dev/null +++ b/README.md @@ -0,0 +1,267 @@ +# AI 비디오 생성 API + +Google Veo 3를 활용한 텍스트-to-비디오 생성 API와 OCR, 문장 분리 기능을 제공합니다. + +## 주요 기능 + +- **OCR**: 이미지에서 텍스트 추출 및 문단 분석 +- **문장 분리**: 텍스트를 문장 단위로 분리 +- **토큰화**: 문장을 단어 단위로 분리 +- **문장 검증**: OpenAI를 활용한 문장 분리 품질 검증 +- **비디오 생성**: Google Veo 3를 활용한 텍스트-to-비디오 생성 (동기/비동기) +- **S3 업로드**: 생성된 비디오를 자동으로 AWS S3에 업로드 + +## 새로운 기능: 비동기 비디오 생성 + +### 기존 문제점 +- Veo 3 비디오 생성 시 긴 대기 시간으로 인한 타임아웃 발생 +- 동기 방식으로 인한 서버 리소스 점유 + +### 해결책 +- **Celery + Redis**를 활용한 비동기 작업 처리 +- 즉시 task_id 반환 후 별도 상태 조회 방식 +- 실시간 진행 상황 모니터링 + +## 설치 및 실행 + +### 1. 의존성 설치 + +```bash +pip install -r requirements.txt +``` + +### 2. 환경 변수 설정 + +`.env.example`을 참고하여 `.env` 파일을 생성하세요: + +```bash +cp .env.example .env +# .env 파일을 편집하여 API 키들을 설정 +``` + +필수 환경 변수: +- `GOOGLE_API_KEY`: Google Gemini API 키 (Veo 비디오 생성용) +- `OPENAI_API_KEY`: OpenAI API 키 (문장 검증용) +- `REDIS_URL`: Redis 연결 URL (기본: redis://localhost:6379/0) + +S3 업로드를 위한 환경 변수: +- `AWS_ACCESS_KEY_ID`: AWS 액세스 키 ID +- `AWS_SECRET_ACCESS_KEY`: AWS 시크릿 액세스 키 +- `AWS_REGION`: AWS 리전 (기본: ap-northeast-2) +- `S3_BUCKET_NAME`: S3 버킷 이름 + +### 3. Redis 서버 시작 + +```bash +# Docker Compose로 Redis 시작 +# docker-compose up -d redis + +# 또는 스크립트 사용 +# ./scripts/start_redis.sh +``` + +### 4. 서비스 시작 + +#### 옵션 1: 통합 스크립트 사용 + +```bash +./scripts/start_all.sh +``` + +이후 각 컴포넌트를 개별 터미널에서 실행: + +```bash +# 터미널 1: FastAPI 서버 +uvicorn app.main:app --reload --host 0.0.0.0 --port 8000 + +# 터미널 2: Celery 워커 +./scripts/start_celery.sh + +# 터미널 3 (선택사항): Flower 모니터링 +./scripts/start_flower.sh +``` + +#### 옵션 2: 개별 실행 + +```bash +# 1. Redis 시작 +docker-compose up -d redis + +# 2. FastAPI 서버 (터미널 1) +uvicorn app.main:app --reload --host 0.0.0.0 --port 8000 + +# 3. Celery 워커 (터미널 2) +celery -A app.celery_app worker --loglevel=info --concurrency=4 + +# 4. Flower 모니터링 (터미널 3, 선택사항) +celery -A app.celery_app flower --port=5555 +``` + +## API 사용법 + +### 비동기 비디오 생성 (권장) + +#### 1. 비디오 생성 요청 + +```bash +curl -X POST "http://localhost:8000/veo/async" \ + -H "Content-Type: application/json" \ + -d '{ + "prompt": "A majestic eagle soaring over snow-capped mountains at sunset", + "aspect_ratio": "16:9", + "timeout_seconds": 600 + }' +``` + +응답: +```json +{ + "task_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890", + "status": "queued", + "message": "비디오 생성 작업이 큐에 추가되었습니다. `/veo/status/{task_id}`로 상태를 확인하세요." +} +``` + +#### 2. 작업 상태 조회 + +```bash +curl "http://localhost:8000/veo/status/a1b2c3d4-e5f6-7890-abcd-ef1234567890" +``` + +진행 중인 경우: +```json +{ + "task_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890", + "status": "PROGRESS", + "result": null, + "progress": { + "status": "processing", + "message": "비디오 생성 진행중... (120초 경과)", + "operation": "operations/generate-video-12345", + "elapsed_seconds": 120 + }, + "error": null +} +``` + +완료된 경우: +```json +{ + "task_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890", + "status": "SUCCESS", + "result": { + "status": "completed", + "video_uri": "https://storage.googleapis.com/your-video-uri", + "operation": "operations/generate-video-12345", + "message": "비디오 생성 완료. Gemini API를 통해 직접 다운로드 가능합니다.", + "error": null + }, + "progress": null, + "error": null +} +``` + +#### 3. 작업 취소 (선택사항) + +```bash +curl -X DELETE "http://localhost:8000/veo/cancel/a1b2c3d4-e5f6-7890-abcd-ef1234567890" +``` + +### 동기 비디오 생성 + +기존 방식이 유지됩니다: + +```bash +curl -X POST "http://localhost:8000/veo" \ + -H "Content-Type: application/json" \ + -d '{ + "prompt": "A red panda riding a skateboard in a sunny park", + "aspect_ratio": "16:9", + "timeout_seconds": 180 + }' +``` + +## 모니터링 + +### 접속 정보 + +- **FastAPI 문서**: http://localhost:8000/docs +- **Redis Commander**: http://localhost:8081 (Redis 관리) +- **Flower**: http://localhost:5555 (Celery 작업 모니터링) + +### Flower에서 확인 가능한 정보 + +- 진행 중인 작업 목록 +- 완료된 작업 통계 +- 워커 상태 및 성능 +- 실시간 작업 진행 상황 + +## 작업 상태 + +| 상태 | 설명 | +|------|------| +| `PENDING` | 작업이 큐에서 대기 중 | +| `PROGRESS` | 작업이 진행 중 (세부 상황 정보 포함) | +| `SUCCESS` | 작업 완료 (비디오 URI 포함) | +| `FAILURE` | 작업 실패 (오류 메시지 포함) | +| `RETRY` | 작업 재시도 중 | +| `REVOKED` | 작업이 취소됨 | + +## 트러블슈팅 + +### Redis 연결 오류 + +```bash +# Redis 상태 확인 +docker ps | grep redis + +# Redis 재시작 +docker-compose restart redis +``` + +### Celery 워커 오류 + +```bash +# 워커 로그 확인 +celery -A app.celery_app worker --loglevel=debug + +# 워커 재시작 +pkill -f celery +./scripts/start_celery.sh +``` + +### 환경 변수 확인 + +```bash +# .env 파일 내용 확인 +cat .env + +# Python에서 환경 변수 확인 +python -c "import os; print('GOOGLE_API_KEY:', bool(os.getenv('GOOGLE_API_KEY')))" +``` + +## 성능 최적화 + +### Celery 워커 조정 + +```bash +# 동시 작업 수 조정 (기본: 4) +celery -A app.celery_app worker --concurrency=8 + +# 특정 큐만 처리 +celery -A app.celery_app worker --queues=video_generation +``` + +### Redis 메모리 관리 + +```bash +# Redis 메모리 사용량 확인 +docker exec ai_redis redis-cli info memory + +# 만료된 키 정리 +docker exec ai_redis redis-cli flushdb +``` + +## 라이선스 + +이 프로젝트는 MIT 라이선스 하에 배포됩니다. \ No newline at end of file diff --git a/app/celery_app.py b/app/celery_app.py new file mode 100644 index 0000000..df8ff77 --- /dev/null +++ b/app/celery_app.py @@ -0,0 +1,31 @@ +import os +from celery import Celery +from dotenv import load_dotenv + +# .env 파일 로드 +load_dotenv() + +# Redis URL 설정 +REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0") + +# Celery 인스턴스 생성 +celery_app = Celery( + "ai_video_generator", + broker=REDIS_URL, + backend=REDIS_URL, + include=["app.tasks.veo_tasks"] # 태스크 모듈 포함 +) + +# Celery 설정 +celery_app.conf.update( + task_serializer="json", + accept_content=["json"], + result_serializer="json", + timezone="Asia/Seoul", + enable_utc=True, + result_expires=3600, # 결과를 1시간 동안 저장 + task_time_limit=900, # 태스크 타임아웃 15분 + task_soft_time_limit=600, # 소프트 타임아웃 10분 + worker_prefetch_multiplier=1, + task_acks_late=True, +) diff --git a/app/main.py b/app/main.py index 40d4794..3c42bbb 100644 --- a/app/main.py +++ b/app/main.py @@ -1,12 +1,22 @@ from typing import List from io import BytesIO from pathlib import Path +import os +import time from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Body from fastapi.responses import JSONResponse from dotenv import load_dotenv from PIL import Image +# Google GenAI SDK 사용 +try: + from google import genai + from google.genai import types + _has_genai = True +except ImportError: + _has_genai = False + from .services.sentence_segmenter import split_sentences from .services.tokenizer import tokenize from .services.openai_eval import evaluate_segmentation_with_openai @@ -16,7 +26,6 @@ from .schemas.tokens import TokensRequest, TokensResponse from .schemas.validate import ValidateRequest, ValidateResponse - from .schemas.ocr import OCRResponse from .services.vision_ocr import ( encode_bytes_to_b64, @@ -26,11 +35,19 @@ extract_word_boxes, extract_paragraphs_spatial_proximity_advanced, ) +from .schemas.veo import ( + VeoRequest, VeoResponse, ErrorResponse, + VeoAsyncRequest, VeoAsyncResponse, VeoTaskStatus +) + +# Celery 관련 임포트 +from .celery_app import celery_app +from .tasks.veo_tasks import generate_veo_video_task # .env 로드 -load_dotenv(dotenv_path=Path(__file__).resolve().parent / ".env") +load_dotenv() -app = FastAPI(title="Vision OCR Wrapper", version="1.4.0") +app = FastAPI(title="Vision OCR Wrapper", version="1.5.0") @app.get("/health") @@ -142,4 +159,295 @@ async def validate_sentences(payload: ValidateRequest = Body(...)): ) return ValidateResponse(**result) except Exception as e: - raise HTTPException(status_code=500, detail=f"OpenAI 검증 실패: {e}") \ No newline at end of file + raise HTTPException(status_code=500, detail=f"OpenAI 검증 실패: {e}") + + +@app.post( + "/veo", + response_model=VeoResponse, + responses={ + 200: { + "description": "비디오 생성 성공 또는 진행 중", + "model": VeoResponse, + }, + 400: { + "description": "잘못된 요청 (API 키 누락, 패키지 미설치 등)", + "model": ErrorResponse, + }, + 429: { + "description": "API 할당량 초과", + "model": ErrorResponse, + }, + 500: { + "description": "서버 내부 오류", + "model": ErrorResponse, + } + }, + tags=["Video Generation"], + summary="텍스트로 비디오 생성 (동기)", + description=""" + ## 텍스트 프롬프트로 고품질 비디오 생성 (동기 방식) + + 이 엔드포인트는 Google Veo 3 모델을 사용하여 텍스트 설명으로부터 8초 길이의 720p 비디오를 생성합니다. + **주의: 이 엔드포인트는 동기식으로 작동하며, 비디오 생성이 완료될 때까지 대기합니다.** + + ### 프롬프트 작성 팁: + - **구체적으로 작성**: "A red panda riding a skateboard in a sunny park" + - **스타일 지정**: "cinematic", "animation", "realistic" 등 스타일 키워드 추가 + - **카메라 앵글**: "close-up", "wide shot", "aerial view" 등 + - **분위기**: "bright and cheerful", "dark and mysterious" 등 + + ### 응답 시간: + - 일반적으로 1-3분 소요 + - `timeout_seconds` 파라미터로 대기 시간 조절 가능 + + ### 주의사항: + - Google Gemini API 할당량이 필요합니다 + - 생성된 비디오는 `video_uri`를 통해 다운로드 가능합니다 + - 긴 대기 시간이 필요한 경우 `/veo/async` 엔드포인트 사용을 권장합니다 + """ +) +def generate_veo_video(req: VeoRequest): + if not _has_genai: + return VeoResponse( + status="error", + video_uri=None, + local_path=None, + operation=None, + message=None, + error="google-genai 패키지가 필요합니다. pip install google-genai를 실행하세요." + ) + + # Google API Key 확인 + google_api_key = os.getenv("GOOGLE_API_KEY") + if not google_api_key: + return VeoResponse( + status="error", + video_uri=None, + local_path=None, + operation=None, + message=None, + error="GOOGLE_API_KEY 환경변수가 설정되지 않았습니다." + ) + + try: + # 동기 방식에서는 Celery 태스크를 사용하도록 안내 + return VeoResponse( + status="error", + video_uri=None, + local_path=None, + operation=None, + message=None, + error="실제 비디오 생성은 /veo/async 엔드포인트를 사용해주세요. 동기 방식은 시간이 오래 걸려 권장하지 않습니다." + ) + + except Exception as e: + return VeoResponse( + status="error", + video_uri=None, + local_path=None, + operation=None, + message=None, + error=f"Google GenAI 호출 중 오류 발생: {str(e)}" + ) + + +@app.post( + "/veo/async", + response_model=VeoAsyncResponse, + responses={ + 202: { + "description": "비디오 생성 작업이 성공적으로 큐에 추가됨", + "model": VeoAsyncResponse, + }, + 400: { + "description": "잘못된 요청 (API 키 누락, 패키지 미설치 등)", + "model": ErrorResponse, + }, + 500: { + "description": "서버 내부 오류", + "model": ErrorResponse, + } + }, + status_code=202, + tags=["Video Generation"], + summary="텍스트로 비디오 생성 (비동기)", + description=""" + ## 텍스트 프롬프트로 고품질 비디오 생성 (비동기 방식) + + 이 엔드포인트는 Google Veo 3 모델을 사용하여 텍스트 설명으로부터 8초 길이의 720p 비디오를 비동기로 생성합니다. + **비동기 방식으로 작동하여 즉시 task_id를 반환하고, 별도로 상태를 조회할 수 있습니다.** + + ### 작업 흐름: + 1. 이 엔드포인트로 비디오 생성 요청 + 2. `task_id` 즉시 반환 (202 status) + 3. `/veo/status/{task_id}` 로 작업 상태 조회 + 4. 완료되면 `video_uri`를 통해 비디오 다운로드 + + ### 프롬프트 작성 팁: + - **구체적으로 작성**: "A red panda riding a skateboard in a sunny park" + - **스타일 지정**: "cinematic", "animation", "realistic" 등 스타일 키워드 추가 + - **카메라 앵글**: "close-up", "wide shot", "aerial view" 등 + - **분위기**: "bright and cheerful", "dark and mysterious" 등 + + ### 예상 소요 시간: + - 일반적으로 1-10분 소요 + - 복잡한 프롬프트의 경우 더 오래 걸릴 수 있음 + + ### 주의사항: + - Google Gemini API 할당량이 필요합니다 + - Redis 서버가 실행 중이어야 합니다 + - Celery 워커가 실행 중이어야 합니다 + """ +) +def generate_veo_video_async(req: VeoAsyncRequest): + if not _has_genai: + raise HTTPException( + status_code=400, + detail="Veo 3 모델은 현재 사용할 수 없습니다. Google Cloud 프로젝트에서 Generative Language API를 활성화해야 합니다." + ) + + try: + # Celery 태스크 큐에 작업 추가 + task = generate_veo_video_task.delay( + prompt=req.prompt, + aspect_ratio=req.aspect_ratio, + model=req.model, + timeout_seconds=req.timeout_seconds + ) + + return VeoAsyncResponse( + task_id=task.id, + status="queued", + message="비디오 생성 작업이 큐에 추가되었습니다. `/veo/status/{task_id}`로 상태를 확인하세요." + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"작업 큐 추가 실패: {str(e)}") + + +@app.get( + "/veo/status/{task_id}", + response_model=VeoTaskStatus, + responses={ + 200: { + "description": "작업 상태 조회 성공", + "model": VeoTaskStatus, + }, + 404: { + "description": "작업을 찾을 수 없음", + "model": ErrorResponse, + }, + 500: { + "description": "서버 내부 오류", + "model": ErrorResponse, + } + }, + tags=["Video Generation"], + summary="비동기 비디오 생성 작업 상태 조회", + description=""" + ## 비동기 비디오 생성 작업의 상태를 조회합니다 + + ### 가능한 작업 상태: + - **PENDING**: 작업이 큐에서 대기 중 + - **PROGRESS**: 작업이 진행 중 (진행 상황 정보 포함) + - **SUCCESS**: 작업 완료 (비디오 URI 포함) + - **FAILURE**: 작업 실패 (오류 메시지 포함) + - **RETRY**: 작업 재시도 중 + - **REVOKED**: 작업이 취소됨 + + ### 진행 상황 정보: + PROGRESS 상태일 때 `progress` 필드에서 다음 정보를 확인할 수 있습니다: + - `status`: 세부 진행 상태 (starting, initializing, generating, processing) + - `message`: 현재 진행 상황 메시지 + - `operation`: Google API Operation ID (있는 경우) + - `elapsed_seconds`: 경과 시간 (있는 경우) + + ### 완료 후: + SUCCESS 상태가 되면 `result` 필드에서 `video_uri`를 통해 비디오를 다운로드할 수 있습니다. + """ +) +def get_veo_task_status(task_id: str): + if not _has_genai: + raise HTTPException( + status_code=400, + detail="Veo 3 모델은 현재 사용할 수 없습니다. Google Cloud 프로젝트에서 Generative Language API를 활성화해야 합니다." + ) + + try: + # Celery에서 작업 결과 조회 + task_result = celery_app.AsyncResult(task_id) + + if task_result.state == "PENDING": + return VeoTaskStatus( + task_id=task_id, + status="PENDING", + result=None, + progress=None, + error=None + ) + elif task_result.state == "PROGRESS": + return VeoTaskStatus( + task_id=task_id, + status="PROGRESS", + result=None, + progress=task_result.info, + error=None + ) + elif task_result.state == "SUCCESS": + return VeoTaskStatus( + task_id=task_id, + status="SUCCESS", + result=task_result.result, + progress=None, + error=None + ) + elif task_result.state == "FAILURE": + return VeoTaskStatus( + task_id=task_id, + status="FAILURE", + result=None, + progress=None, + error=str(task_result.info) + ) + else: + return VeoTaskStatus( + task_id=task_id, + status=task_result.state, + result=task_result.result if hasattr(task_result, 'result') else None, + progress=task_result.info if hasattr(task_result, 'info') else None, + error=None + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"작업 상태 조회 실패: {str(e)}") + + +@app.delete( + "/veo/cancel/{task_id}", + responses={ + 200: {"description": "작업 취소 성공"}, + 404: {"description": "작업을 찾을 수 없음"}, + 500: {"description": "서버 내부 오류"} + }, + tags=["Video Generation"], + summary="비동기 비디오 생성 작업 취소", + description=""" + ## 진행 중인 비동기 비디오 생성 작업을 취소합니다 + + ### 주의사항: + - 이미 시작된 Google API 호출은 취소할 수 없을 수 있습니다 + - 큐에서 대기 중인 작업은 즉시 취소됩니다 + - 진행 중인 작업은 다음 체크포인트에서 취소됩니다 + """ +) +def cancel_veo_task(task_id: str): + if not _has_genai: + raise HTTPException( + status_code=400, + detail="Veo 3 모델은 현재 사용할 수 없습니다. Google Cloud 프로젝트에서 Generative Language API를 활성화해야 합니다." + ) + + try: + celery_app.control.revoke(task_id, terminate=True) + return {"message": f"작업 {task_id}이(가) 취소되었습니다."} + except Exception as e: + raise HTTPException(status_code=500, detail=f"작업 취소 실패: {str(e)}") diff --git a/app/schemas/sentences.py b/app/schemas/sentences.py index c60971b..8f90518 100644 --- a/app/schemas/sentences.py +++ b/app/schemas/sentences.py @@ -4,6 +4,7 @@ class SentencesRequest(BaseModel): # 하나만 써도 되고, 둘 다 쓰면 paragraphs를 우선 사용 text: Optional[str] = None + paragraphs: Optional[List[str]] = None class SentencesResponse(BaseModel): # 입력 전체를 하나로 합친 원문(요약용) diff --git a/app/schemas/veo.py b/app/schemas/veo.py new file mode 100644 index 0000000..5accc83 --- /dev/null +++ b/app/schemas/veo.py @@ -0,0 +1,110 @@ +from typing import Optional, Any, Dict +from pydantic import BaseModel, Field +import time + +try: + from google import genai + from google.genai.types import GenerateVideosConfig + _has_google_genai = True +except Exception: + _has_google_genai = False + + +class VeoRequest(BaseModel): + """비디오 생성 요청 모델""" + prompt: str = Field( + ..., + description="비디오 생성을 위한 텍스트 설명", + min_length=10, + max_length=500 + ) + aspect_ratio: Optional[str] = Field( + "16:9", + description="비디오 화면 비율" + ) + model: Optional[str] = Field( + "veo-2.0-generate-001", + description="사용할 Veo 모델 버전" + ) + timeout_seconds: Optional[int] = Field( + 600, # 기본값을 10분으로 증가 + description="비디오 생성 대기 시간 (초)", + ge=30, + le=1200 # 최대 20분 + ) + + class Config: + schema_extra = { + "example": { + "prompt": "A majestic eagle soaring over snow-capped mountains at sunset", + "aspect_ratio": "16:9", + "model": "veo-2.0-generate-001", + "timeout_seconds": 600 + } + } + + +class VeoAsyncRequest(BaseModel): + """비동기 비디오 생성 요청 모델""" + prompt: str = Field( + ..., + description="비디오 생성을 위한 텍스트 설명", + min_length=10, + max_length=500 + ) + aspect_ratio: Optional[str] = Field( + "16:9", + description="비디오 화면 비율" + ) + model: Optional[str] = Field( + "veo-2.0-generate-001", + description="사용할 Veo 모델 버전" + ) + timeout_seconds: Optional[int] = Field( + 600, + description="비디오 생성 대기 시간 (초)", + ge=30, + le=1200 + ) + + class Config: + schema_extra = { + "example": { + "prompt": "A red panda riding a skateboard in a sunny park", + "aspect_ratio": "16:9", + "model": "veo-2.0-generate-001", + "timeout_seconds": 600 + } + } + + +class VeoResponse(BaseModel): + """비디오 생성 응답 모델""" + status: str = Field(description="생성 상태 (completed, pending, completed_with_error, error)") + video_uri: Optional[str] = Field(None, description="생성된 비디오 다운로드 URI") + local_path: Optional[str] = Field(None, description="로컬에 저장된 비디오 파일 경로") + operation: Optional[str] = Field(None, description="Google API Operation ID") + message: Optional[str] = Field(None, description="상세 메시지") + error: Optional[str] = Field(None, description="오류 메시지 (있는 경우)") + + +class VeoAsyncResponse(BaseModel): + """비동기 비디오 생성 응답 모델""" + task_id: str = Field(description="비동기 작업 ID") + status: str = Field(description="작업 상태") + message: str = Field(description="상태 메시지") + + +class VeoTaskStatus(BaseModel): + """비동기 작업 상태 조회 응답 모델""" + task_id: str = Field(description="작업 ID") + status: str = Field(description="작업 상태 (PENDING, PROGRESS, SUCCESS, FAILURE, RETRY, REVOKED)") + result: Optional[Dict[str, Any]] = Field(None, description="작업 결과 (완료시)") + progress: Optional[Dict[str, Any]] = Field(None, description="진행 상황 정보") + error: Optional[str] = Field(None, description="오류 메시지") + + +class ErrorResponse(BaseModel): + """오류 응답 모델""" + error: str = Field(description="오류 메시지") + \ No newline at end of file diff --git a/app/tasks/__init__.py b/app/tasks/__init__.py new file mode 100644 index 0000000..e9e0066 --- /dev/null +++ b/app/tasks/__init__.py @@ -0,0 +1 @@ +# 태스크 패키지 \ No newline at end of file diff --git a/app/tasks/veo_tasks.py b/app/tasks/veo_tasks.py new file mode 100644 index 0000000..0f85f9d --- /dev/null +++ b/app/tasks/veo_tasks.py @@ -0,0 +1,237 @@ +import os +import time +import asyncio +import aiofiles +from pathlib import Path +from typing import Dict, Any +from celery import current_task +import boto3 +from botocore.exceptions import NoCredentialsError, ClientError +import io + +from ..celery_app import celery_app + +# Google GenAI SDK 사용 +try: + from google import genai + from google.genai.types import GenerateVideosConfig + import httpx + _has_genai = True +except ImportError: + _has_genai = False + + +def upload_video_to_s3(video_uri: str, task_id: str) -> str: + """ + Google API에서 비디오를 다운로드하여 S3에 업로드합니다. + """ + try: + # Google API Key 가져오기 + google_api_key = os.getenv("GOOGLE_API_KEY") + if not google_api_key: + return "업로드 실패: GOOGLE_API_KEY 환경변수가 설정되지 않았습니다." + + # AWS 설정 확인 + aws_access_key = os.getenv("AWS_ACCESS_KEY_ID") + aws_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY") + aws_region = os.getenv("AWS_REGION", "ap-northeast-2") + s3_bucket = os.getenv("S3_BUCKET_NAME") + + if not all([aws_access_key, aws_secret_key, s3_bucket]): + return "업로드 실패: AWS 설정이 완료되지 않았습니다. (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, S3_BUCKET_NAME 필요)" + + # S3 클라이언트 초기화 + s3_client = boto3.client( + 's3', + aws_access_key_id=aws_access_key, + aws_secret_access_key=aws_secret_key, + region_name=aws_region + ) + + # HTTP 클라이언트로 비디오 다운로드 (API 키를 쿼리 파라미터로) + if "?" in video_uri: + download_url = f"{video_uri}&key={google_api_key}" + else: + download_url = f"{video_uri}?key={google_api_key}" + + with httpx.Client(timeout=300, follow_redirects=True) as client: + response = client.get(download_url) + response.raise_for_status() + + # Content-Type 확인 + content_type = response.headers.get('content-type', 'video/mp4') + if len(response.content) < 1000: + return f"업로드 실패: 파일이 너무 작습니다 (Content-Type: {content_type}, 크기: {len(response.content)} bytes)" + + # S3에 업로드할 파일명 생성 + s3_key = f"veo-videos/veo_video_{task_id}.mp4" + + # 비디오 데이터를 BytesIO로 래핑 + video_data = io.BytesIO(response.content) + + # S3에 업로드 + s3_client.upload_fileobj( + video_data, + s3_bucket, + s3_key, + ExtraArgs={ + 'ContentType': content_type, + 'Metadata': { + 'task_id': task_id, + 'source': 'google-veo', + 'original_uri': video_uri + } + } + ) + + file_size = len(response.content) / (1024 * 1024) # MB + s3_url = f"https://{s3_bucket}.s3.{aws_region}.amazonaws.com/{s3_key}" + return f"S3 업로드 완료: {s3_url} (크기: {file_size:.2f}MB)" + + except NoCredentialsError: + return "업로드 실패: AWS 자격 증명을 찾을 수 없습니다." + except ClientError as e: + return f"S3 업로드 실패: {str(e)}" + except Exception as e: + return f"업로드 실패: {str(e)}" + + +@celery_app.task(bind=True, name="generate_veo_video_async") +def generate_veo_video_task( + self, + prompt: str, + aspect_ratio: str = "16:9", + model: str = "gemini-1.5-flash", + timeout_seconds: int = 600 +) -> Dict[str, Any]: + """ + 비동기로 Veo 비디오를 생성하는 Celery 태스크 + """ + + # 진행 상황 업데이트 + self.update_state( + state="PROGRESS", + meta={"status": "starting", "message": "비디오 생성을 시작합니다..."} + ) + + if not _has_genai: + return { + "status": "error", + "video_uri": None, + "operation": None, + "message": None, + "error": "google-genai 패키지가 필요합니다. pip install google-genai를 실행하세요." + } + + # Google API Key 확인 + google_api_key = os.getenv("GOOGLE_API_KEY") + if not google_api_key: + return { + "status": "error", + "video_uri": None, + "operation": None, + "message": None, + "error": "GOOGLE_API_KEY 환경변수가 설정되지 않았습니다." + } + + try: + # 진행 상황 업데이트 + self.update_state( + state="PROGRESS", + meta={"status": "initializing", "message": "Google GenAI 클라이언트를 초기화합니다..."} + ) + + # Google GenAI Client 초기화 + client = genai.Client(api_key=google_api_key) + + # 진행 상황 업데이트 + self.update_state( + state="PROGRESS", + meta={"status": "generating", "message": f"'{prompt}' 프롬프트로 Veo 3 비디오를 생성중입니다..."} + ) + + # Veo 3 비디오 생성 시작 + operation = client.models.generate_videos( + model="veo-3.0-generate-preview", + prompt=prompt, + config=GenerateVideosConfig( + aspect_ratio=aspect_ratio + # duration 매개변수는 현재 지원되지 않음 + ) + ) + + # 비디오 생성 완료까지 대기 + start_time = time.time() + while not operation.done: + elapsed_time = time.time() - start_time + if elapsed_time > timeout_seconds: + return { + "status": "error", + "video_uri": None, + "operation": str(operation), + "message": None, + "error": f"비디오 생성 시간 초과 ({timeout_seconds}초)" + } + + # 진행 상황 업데이트 + progress_message = f"비디오 생성 중... ({int(elapsed_time)}초 경과)" + self.update_state( + state="PROGRESS", + meta={"status": "generating", "message": progress_message} + ) + + time.sleep(15) # 15초마다 상태 확인 + operation = client.operations.get(operation) + + # 비디오 생성 성공 확인 + if not operation.response or not hasattr(operation, 'result') or not operation.result: + return { + "status": "error", + "video_uri": None, + "operation": str(operation), + "message": None, + "error": "비디오 생성에 실패했습니다." + } + + # 생성된 비디오 URI 가져오기 + video_uri = None + if hasattr(operation.result, 'generated_videos') and operation.result.generated_videos: + video_obj = operation.result.generated_videos[0] + if hasattr(video_obj, 'video') and video_obj.video and hasattr(video_obj.video, 'uri'): + video_uri = video_obj.video.uri + + if not video_uri: + return { + "status": "error", + "video_uri": None, + "operation": str(operation), + "message": None, + "error": "생성된 비디오 URI를 가져올 수 없습니다." + } + + # S3에 비디오 업로드 + s3_upload_result = upload_video_to_s3(video_uri, self.request.id) + + # 완료 상태 업데이트 + self.update_state( + state="PROGRESS", + meta={"status": "completed", "message": "비디오 생성 및 S3 업로드가 완료되었습니다."} + ) + + return { + "status": "completed", + "video_uri": video_uri, + "s3_path": s3_upload_result, + "operation": str(operation), + "message": f"비디오가 성공적으로 생성되고 S3에 업로드되었습니다: {s3_upload_result}", + "error": None + } + + except Exception as e: + return { + "status": "error", + "video_uri": None, + "operation": None, + "message": None, + "error": f"Google GenAI 호출 중 오류 발생: {str(e)}" + } \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..706b4fa --- /dev/null +++ b/requirements.txt @@ -0,0 +1,40 @@ +# FastAPI 및 웹 프레임워크 +fastapi>=0.104.1 +uvicorn[standard]>=0.24.0 +python-multipart>=0.0.6 + +# 이미지 처리 +Pillow>=10.0.0 + +# 환경 변수 관리 +python-dotenv>=1.0.0 + +# HTTP 클라이언트 +httpx>=0.25.0 + +# 데이터 모델링 +pydantic>=2.0.0 + +# Google AI/Gemini API (Veo 3 비디오 생성용) +google-genai>=1.30.0 + +# Google Cloud 인증 및 스토리지 +google-cloud-storage>=2.0.0 +google-auth>=2.0.0 + +# 파일 다운로드 +aiofiles>=23.0.0 + +# OpenAI API (문장 검증용) +openai>=1.0.0 + +# 타입 힌트 지원 +typing-extensions>=4.0.0 + +# 비동기 작업 처리 +celery>=5.3.0 +redis>=5.0.0 +flower>=2.0.0 + +# AWS S3 업로드 +boto3>=1.28.0 \ No newline at end of file diff --git a/scripts/start_all.sh b/scripts/start_all.sh new file mode 100755 index 0000000..decba0c --- /dev/null +++ b/scripts/start_all.sh @@ -0,0 +1,46 @@ +#!/bin/bash + +echo "=== AI 비디오 생성 서비스 시작 ===" +echo "" + +# 현재 디렉토리를 프로젝트 루트로 설정 +cd "$(dirname "$0")/.." + +# Redis 시작 +# echo "1. Redis 서버 시작..." +# docker-compose up -d redis +# sleep 3 + +# 환경 변수 확인 +echo "2. 환경 변수 확인..." +if [ ! -f .env ]; then + echo "⚠️ .env 파일이 없습니다. .env.example을 참고하여 .env 파일을 생성하세요." + exit 1 +fi + +# Python 의존성 확인 +echo "3. Python 의존성 확인..." +python -c "import celery, redis" 2>/dev/null || { + echo "⚠️ 필요한 Python 패키지가 설치되지 않았습니다." + echo "다음 명령어로 설치하세요: pip install -r requirements.txt" + exit 1 +} + +echo "" +echo "=== 서비스 시작 완료 ===" +echo "" +echo "다음 명령어로 각 컴포넌트를 실행하세요:" +echo "" +echo "1. FastAPI 서버:" +echo " uvicorn app.main:app --reload --host 0.0.0.0 --port 8000" +echo "" +echo "2. Celery 워커 (새 터미널):" +echo " ./scripts/start_celery.sh" +echo "" +echo "3. Flower 모니터링 (선택사항, 새 터미널):" +echo " ./scripts/start_flower.sh" +echo "" +echo "=== 접속 정보 ===" +echo "FastAPI 문서: http://localhost:8000/docs" +echo "Redis Commander: http://localhost:8081" +echo "Flower 모니터링: http://localhost:5555 (Flower 시작 후)" \ No newline at end of file diff --git a/scripts/start_celery.sh b/scripts/start_celery.sh new file mode 100755 index 0000000..780ca6e --- /dev/null +++ b/scripts/start_celery.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +echo "Celery 워커를 시작합니다..." + +# 현재 디렉토리를 프로젝트 루트로 설정 +cd "$(dirname "$0")/.." + +# 환경 변수 로드 +export PYTHONPATH="${PYTHONPATH}:$(pwd)" + +# Celery 워커 시작 +celery -A app.celery_app worker --loglevel=info --concurrency=4 + +echo "Celery 워커가 종료되었습니다." \ No newline at end of file diff --git a/scripts/start_flower.sh b/scripts/start_flower.sh new file mode 100755 index 0000000..a5e3647 --- /dev/null +++ b/scripts/start_flower.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +echo "Flower (Celery 모니터링)를 시작합니다..." + +# 현재 디렉토리를 프로젝트 루트로 설정 +cd "$(dirname "$0")/.." + +# 환경 변수 로드 +export PYTHONPATH="${PYTHONPATH}:$(pwd)" + +# Flower 시작 +celery -A app.celery_app flower --port=5555 + +echo "Flower가 종료되었습니다." \ No newline at end of file diff --git a/scripts/start_redis.sh b/scripts/start_redis.sh new file mode 100755 index 0000000..e9972db --- /dev/null +++ b/scripts/start_redis.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +echo "Redis 서버를 시작합니다..." + +# Docker Compose로 Redis 시작 +docker-compose up -d redis + +echo "Redis 서버가 시작되었습니다." +echo "Redis Commander는 http://localhost:8081 에서 확인할 수 있습니다." +echo "" +echo "Redis 연결 확인:" +echo " Host: localhost" +echo " Port: 6379" +echo " Database: 0" \ No newline at end of file diff --git a/test/test_async_veo.py b/test/test_async_veo.py new file mode 100755 index 0000000..9929a84 --- /dev/null +++ b/test/test_async_veo.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3 +""" +비동기 Veo 비디오 생성 API 테스트 스크립트 +""" + +import requests +import time +import json +from typing import Dict, Any + +BASE_URL = "http://localhost:8000" + +def test_async_veo_generation(): + """비동기 Veo 비디오 생성 테스트""" + + # 1. 비동기 비디오 생성 요청 + print("=== 비동기 비디오 생성 테스트 ===") + print("1. 비디오 생성 요청 중...") + + payload = { + "prompt": "A red panda riding a skateboard in a sunny park with trees and blue sky", + "aspect_ratio": "16:9", + "timeout_seconds": 600 + } + + response = requests.post(f"{BASE_URL}/veo/async", json=payload) + + if response.status_code != 202: + print(f"❌ 요청 실패: {response.status_code}") + print(response.text) + return + + result = response.json() + task_id = result["task_id"] + print(f"✅ 작업이 큐에 추가됨") + print(f" Task ID: {task_id}") + print(f" 상태: {result['status']}") + print(f" 메시지: {result['message']}") + print() + + # 2. 작업 상태 주기적 조회 + print("2. 작업 상태 모니터링...") + print(" (Ctrl+C로 중단 가능)") + print() + + start_time = time.time() + + while True: + try: + # 상태 조회 + status_response = requests.get(f"{BASE_URL}/veo/status/{task_id}") + + if status_response.status_code != 200: + print(f"❌ 상태 조회 실패: {status_response.status_code}") + break + + status = status_response.json() + elapsed = int(time.time() - start_time) + + print(f"[{elapsed:3d}초] 상태: {status['status']}") + + if status["status"] == "PENDING": + print(" 대기 중...") + + elif status["status"] == "PROGRESS": + progress = status.get("progress", {}) + print(f" 진행 상황: {progress.get('message', 'N/A')}") + if "elapsed_seconds" in progress: + print(f" 작업 경과: {progress['elapsed_seconds']}초") + + elif status["status"] == "SUCCESS": + result = status.get("result", {}) + print("✅ 작업 완료!") + print(f" 상태: {result.get('status', 'N/A')}") + print(f" 메시지: {result.get('message', 'N/A')}") + if result.get("video_uri"): + print(f" 비디오 URI: {result['video_uri']}") + else: + print(" ⚠️ 비디오 URI가 없습니다") + break + + elif status["status"] == "FAILURE": + print("❌ 작업 실패!") + print(f" 오류: {status.get('error', 'Unknown error')}") + break + + else: + print(f" 알 수 없는 상태: {status['status']}") + + print() + time.sleep(10) # 10초마다 확인 + + except KeyboardInterrupt: + print("\n사용자에 의해 중단됨") + + # 작업 취소 여부 확인 + cancel = input("작업을 취소하시겠습니까? (y/N): ").strip().lower() + if cancel in ['y', 'yes']: + try: + cancel_response = requests.delete(f"{BASE_URL}/veo/cancel/{task_id}") + if cancel_response.status_code == 200: + print("✅ 작업이 취소되었습니다") + else: + print(f"❌ 작업 취소 실패: {cancel_response.status_code}") + except Exception as e: + print(f"❌ 작업 취소 중 오류: {e}") + break + + except Exception as e: + print(f"❌ 오류 발생: {e}") + break + + +def test_sync_veo_generation(): + """동기 Veo 비디오 생성 테스트 (짧은 타임아웃)""" + + print("=== 동기 비디오 생성 테스트 (짧은 타임아웃) ===") + + payload = { + "prompt": "A simple animation of a bouncing ball", + "aspect_ratio": "16:9", + "timeout_seconds": 60 # 1분으로 제한하여 타임아웃 테스트 + } + + print("비디오 생성 요청 중... (60초 타임아웃)") + + try: + response = requests.post(f"{BASE_URL}/veo", json=payload, timeout=70) + + if response.status_code == 200: + result = response.json() + print("✅ 동기 생성 완료") + print(f" 상태: {result.get('status', 'N/A')}") + print(f" 메시지: {result.get('message', 'N/A')}") + if result.get("video_uri"): + print(f" 비디오 URI: {result['video_uri']}") + else: + print(f"❌ 요청 실패: {response.status_code}") + print(response.text) + + except requests.exceptions.Timeout: + print("⏰ 요청 타임아웃 (예상된 결과)") + except Exception as e: + print(f"❌ 오류 발생: {e}") + + +def check_service_health(): + """서비스 상태 확인""" + + print("=== 서비스 상태 확인 ===") + + try: + # Health check + response = requests.get(f"{BASE_URL}/health", timeout=5) + if response.status_code == 200: + print("✅ FastAPI 서버 정상") + else: + print(f"❌ FastAPI 서버 오류: {response.status_code}") + return False + except Exception as e: + print(f"❌ FastAPI 서버 연결 실패: {e}") + return False + + # API 문서 접근 확인 + try: + response = requests.get(f"{BASE_URL}/docs", timeout=5) + if response.status_code == 200: + print("✅ API 문서 접근 가능") + else: + print(f"⚠️ API 문서 접근 문제: {response.status_code}") + except Exception as e: + print(f"⚠️ API 문서 접근 실패: {e}") + + print() + return True + + +if __name__ == "__main__": + print("Veo 비동기 비디오 생성 API 테스트") + print("=" * 50) + print() + + # 서비스 상태 확인 + if not check_service_health(): + print("서비스가 실행 중인지 확인하세요:") + print("1. uvicorn app.main:app --reload --host 0.0.0.0 --port 8000") + print("2. ./scripts/start_celery.sh") + print("3. docker-compose up -d redis") + exit(1) + + # 테스트 선택 + print("테스트 선택:") + print("1. 비동기 비디오 생성 테스트 (권장)") + print("2. 동기 비디오 생성 테스트 (타임아웃 테스트)") + print("3. 둘 다 실행") + + choice = input("\n선택 (1-3): ").strip() + print() + + if choice == "1": + test_async_veo_generation() + elif choice == "2": + test_sync_veo_generation() + elif choice == "3": + test_async_veo_generation() + print("\n" + "=" * 50 + "\n") + test_sync_veo_generation() + else: + print("잘못된 선택입니다.") + + print("\n테스트 완료!") \ No newline at end of file diff --git a/test/test_s3_upload.py b/test/test_s3_upload.py new file mode 100644 index 0000000..2caacc1 --- /dev/null +++ b/test/test_s3_upload.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 +""" +S3 업로드 기능 테스트 스크립트 +""" +import os +import sys +import time +from pathlib import Path + +# 프로젝트 루트를 Python 경로에 추가 +sys.path.insert(0, str(Path(__file__).parent)) + +from app.tasks.veo_tasks import upload_video_to_s3 + + +def test_s3_upload(): + """S3 업로드 기능을 테스트합니다.""" + + print("🧪 S3 업로드 기능 테스트 시작...") + print("=" * 60) + + # 테스트 URL + test_video_uri = "https://generativelanguage.googleapis.com/download/v1beta/files/3pf5wmwdeq23:download?alt=media" + test_task_id = f"test_{int(time.time())}" + + print(f"📹 테스트 비디오 URI: {test_video_uri}") + print(f"🏷️ 테스트 Task ID: {test_task_id}") + print() + + # 환경변수 확인 + print("🔧 환경변수 확인:") + required_env_vars = [ + "GOOGLE_API_KEY", + "AWS_ACCESS_KEY_ID", + "AWS_SECRET_ACCESS_KEY", + "AWS_REGION", + "S3_BUCKET_NAME" + ] + + missing_vars = [] + for var in required_env_vars: + value = os.getenv(var) + if value: + if var in ["GOOGLE_API_KEY", "AWS_SECRET_ACCESS_KEY"]: + # 민감한 정보는 일부만 표시 + masked_value = value[:8] + "..." if len(value) > 8 else "***" + print(f" ✅ {var}: {masked_value}") + else: + print(f" ✅ {var}: {value}") + else: + print(f" ❌ {var}: 설정되지 않음") + missing_vars.append(var) + + if missing_vars: + print(f"\n⚠️ 누락된 환경변수: {', '.join(missing_vars)}") + print("테스트를 계속 진행하지만 실패할 수 있습니다.") + + print("\n" + "=" * 60) + print("🚀 S3 업로드 시작...") + + try: + # S3 업로드 실행 + start_time = time.time() + result = upload_video_to_s3(test_video_uri, test_task_id) + end_time = time.time() + + elapsed_time = end_time - start_time + + print(f"⏱️ 소요 시간: {elapsed_time:.2f}초") + print(f"📄 결과: {result}") + + if "S3 업로드 완료" in result: + print("\n✅ 테스트 성공! S3 업로드가 완료되었습니다.") + else: + print("\n❌ 테스트 실패. 결과를 확인해주세요.") + + except Exception as e: + print(f"\n💥 테스트 중 오류 발생: {str(e)}") + import traceback + traceback.print_exc() + + print("=" * 60) + print("🧪 테스트 완료") + + +if __name__ == "__main__": + test_s3_upload() \ No newline at end of file