diff --git a/app/celery_worker.py b/app/celery_worker.py index 8bcd3e2..2725549 100644 --- a/app/celery_worker.py +++ b/app/celery_worker.py @@ -12,18 +12,30 @@ backend=os.getenv("REDIS_URL", "redis://localhost:6379/0"), include=[ "app.tasks.migrate_task", - "app.tasks.feedback_task" - ] + "app.tasks.feedback_task", + "app.tasks.opensearch_task", + ], ) -celery_app.conf.timezone = "Asia/Seoul" +# 모듈을 미리 import 해 두면 워커 기동 시 태스크가 즉시 등록됩니다. +import app.tasks.migrate_task +import app.tasks.feedback_task +import app.tasks.opensearch_task + celery_app.conf.beat_schedule = { + # 매일 03:00 MongoDB → Chroma 마이그레이션 "daily-migrate-to-chroma": { "task": "app.tasks.migrate_task.batch_migrate_to_chroma", "schedule": crontab(hour=3, minute=0), }, + # 매일 04:00 Feedback → OpenSearch 전송 (경로 수정!) "daily-sync-feedback-to-opensearch": { - "task": "app.tasks.migrate_task.sync_feedback_logs_to_opensearch", + "task": "app.tasks.feedback_task.sync_feedback_logs_to_opensearch", "schedule": crontab(hour=4, minute=0), }, + # 매일 05:00 모든 로그 컬렉션 → OpenSearch + "daily-sync-opensearch-logs": { + "task": "sync_all_logs_to_opensearch", + "schedule": crontab(hour=5, minute=0), + }, } diff --git a/app/clients/__init__.py b/app/clients/__init__.py index abd9773..a9d43d8 100644 --- a/app/clients/__init__.py +++ b/app/clients/__init__.py @@ -1,8 +1,8 @@ from .chromadb_client import ChromaClient -from .mongodb import MongoDBClient +from .mongodb import MongoClientsManager # ✅ MongoClientsManager만 사용 from .openai_client import OpenAiClient - +# 클라이언트 초기화 ai_client = OpenAiClient() chroma_client = ChromaClient( @@ -10,18 +10,5 @@ openai_api_key=None ) -question_client = MongoDBClient() -assessment_client = MongoDBClient(db_name="assessment") -feedback_client = MongoDBClient(db_name="feedback") -recommendation_client = MongoDBClient(db_name="recommendation") -user_client = MongoDBClient(db_name="user") -interview_client = MongoDBClient(db_name="ai_interview") - -db_clients = { - "ai_platform": question_client, - "assessment": assessment_client, - "feedback": feedback_client, - "recommendation": recommendation_client, - "user": user_client, - "ai_interview" : interview_client, -} \ No newline at end of file +# MongoClientsManager 사용 +db_clients = MongoClientsManager() diff --git a/app/clients/chromadb_client.py b/app/clients/chromadb_client.py index c0c9f13..1123c40 100644 --- a/app/clients/chromadb_client.py +++ b/app/clients/chromadb_client.py @@ -39,7 +39,7 @@ def count_documents(self): return self.collection.count() def get_documents_by_user(self, user_id: str, limit: int = 5): - return self.collection.get(where={"user_id": user_id}) + return self.collection.get(where={"user_id": user_id}, limit=limit) def delete_documents(self, user_id: str = None, source: str = None, limit: Optional[int] = None, sort_order: Optional[str] = "asc"): filter_ = [] @@ -52,7 +52,8 @@ def delete_documents(self, user_id: str = None, source: str = None, limit: Optio print("\n삭제 조건이 없습니다.") return - where_clause = {"$and": filter_} if len(filter_) > 1 else filter_[0] + # 수정: Mongo 스타일 → Chroma 스타일 where_clause로 변환 + where_clause = {k: v for d in filter_ for k, v in d.items()} if limit: docs = self.collection.get(where=where_clause) @@ -86,7 +87,6 @@ def delete_all_documents(self, batch_size: int = 500): total = len(all_ids) print(f"\n[전체 삭제] 총 {total}개 문서 삭제 시작") - # Batch로 나누기 for i in range(0, total, batch_size): batch_ids = all_ids[i:i + batch_size] result = self.collection.delete(ids=batch_ids) @@ -96,7 +96,6 @@ def delete_all_documents(self, batch_size: int = 500): return total - if __name__ == "__main__": chroma = ChromaClient() chroma.delete_all_documents() diff --git a/app/clients/mongodb.py b/app/clients/mongodb.py index 73be993..c5d5ff6 100644 --- a/app/clients/mongodb.py +++ b/app/clients/mongodb.py @@ -12,32 +12,81 @@ def __init__(self, mongo_url: str | None = None, db_name: str = "ai_platform"): self.client = AsyncIOMotorClient(url) self.db = self.client[db_name] - # db_name = user + # 수정된 부분: .get → [] 방식으로 변경 self.user_profile = self.db["user_profile"] - - # db_name = recommendation self.recommendation_content = self.db["recommendation_content"] self.recommendation_cache = self.db["recommendation_cache"] - - # db_name = feedback self.feedback = self.db["feedback"] - - # db_name = assessment self.pre_result = self.db["pre_result"] self.post_result = self.db["post_result"] - - # db_name = ai_platform - self.techMap = self.db["techMap"] - self.questions = self.db["questions"] + self.techMap = self.db["techMap"] + self.startup_log = self.db["startup_log"] + self.interview_feedback = self.db["interview_feedback"] + self.interview_contents = self.db["interview_contents"] def get_category_collection(self, category: str): return self.db[category] async def save_explanation_log(self, log_data: dict): log_data["created_at"] = datetime.utcnow() - await self.explanation_log.insert_one(log_data) + await self.db["explanation_log"].insert_one(log_data) + + +class MongoClientsManager: + def __init__(self): + self._clients = { + "ai_platform": MongoDBClient(db_name="ai_platform"), + "assessment": MongoDBClient(db_name="assessment"), + "feedback": MongoDBClient(db_name="feedback"), + "recommendation": MongoDBClient(db_name="recommendation"), + "user": MongoDBClient(db_name="ai_platform"), # user_profile 포함 + "ai_interview": MongoDBClient(db_name="ai_interview"), + } + + def __getitem__(self, key): + return self._clients[key] + + @property + def user_profile(self): + return self._clients["user"].user_profile + + @property + def recommendation_content(self): + return self._clients["recommendation"].recommendation_content + + @property + def recommendation_cache(self): + return self._clients["recommendation"].recommendation_cache + + @property + def feedback(self): + return self._clients["feedback"].feedback + + @property + def pre_result(self): + return self._clients["assessment"].pre_result + + @property + def post_result(self): + return self._clients["assessment"].post_result + + @property + def questions(self): + return self._clients["ai_platform"].questions + @property + def techMap(self): + return self._clients["ai_platform"].techMap + @property + def startup_log(self): + return self._clients["ai_platform"].startup_log + @property + def interview_feedback(self): + return self._clients["ai_interview"].interview_feedback + @property + def interview_contents(self): + return self._clients["ai_interview"].interview_contents diff --git a/app/clients/opensearch_client.py b/app/clients/opensearch_client.py index 65dfc88..6a30fa8 100644 --- a/app/clients/opensearch_client.py +++ b/app/clients/opensearch_client.py @@ -1,9 +1,14 @@ +# app/clients/opensearch_client.py from opensearchpy import OpenSearch import os -opensearch_client = OpenSearch( - hosts=[os.getenv("OPENSEARCH_HOST", "http://localhost:9200")], - http_auth=(os.getenv("OPENSEARCH_USER", "admin"), os.getenv("OPENSEARCH_PASSWORD", "admin")), - use_ssl=False, - verify_certs=False -) +def get_opensearch_client(): + return OpenSearch( + hosts=[{"host": os.getenv("OPENSEARCH_HOST", "localhost"), "port": int(os.getenv("OPENSEARCH_PORT", 9200))}], + http_auth=(os.getenv("OPENSEARCH_USER", "admin"), os.getenv("OPENSEARCH_PASS", "admin")), + use_ssl=False, + verify_certs=False + ) + +# 기본 객체도 생성해둠 (기본 사용 용도) +opensearch_client = get_opensearch_client() diff --git a/app/create_index.py b/app/create_index.py new file mode 100644 index 0000000..1709fb9 --- /dev/null +++ b/app/create_index.py @@ -0,0 +1,95 @@ +from opensearchpy import OpenSearch + +# OpenSearch 연결 설정 +client = OpenSearch( + hosts=[{"host": "localhost", "port": 9200}], + http_auth=("admin", "admin"), + use_ssl=False, + verify_certs=False +) + +# 인덱스 매핑 정의 +def get_index_mappings(): + return { + "feedback_logs": { + "properties": { + "user_id": {"type": "keyword"}, + "feedback_type": {"type": "keyword"}, + "subject": {"type": "keyword"}, + "final_comment": { + "type": "text", + "fields": { + "keyword": {"type": "keyword"} + } + }, + "score": {"type": "integer"}, + "embedding": { + "type": "knn_vector", + "dimension": 768 + }, + "timestamp": {"type": "date"} + } + }, + "interview_logs": { + "properties": { + "user_id": {"type": "keyword"}, + "question_id": {"type": "keyword"}, + "question": {"type": "text"}, + "answer": {"type": "text"}, + "logic": {"type": "integer"}, + "accuracy": {"type": "integer"}, + "clarity": {"type": "integer"}, + "terms": {"type": "integer"}, + "overall_comment": { + "type": "text", + "fields": {"keyword": {"type": "keyword"}} + }, + "timestamp": {"type": "date"} + } +}, + "recommendation_logs": { + "properties": { + "user_id": {"type": "keyword"}, + "title": { + "type": "text", + "fields": {"keyword": {"type": "keyword"}} + }, + "user_context": {"type": "text"}, + "timestamp": {"type": "date"} + } + }, + "startup_logs": { + "properties": { + "service": {"type": "keyword"}, + "message": {"type": "text"}, + "timestamp": {"type": "date"} + } + }, + "pre_result_logs": { + "properties": { + "user_id": {"type": "keyword"}, + "subject": {"type": "keyword"}, + "score": {"type": "integer"}, + "level": {"type": "keyword"}, + "timestamp": {"type": "date"} + } + }, + "post_result_logs": { + "properties": { + "user_id": {"type": "keyword"}, + "subject": {"type": "keyword"}, + "score": {"type": "integer"}, + "level": {"type": "keyword"}, + "timestamp": {"type": "date"} + } + } + } + +# 인덱스 재생성 +index_mappings = get_index_mappings() +for index_name, mapping in index_mappings.items(): + if client.indices.exists(index=index_name): + client.indices.delete(index=index_name) + print(f"🗑️ Deleted index: {index_name}") + client.indices.create(index=index_name, body={"mappings": mapping}) + print(f"✅ Created index: {index_name}") diff --git a/app/insert_feedback_logs.py b/app/insert_feedback_logs.py new file mode 100644 index 0000000..4839c75 --- /dev/null +++ b/app/insert_feedback_logs.py @@ -0,0 +1,46 @@ +from datetime import datetime +from opensearchpy import OpenSearch +import random + +client = OpenSearch( + hosts=[{"host": "localhost", "port": 9200}], + http_auth=("admin", "admin"), + use_ssl=False, + verify_certs=False +) + +user_ids = [f"user_{i}" for i in range(1, 8)] +subjects = ["math", "cs", "physics", "bio"] +feedback_types = ["PRE", "POST"] +comments = [ + "개념 이해가 잘 되어 있습니다.", + "약간의 복습이 필요합니다.", + "용어 사용이 매우 정확합니다.", + "응답이 다소 모호했습니다.", + "전반적으로 향상된 모습입니다.", + "핵심 개념에 대한 이해가 부족합니다.", + "예시 사용이 효과적이었습니다.", + "개념 연결 능력이 향상되었습니다." +] + +docs = [] + +for uid in user_ids: + for _ in range(3): # 각 사용자당 피드백 3개 + doc = { + "user_id": uid, + "subject": random.choice(subjects), + "feedback_type": random.choice(feedback_types), + "score": random.randint(60, 100), + "final_comment": random.choice(comments), + "timestamp": datetime.utcnow().isoformat() + } + docs.append({"index": {"_index": "feedback_logs"}}) + docs.append(doc) + +response = client.bulk(body=docs) + +if not response["errors"]: + print("✅ 피드백 데이터 삽입 완료") +else: + print("❌ 삽입 중 오류 발생:", response) diff --git a/app/insert_interview_logs.py b/app/insert_interview_logs.py new file mode 100644 index 0000000..9796e94 --- /dev/null +++ b/app/insert_interview_logs.py @@ -0,0 +1,36 @@ +from datetime import datetime +from opensearchpy import OpenSearch +import random + +client = OpenSearch( + hosts=[{"host": "localhost", "port": 9200}], + http_auth=("admin", "admin"), + use_ssl=False, +) + +user_ids = [f"user_{i}" for i in range(1, 8)] +questions = [ + ("Q1", "Explain the concept of recursion."), + ("Q2", "What is normalization in databases?"), + ("Q3", "Define polymorphism in OOP.") +] + +for user in user_ids: + qid, question = random.choice(questions) + doc = { + "user_id": user, + "question_id": qid, + "question": question, + "answer": "Sample answer...", + "evaluation": { + "logic": random.randint(1, 5), + "accuracy": random.randint(1, 5), + "clarity": random.randint(1, 5), + "terms": random.randint(1, 5), + "overall_comment": "Well explained" if random.random() > 0.5 else "Needs improvement" + }, + "timestamp": datetime.utcnow().isoformat() + } + client.index(index="interview_logs", body=doc) + +print("✅ 삽입 완료: interview_logs") diff --git a/app/insert_post_result_logs.py b/app/insert_post_result_logs.py new file mode 100644 index 0000000..86e0e4c --- /dev/null +++ b/app/insert_post_result_logs.py @@ -0,0 +1,24 @@ +from datetime import datetime +from opensearchpy import OpenSearch +import random + +client = OpenSearch( + hosts=[{"host": "localhost", "port": 9200}], + http_auth=("admin", "admin"), + use_ssl=False, +) + +user_ids = [f"user_{i}" for i in range(1, 8)] +subjects = ["math", "cs", "law"] + +for user in user_ids: + doc = { + "user_id": user, + "subject": random.choice(subjects), + "score": random.randint(60, 100), + "level": random.choice(["novice", "intermediate", "expert"]), + "timestamp": datetime.utcnow().isoformat() + } + client.index(index="post_result_logs", body=doc) + +print("✅ 삽입 완료: post_result_logs") diff --git a/app/insert_pre_result_logs.py b/app/insert_pre_result_logs.py new file mode 100644 index 0000000..2075ed6 --- /dev/null +++ b/app/insert_pre_result_logs.py @@ -0,0 +1,37 @@ +# app/insert_pre_result_logs.py + +from datetime import datetime, timedelta +from random import randint, choice +from opensearchpy import OpenSearch, helpers + +# ✅ OpenSearch 클라이언트 설정 +client = OpenSearch( + hosts=[{"host": "localhost", "port": 9200}], + http_auth=("admin", "admin"), # 기본 인증 정보 (수정 가능) + use_ssl=False, + verify_certs=False +) + +# ✅ 샘플 유저 ID와 과목, 레벨 +user_ids = [f"user_{i:03d}" for i in range(1, 8)] +subjects = ["math", "cs", "law"] +levels = ["novice", "amateur", "intermediate", "expert", "master"] + +# ✅ 샘플 도큐먼트 리스트 생성 +docs = [] +for user_id in user_ids: + doc = { + "_index": "pre_result_logs", + "_source": { + "user_id": user_id, + "subject": choice(subjects), + "score": randint(40, 95), + "level": choice(levels), + "timestamp": (datetime.utcnow() - timedelta(days=randint(0, 5))).isoformat() + } + } + docs.append(doc) + +# ✅ OpenSearch에 Bulk 삽입 +success, _ = helpers.bulk(client, docs) +print(f"✅ 삽입 완료: {success}건 pre_result_logs에 입력됨") diff --git a/app/insert_recommendation_logs.py b/app/insert_recommendation_logs.py new file mode 100644 index 0000000..212106a --- /dev/null +++ b/app/insert_recommendation_logs.py @@ -0,0 +1,36 @@ +# app/insert_recommendation_logs.py + +from datetime import datetime, timedelta +from opensearchpy import OpenSearch +import random + +client = OpenSearch( + hosts=[{"host": "localhost", "port": 9200}], + http_auth=("admin", "admin"), + use_ssl=False, + verify_certs=False +) + +user_ids = [f"user_{i}" for i in range(1, 7)] +titles = ["ML 기초 책", "딥러닝 영상 강의", "GPT 논문 요약", "RAG 튜토리얼"] +contexts = ["짧은 시간", "예산 10만원 이하", "책을 선호", "실습 중심"] + +docs = [] + +for uid in user_ids: + for _ in range(2): + doc = { + "user_id": uid, + "title": random.choice(titles), + "user_context": random.choice(contexts), + "timestamp": (datetime.utcnow() - timedelta(days=random.randint(0, 15))).isoformat() + } + docs.append({"index": {"_index": "recommendation_logs"}}) + docs.append(doc) + +response = client.bulk(body=docs) + +if not response["errors"]: + print("✅ recommendation_logs 삽입 완료") +else: + print("❌ 오류:", response) diff --git a/app/main.py b/app/main.py index d137bf9..877f65c 100644 --- a/app/main.py +++ b/app/main.py @@ -2,6 +2,7 @@ import asyncio from prometheus_fastapi_instrumentator import Instrumentator +from app.routers.opensearch_test_router import router as opensearch_test_router from app.utils.metrics import observe_latency from app.routers.pre_assessment_router import router as assessment_router from app.routers.post_assessment_router import router as post_assessment_router @@ -36,6 +37,7 @@ ).instrument(app).expose(app) # 라우터 등록 +app.include_router(opensearch_test_router, prefix="/api/opensearch", tags=["OpenSearch 로그 전송"]) app.include_router(assessment_router, prefix="/api/pre", tags=["사전 평가 기능 관련 API"]) app.include_router(post_assessment_router, prefix="/api/post", tags=["사후 평가 기능 관련 API"]) app.include_router(feedback_router, prefix="/api/feedback", tags=["피드백 기능 관련 API"]) @@ -46,6 +48,7 @@ app.include_router(chroma_test_router, prefix="/api/test", tags=["Chroma 삽입 테스트"]) app.include_router(chroma_manage_router, prefix="/api/chroma/manage", tags=["ChromaDB 관리 API"]) + # Kafka Consumer Task feedback_consumer_task = None recom_consumer_task = None @@ -57,7 +60,7 @@ async def startup_event(): await init_feedback_producer() await init_recommendation_producer() feedback_consumer_task = asyncio.create_task(consume_feedback()) - recom_consumer_task = asyncio.create_task(consume_recommend()) + recom_consumer_task = asyncio.create_task(consume_recommend())# @app.on_event("shutdown") async def shutdown_event(): @@ -75,5 +78,5 @@ async def shutdown_event(): try: await recom_consumer_task except asyncio.CancelledError: - pass - await close_recommendation_producer() \ No newline at end of file + pass + await close_recommendation_producer() \ No newline at end of file diff --git a/app/routers/chroma_test_router.py b/app/routers/chroma_test_router.py index 6334094..a88c795 100644 --- a/app/routers/chroma_test_router.py +++ b/app/routers/chroma_test_router.py @@ -1,15 +1,16 @@ # app/routers/chroma_test_router.py from fastapi import APIRouter -from app.tasks.feedback_task import sync_feedback_to_chroma_task +from app.utils.embed import embed_to_chroma # 또는 필요한 모듈 router = APIRouter() -@router.post("/chroma/test-insert", summary="문서 삽입 테스트 (Celery)") +@router.post("/chroma/test-insert", summary="문서 삽입 테스트 (동기)") async def test_insert_to_chroma(user_id: str, content: str, source_id: str): - """ - Chroma 테스트용 문서 삽입 라우터 - - Celery Task로 비동기 실행됨 - """ - sync_feedback_to_chroma_task.delay(user_id, content, source_id) - return {"message": "비동기 문서 삽입 요청 완료 (Celery 전송됨)"} + result = embed_to_chroma( + user_id=user_id, + content=content, + source="feedback", + source_id=source_id + ) + return {"message": "동기 문서 삽입 완료", "result": result} diff --git a/app/routers/feedback_router.py b/app/routers/feedback_router.py index 06d5cb2..af52c86 100644 --- a/app/routers/feedback_router.py +++ b/app/routers/feedback_router.py @@ -2,26 +2,26 @@ from fastapi.encoders import jsonable_encoder from starlette.responses import JSONResponse -from app.clients import ai_client -from app.clients import db_clients +from app.clients import ai_client, db_clients from app.models.feedback.response import FeedbackResponse, Info, Feedback from app.services.assessment.post import get_post_assessments from app.services.common.common import subject_id_to_name from typing import List - from app.services.feedback.builder import build_feedback -from app.services.prompt.builder import generate_feedback_prompt, build_full_prompt, calculate_chapter_scores +from app.services.prompt.builder import generate_feedback_prompt, build_full_prompt from app.utils.embed import embed_to_chroma router = APIRouter() -feedback_db = db_clients["feedback"] -assessment_db = db_clients["assessment"] -user_db = db_clients["user"] - -@router.get("", response_model=List[FeedbackResponse], response_model_by_alias=True, summary="지정한 사용자의 피드백을 반환", description="해당 유저의 전체 피드백을 반환한다.") +@router.get( + "", + response_model=List[FeedbackResponse], + response_model_by_alias=True, + summary="지정한 사용자의 피드백을 반환", + description="해당 유저의 전체 피드백을 반환한다." +) async def list_feedbacks(userId: str): - target = feedback_db.feedback.find({"info.userId": userId}) + target = db_clients.feedback.find({"info.userId": userId}) docs = await target.to_list(length=1000) responses: List[FeedbackResponse] = [] @@ -42,8 +42,8 @@ async def list_feedbacks(userId: str): return JSONResponse(status_code=200, content=jsonable_encoder(serialized)) -async def generate_feedback(user_id, subject_id, feedback_type, nth): - user = await user_db.user_profile.find_one({"user_id": user_id}) +async def generate_feedback(user_id: str, subject_id: int, feedback_type: str, nth: int): + user = await db_clients.user_profile.find_one({"user_id": user_id}) if not user: raise HTTPException(status_code=404, detail="사용자 정보를 찾을 수 없습니다.") @@ -54,12 +54,15 @@ async def generate_feedback(user_id, subject_id, feedback_type, nth): system_msg = "당신은 한국어로 응답하는 학습 성장 분석가입니다." feedback_text = ai_client.create_chat_response(system_msg, full_prompt) + feedback, info, scores = await build_feedback(user, feedback_text, max_score) + + # 디버깅 로그 (필요 시 제거) print("DEBUG - feedback:", feedback) print("DEBUG - info:", info) print("DEBUG - scores:", scores) - # Chroma 자동 삽입 + # Chroma 자동 임베딩 embed_to_chroma( user_id=user_id, content=feedback_text, @@ -67,7 +70,7 @@ async def generate_feedback(user_id, subject_id, feedback_type, nth): source_id=subject ) - await feedback_db.feedback.insert_one({ + await db_clients.feedback.insert_one({ "info": info, "scores": scores, "feedback": { @@ -77,10 +80,8 @@ async def generate_feedback(user_id, subject_id, feedback_type, nth): } }) - return_json = { + return { "info": info, "scores": scores, "feedback": feedback } - - return return_json \ No newline at end of file diff --git a/app/routers/opensearch_test_router.py b/app/routers/opensearch_test_router.py new file mode 100644 index 0000000..5e1d357 --- /dev/null +++ b/app/routers/opensearch_test_router.py @@ -0,0 +1,11 @@ +# app/routers/opensearch_test_router.py + +from fastapi import APIRouter +from app.tasks.opensearch_task import sync_all_logs_to_opensearch + +router = APIRouter() + +@router.post("/opensearch/test-sync", summary="모든 Mongo 로그 → OpenSearch 전송") +async def test_opensearch_sync(): + sync_all_logs_to_opensearch.delay() + return {"message": "전송 요청 완료됨 (Celery 실행됨)"} diff --git a/app/routers/post_assessment_router.py b/app/routers/post_assessment_router.py index cb54769..7844fb2 100644 --- a/app/routers/post_assessment_router.py +++ b/app/routers/post_assessment_router.py @@ -1,8 +1,7 @@ from fastapi import APIRouter, HTTPException, Response - -from app.clients import db_clients from typing import List +from app.clients import db_clients from app.models.pre_assessment.request import AssessmentResult from app.models.pre_assessment.response import QuestionStructure from app.services.assessment.common import result_generate, safe_sample @@ -13,34 +12,30 @@ router = APIRouter() -question_db = db_clients["ai_platform"] +question_db = db_clients["ai_platform"] # ✅ 명시적으로 ai_platform DB 접근 assessment_db = db_clients["assessment"] user_db = db_clients["user"] -@router.get("/subject", response_model=List[QuestionStructure], response_model_by_alias=False, summary="사후 평가 문제를 생성", description="데이터베이스에서 사전에 지정된 규칙에 따라 저장된 문제를 가져오고, 사전 평가 문제 데이터셋을 완성한다.") -async def get_pretest(user_id:str, subject_id: int): +@router.get("/subject", response_model=List[QuestionStructure], response_model_by_alias=False, summary="사후 평가 문제 생성") +async def get_posttest(user_id: str, subject_id: int): user = await get_user(user_id) - - tmp = user.get("level", {}) - level = tmp.get(str(subject_id)) + level = user.get("level", {}).get(str(subject_id)) + if not level: + raise HTTPException(status_code=500, detail="Forbidden attempt occurred") subject_name = await subject_id_to_name(subject_id) + all_questions = await question_db.db[subject_name].find().to_list(length=1000) # ✅ 과목명 컬렉션에서 검색 - question_count = {} - if level == "novice": - question_count = {"high": 0, "medium": 1, "low": 2} - elif level == "amateur": - question_count = {"high": 0, "medium": 2, "low": 1} - elif level == "intermediate": - question_count = {"high": 1, "medium": 1, "low": 1} - elif level == "expert": - question_count = {"high": 2, "medium": 1, "low": 0} - elif level == "master": - question_count = {"high": 3, "medium": 0, "low": 0} - else: - raise HTTPException(status_code=500, detail="Forbidden attempt occurred") + question_count = { + "novice": {"high": 0, "medium": 1, "low": 2}, + "amateur": {"high": 0, "medium": 2, "low": 1}, + "intermediate": {"high": 1, "medium": 1, "low": 1}, + "expert": {"high": 2, "medium": 1, "low": 0}, + "master": {"high": 3, "medium": 0, "low": 0} + }.get(level) - all_questions = await question_db.db[subject_name].find().to_list(length=1000) + if not question_count: + raise HTTPException(status_code=500, detail="Invalid level") selected: List[QuestionStructure] = [] for chapter_num in range(1, 6): @@ -54,46 +49,36 @@ async def get_pretest(user_id:str, subject_id: int): selected += safe_sample(mid_qs, question_count["medium"]) selected += safe_sample(easy_qs, question_count["low"]) - result = result_generate(selected) - return result - + return result_generate(selected) -@router.post('/subject', summary="사용자의 사후 평가 결과를 저장", description="백엔드 서버에서 전송된 사용자의 사후 평가 결과를 데이터베이스에 저장한다.") -async def save_result(user_id: str, payload: AssessmentResult): +@router.post("/subject", summary="사후 평가 결과 저장") +async def save_posttest_result(user_id: str, payload: AssessmentResult): user = await get_user(user_id) compiled_data = payload.model_dump(exclude={"userId"}) - level = await level_to_string(payload.subject.level) level_key = str(payload.subject.subjectId) + level = await level_to_string(payload.subject.level) - # Chroma 자동 삽입 + # Chroma 자동 삽입 for ch in compiled_data.get("chapters", []): - content = ch.get("userAnswer", "") - qid = ch.get("questionId") - embed_to_chroma(user_id=user_id, content=content, source="post_result", source_id=str(qid)) + embed_to_chroma( + user_id=user_id, + content=ch.get("userAnswer", ""), + source="post_result", + source_id=str(ch.get("questionId")) + ) new_key = await generate_key(user) + await assessment_db.post_result.update_one( - { - "userId": user["user_id"], - }, - { - "$set": { - new_key: compiled_data - } - }, + {"userId": user["user_id"]}, + {"$set": {new_key: compiled_data}}, upsert=True ) await user_db.user_profile.update_one( - { - "user_id": user["user_id"], - }, - { - "$set": { - f"level.{level_key}": level - } - }, + {"user_id": user["user_id"]}, + {"$set": {f"level.{level_key}": level}}, upsert=True ) - return Response(status_code=204) \ No newline at end of file + return Response(status_code=204) diff --git a/app/routers/pre_assessment_router.py b/app/routers/pre_assessment_router.py index 1d881a5..2fae9d6 100644 --- a/app/routers/pre_assessment_router.py +++ b/app/routers/pre_assessment_router.py @@ -1,37 +1,40 @@ import logging import sys -from fastapi import APIRouter, Response -from app.clients import db_clients +from fastapi import APIRouter, Response, HTTPException +from typing import List +from app.clients import db_clients from app.models.pre_assessment.request import AssessmentResult from app.models.pre_assessment.response import QuestionStructure from app.services.assessment.common import safe_sample, result_generate from app.services.assessment.pre import level_to_string from app.services.common.common import subject_id_to_name, get_user -from typing import List from app.utils.embed import embed_to_chroma - router = APIRouter() logger = logging.getLogger("pre-assessment-router") logger.setLevel(logging.INFO) -question_db = db_clients["ai_platform"] -assessment_db = db_clients["assessment"] -user_db = db_clients["user"] - if not logger.handlers: - handler = logging.StreamHandler(sys.stdout) # 터미널로 출력 + handler = logging.StreamHandler(sys.stdout) formatter = logging.Formatter('[%(levelname)s] %(name)s: %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) -@router.get("/subject", response_model=List[QuestionStructure], response_model_by_alias=False, summary="사전 평가 문제를 생성", description="데이터베이스에서 사전에 지정된 규칙에 따라 저장된 문제를 가져오고, 사전 평가 문제 데이터셋을 완성한다.") +# DB 클라이언트 정의 +question_db = db_clients["ai_platform"] +assessment_db = db_clients["assessment"] +user_db = db_clients["user"] + + +@router.get("/subject", response_model=List[QuestionStructure], response_model_by_alias=False, + summary="사전 평가 문제를 생성", description="데이터베이스에서 사전에 지정된 규칙에 따라 저장된 문제를 가져오고, 사전 평가 문제 데이터셋을 완성한다.") async def get_pretest(user_id: str, subject_id: int): subject_name = await subject_id_to_name(subject_id) + # ✅ 과목별 컬렉션에서 문제 조회 (예: ai_platform.math) all_questions = await question_db.db[subject_name].find().to_list(length=1000) chapter_names = {q["chapterName"] for q in all_questions} @@ -47,7 +50,7 @@ async def get_pretest(user_id: str, subject_id: int): return result -@router.post('/subject', summary="사용자의 사전 평가 결과를 저장", description="백엔드 서버에서 전송된 사용자의 사전 평가 결과를 데이터베이스에 저장한다.") +@router.post("/subject", summary="사용자의 사전 평가 결과를 저장", description="백엔드 서버에서 전송된 사용자의 사전 평가 결과를 데이터베이스에 저장한다.") async def save_result(user_id: str, payload: AssessmentResult): user = await get_user(user_id) compiled_data = payload.model_dump(exclude={"userId"}) @@ -55,7 +58,7 @@ async def save_result(user_id: str, payload: AssessmentResult): level = await level_to_string(payload.subject.level) level_key = str(subject_id) - # Chroma 자동 삽입 + # ✅ Chroma 자동 임베딩 for ch in compiled_data.get("chapters", []): content = ch.get("userAnswer", "") qid = ch.get("questionId") @@ -76,7 +79,7 @@ async def save_result(user_id: str, payload: AssessmentResult): await user_db.user_profile.update_one( { - "user_id": user["user_id"], + "user_id": user["user_id"] }, { "$set": { diff --git a/app/routers/question_router.py b/app/routers/question_router.py index a8c7d7f..4f8b93a 100644 --- a/app/routers/question_router.py +++ b/app/routers/question_router.py @@ -2,16 +2,14 @@ import openai from fastapi import APIRouter, HTTPException, Query, Body from typing import List -from app.clients import interview_client -from app.clients import db_clients + +from app.clients import db_clients # ✅ 수정: interview_client 제거 from app.models.interview.question_model import InterviewQuestion +from app.models.interview.evaluation_model import EvaluationRequest from app.services.common.common import subject_id_to_name from app.services.interview.bulider import get_questions_by_sub_id -from app.models.interview.evaluation_model import EvaluationRequest from app.services.interview.evaluator import evaluate_answer_with_rag from app.utils.embed import embed_to_chroma -from bson import ObjectId - router = APIRouter(tags=["인터뷰 면접 기능 관련 API"]) @@ -33,6 +31,7 @@ async def get_questions( raise HTTPException(status_code=500, detail=f"질문 검색 중 오류 발생: {str(e)}") +# 면접 평가 및 저장 API @router.post("/evaluate", summary="면접 답변 평가 및 저장 (복수 개)", response_model=List[dict]) async def evaluate_with_rag_and_embed( user_id: str = Query(..., description="사용자 ID"), @@ -57,7 +56,7 @@ async def evaluate_with_rag_and_embed( source_id=str(request.interviewId) ) - # 3. MongoDB 저장 (기록은 유지) + # 3. MongoDB 저장 (기록 유지) doc = { "user_id": user_id, "interview_id": request.interviewId, @@ -65,14 +64,12 @@ async def evaluate_with_rag_and_embed( "user_reply": request.userReply, "evaluation": evaluation_result } - await interview_client.interview_feedback.insert_one(doc) + await db_clients.interview_feedback.insert_one(doc) # ✅ 수정된 부분 - # 4. evaluation만 리스트로 추가 + # 4. 평가 결과 리스트에 추가 evaluations.append(evaluation_result) - # 전체 문서 대신 evaluation 리스트만 반환 return evaluations except Exception as e: raise HTTPException(status_code=500, detail=f"오류 발생: {str(e)}") - diff --git a/app/routers/recommendation_router.py b/app/routers/recommendation_router.py index 5c855f4..d6f72a3 100644 --- a/app/routers/recommendation_router.py +++ b/app/routers/recommendation_router.py @@ -1,9 +1,9 @@ # app/routers/recommendation_router.py 수정 예시 import asyncio +from datetime import datetime +from typing import List from fastapi import APIRouter, HTTPException -from typing import List -from datetime import datetime from app.clients import chroma_client, db_clients from app.models.recommendation.request import UserPreference @@ -16,27 +16,30 @@ router = APIRouter() vectordb = chroma_client -recommendation_db = db_clients["recommendation"] +recommendation_collection = db_clients.recommendation_content # ✅ 수정 +cache_collection = db_clients.recommendation_cache # ✅ 수정 -recommendation_collection = recommendation_db.recommendation_content -cache_collection = recommendation_db.recommendation_cache -@router.post("", response_model=List[RecommendationResponse], summary="개인화 콘텐츠 추천 API", description="사전/사후 평가 및 진단 기반으로 사용자의 맥락에 맞는 콘텐츠 4개와 AI픽 1개를 제공합니다.") +@router.post("", response_model=List[RecommendationResponse], + summary="개인화 콘텐츠 추천 API", + description="사전/사후 평가 및 진단 기반으로 사용자의 맥락에 맞는 콘텐츠 4개와 AI픽 1개를 제공합니다.") async def recommend_content(user_id: str, subject_id: int): - user = await get_user(user_id) - subject = await subject_id_to_name(subject_id) - all_levels = user.get("level", {}) - - raw_prefs = user.get("preferences", {}) - prefs = UserPreference( - level=all_levels.get(str(subject_id), "Not_Defined"), - duration=raw_prefs.get("duration", 0), - price=raw_prefs.get("price", 0), - is_prefer_book=raw_prefs.get("is_prefer_book", False), - ) - try: + user = await get_user(user_id) + subject = await subject_id_to_name(subject_id) + all_levels = user.get("level", {}) + + raw_prefs = user.get("preferences", {}) + prefs = UserPreference( + level=all_levels.get(str(subject_id), "Not_Defined"), + duration=raw_prefs.get("duration", 0), + price=raw_prefs.get("price", 0), + is_prefer_book=raw_prefs.get("is_prefer_book", False), + ) + content_types = ["책"] if prefs.is_prefer_book else ["동영상", "블로그"] + + # 콘텐츠 후보 가져오기 candidates = await recommendation_collection.find({ "sub_id": subject_id, "content_type": {"$in": content_types}, @@ -73,7 +76,7 @@ async def recommend_content(user_id: str, subject_id: int): "cached_at": datetime.utcnow() }) - # Chroma 삽입 + # Chroma 임베딩 content_text = f"{item['content_title']} {item['content_platform']} {item['content_type']}" embed_to_chroma(user_id=user_id, content=content_text, source="recommendation", source_id=str(item["_id"])) @@ -93,14 +96,16 @@ async def recommend_content(user_id: str, subject_id: int): "comment": reason }) + # AI픽 선정 best_index = call_gpt_rerank(content_for_gpt, context_str) if 0 <= best_index < len(results): results[best_index]["isAiRecommendation"] = True + # 캐시 저장 if log: await cache_collection.insert_many(log) return results except Exception as e: - raise HTTPException(status_code=500, detail=f"Error during recommendation: {str(e)}") + raise HTTPException(status_code=500, detail=f"Error during recommendation: {str(e)}") \ No newline at end of file diff --git a/app/tasks/feedback_task.py b/app/tasks/feedback_task.py index 5f73a4b..a8cfbd3 100644 --- a/app/tasks/feedback_task.py +++ b/app/tasks/feedback_task.py @@ -1,12 +1,52 @@ # app/tasks/feedback_task.py from celery import shared_task +from datetime import datetime +from pymongo import MongoClient +from opensearchpy.helpers import bulk +from app.clients.opensearch_client import opensearch_client from app.utils.embed import embed_to_chroma +# ────────────────────────────────────────────── +# 1) Chroma 삽입용 태스크 +# ────────────────────────────────────────────── @shared_task(name="sync_feedback_to_chroma") def sync_feedback_to_chroma_task(user_id: str, content: str, source_id: str): + """사용자 피드백을 임베딩 후 ChromaDB에 저장""" return embed_to_chroma( user_id=user_id, content=content, source="feedback", - source_id=source_id + source_id=source_id, ) + +# ────────────────────────────────────────────── +# 2) OpenSearch 로그 적재 태스크 (동기 PyMongo 버전) +# ────────────────────────────────────────────── +mongo = MongoClient("mongodb://localhost:27017") +collection = mongo["feedback"]["feedback_content"] + +@shared_task(name="sync_feedback_logs_to_opensearch") +def sync_feedback_logs_to_opensearch(): + """feedback_content → OpenSearch 인덱스(feedback_logs)""" + actions = [ + { + "_index": "feedback_logs", + "_id": str(doc["_id"]), + "_source": { + "user_id": doc.get("user_id"), + "feedback_type": doc.get("feedback_type", "post"), + "subject": doc.get("subject"), + "final_comment": doc.get("final_comment"), + "score": doc.get("score", 0), + "timestamp": doc.get("timestamp", datetime.utcnow()), + "embedding": doc.get("embedding", [0.0] * 768), + }, + } + for doc in collection.find({}) + ] + + if actions: + success, _ = bulk(opensearch_client, actions, request_timeout=60) + print(f"[feedback_logs] {success}건 전송 완료") + else: + print("[feedback_logs] 전송할 문서가 없습니다.") diff --git a/app/tasks/migrate_task.py b/app/tasks/migrate_task.py index e52d2ab..66687aa 100644 --- a/app/tasks/migrate_task.py +++ b/app/tasks/migrate_task.py @@ -1,71 +1,47 @@ # app/tasks/migrate_task.py from celery import shared_task from pymongo import MongoClient -from datetime import datetime from dotenv import load_dotenv -from app.clients.chromadb_client import ChromaClient from langchain_core.documents import Document -from opensearchpy.helpers import bulk -from app.clients.opensearch_client import opensearch_client +from app.clients.chromadb_client import ChromaClient import os load_dotenv() + mongo = MongoClient(os.getenv("MONGO_DB_URL")) collection = mongo["ai_interview"]["interview_contents"] chroma_client = ChromaClient() @shared_task def batch_migrate_to_chroma(): + """MongoDB → ChromaDB 전체 마이그레이션""" user_ids = collection.distinct("user_id") total = 0 + for user_id in user_ids: docs = collection.find({"user_id": user_id}) langchain_docs = [] + for doc in docs: content = doc.get("content", "") if not content: continue metadata = { - "user_id": user_id, + "user_id": user_id, "source_id": str(doc["_id"]), - "source": doc.get("source", "interview") + "source": doc.get("source", "interview"), } - existing = chroma_client.collection.get(where={"source_id": metadata["source_id"]}) - if existing.get("ids"): + # 이미 들어간 문서는 건너뜀 + if chroma_client.collection.get(where={"source_id": metadata["source_id"]}).get("ids"): continue langchain_docs.append(Document(page_content=content, metadata=metadata)) if langchain_docs: chroma_client.add_documents(langchain_docs) - print(f" user_id={user_id}: {len(langchain_docs)}개 삽입") + print(f"user_id={user_id}: {len(langchain_docs)}개 삽입") total += len(langchain_docs) - print(f" Celery Batch 삽입 완료: 총 {total}개 문서") - -@shared_task -def sync_feedback_logs_to_opensearch(): - import asyncio - actions = [] - cursor = collection.find({}) - for doc in cursor: - actions.append({ - "_index": "feedback_logs", - "_id": str(doc["_id"]), - "_source": { - "user_id": doc.get("user_id"), - "feedback_type": doc.get("feedback_type", "post"), - "subject": doc.get("subject"), - "final_comment": doc.get("final_comment"), - "score": doc.get("score", 0), - "timestamp": doc.get("timestamp", datetime.utcnow()), - "embedding": doc.get("embedding", [0.0] * 768) - } - }) - if actions: - success, _ = bulk(opensearch_client, actions) - print(f"OpenSearch 전송 완료: {success}개 문서") - else: - print("MongoDB에서 읽은 문서가 없습니다!") + print(f"Celery Batch 삽입 완료: 총 {total}개 문서") diff --git a/app/tasks/opensearch_sync.py b/app/tasks/opensearch_sync.py deleted file mode 100644 index f716059..0000000 --- a/app/tasks/opensearch_sync.py +++ /dev/null @@ -1,38 +0,0 @@ -from celery import shared_task -from opensearchpy.helpers import bulk -from datetime import datetime -from app.clients.opensearch_client import opensearch_client # ← 여기! -from app.clients.mongodb import db_client # ← 여기! - -mongo_client = db_client() -collection = mongo_client.feedback # ai_platform.feedback 컬렉션 - -@shared_task -def sync_feedback_logs_to_opensearch(): - import asyncio - - actions = [] - - async def prepare_docs(): - cursor = collection.find({}) - async for doc in cursor: - actions.append({ - "_index": "feedback_logs", - "_id": str(doc["_id"]), - "_source": { - "user_id": doc.get("user_id"), - "feedback_type": doc.get("feedback_type", "post"), - "subject": doc.get("subject"), - "final_comment": doc.get("final_comment"), - "score": doc.get("score", 0), - "timestamp": doc.get("timestamp", datetime.utcnow()), - "embedding": doc.get("embedding", [0.0] * 768) - } - }) - asyncio.run(prepare_docs()) - - if actions: - success, _ = bulk(opensearch_client, actions) - print(f"OpenSearch 전송 완료: {success}개 문서") - else: - print("MongoDB에서 읽은 문서가 없습니다!") diff --git a/app/tasks/opensearch_task.py b/app/tasks/opensearch_task.py new file mode 100644 index 0000000..4480ae0 --- /dev/null +++ b/app/tasks/opensearch_task.py @@ -0,0 +1,49 @@ +# app/tasks/opensearch_task.py +from celery import shared_task +from datetime import datetime +from pymongo import MongoClient +from opensearchpy.helpers import bulk +from dateutil.parser import parse as parse_date +from app.clients.opensearch_client import get_opensearch_client +opensearch_client = get_opensearch_client()# PyMongo 동기 래퍼 + +mongo = MongoClient("mongodb://54.180.4.224:27017") + +COLLECTION_MAPPINGS = { + "interview_logs": mongo["ai_interview"]["interview_contents"], + "recommendation_logs": mongo["ai_platform"]["recommend_cache"], + "feedback_logs": mongo["feedback"]["feedback_content"], + "startup_logs": mongo["local"]["startup_log"], + "pre_result_logs": mongo["assessment"]["pre_result"], + "post_result_logs": mongo["assessment"]["post_result"], +} + +@shared_task(name="sync_all_logs_to_opensearch") +def sync_all_logs_to_opensearch(): + for index_name, collection in COLLECTION_MAPPINGS.items(): + actions = [] + for doc in collection.find({}): + timestamp = doc.get("timestamp") + if isinstance(timestamp, str): + try: + timestamp = parse_date(timestamp) + except Exception: + timestamp = datetime.utcnow() + elif not timestamp: + timestamp = datetime.utcnow() + + action = { + "_index": index_name, + "_id": str(doc["_id"]), + "_source": { + **{k: v for k, v in doc.items() if k != "_id"}, + "timestamp": timestamp + } + } + actions.append(action) + + if actions: + success, _ = bulk(opensearch_client, actions, request_timeout=60) + print(f"[{index_name}] {success}건 전송 완료") + else: + print(f"[{index_name}] 전송할 문서 없음") diff --git a/app/utils/build_interview_log.py b/app/utils/build_interview_log.py new file mode 100644 index 0000000..3313f74 --- /dev/null +++ b/app/utils/build_interview_log.py @@ -0,0 +1,17 @@ +from datetime import datetime + +def build_interview_log(user_id: str, question_id: str, question: str, answer: str, evaluation: dict): + return { + "user_id": user_id, + "question_id": question_id, + "question": question, + "answer": answer, + "evaluation": { + "logic": evaluation.get("logic", 0), + "accuracy": evaluation.get("accuracy", 0), + "clarity": evaluation.get("clarity", 0), + "terms": evaluation.get("terms", 0), + "overall_comment": evaluation.get("overall_comment", "") + }, + "timestamp": datetime.now().isoformat() + } diff --git a/celerybeat-schedule.bak b/celerybeat-schedule.bak index b79e0c8..7241433 100644 --- a/celerybeat-schedule.bak +++ b/celerybeat-schedule.bak @@ -1,4 +1,4 @@ -'entries', (0, 462) -'__version__', (512, 20) -'tz', (1024, 25) -'utc_enabled', (1536, 4) +'entries', (4096, 571) +'__version__', (2560, 20) +'tz', (3072, 4) +'utc_enabled', (3584, 4) diff --git a/celerybeat-schedule.dat b/celerybeat-schedule.dat index e0a3056..846ffaf 100644 Binary files a/celerybeat-schedule.dat and b/celerybeat-schedule.dat differ diff --git a/celerybeat-schedule.dir b/celerybeat-schedule.dir index b79e0c8..7241433 100644 --- a/celerybeat-schedule.dir +++ b/celerybeat-schedule.dir @@ -1,4 +1,4 @@ -'entries', (0, 462) -'__version__', (512, 20) -'tz', (1024, 25) -'utc_enabled', (1536, 4) +'entries', (4096, 571) +'__version__', (2560, 20) +'tz', (3072, 4) +'utc_enabled', (3584, 4) diff --git a/create_index.py b/create_index.py deleted file mode 100644 index 2cf6c6c..0000000 --- a/create_index.py +++ /dev/null @@ -1,29 +0,0 @@ -from opensearchpy import OpenSearch - -client = OpenSearch("http://localhost:9200") - -index_name = "feedback_logs" - -mapping = { - "mappings": { - "properties": { - "user_id": {"type": "keyword"}, - "feedback_type": {"type": "keyword"}, - "subject": {"type": "keyword"}, - "final_comment": {"type": "text"}, # <- 수정됨 - "score": {"type": "integer"}, - "embedding": { - "type": "knn_vector", - "dimension": 768 - }, - "timestamp": {"type": "date"} - } - } -} - -# 인덱스가 이미 있으면 삭제 후 생성 -if client.indices.exists(index=index_name): - client.indices.delete(index=index_name) - -response = client.indices.create(index=index_name, body=mapping) -print("Index created:", response) diff --git a/pyproject.toml b/pyproject.toml index a9b395b..03b1d61 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,7 +76,6 @@ markdown-it-py = "3.0.0" marshmallow = "3.26.1" mdurl = "0.1.2" mmh3 = "5.1.0" -motor = "3.7.0" mpmath = "1.3.0" multidict = "6.4.3" mypy_extensions = "1.1.0" diff --git a/requirements.txt b/requirements.txt index 9b9b9e4..c45b740 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,4 @@ hypercorn -motor pymongo opensearch-py celery