Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions app/celery_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,30 @@
backend=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
include=[
"app.tasks.migrate_task",
"app.tasks.feedback_task"
]
"app.tasks.feedback_task",
"app.tasks.opensearch_task",
],
)

celery_app.conf.timezone = "Asia/Seoul"
# 모듈을 미리 import 해 두면 워커 기동 시 태스크가 즉시 등록됩니다.
import app.tasks.migrate_task
import app.tasks.feedback_task
import app.tasks.opensearch_task

celery_app.conf.beat_schedule = {
# 매일 03:00 MongoDB → Chroma 마이그레이션
"daily-migrate-to-chroma": {
"task": "app.tasks.migrate_task.batch_migrate_to_chroma",
"schedule": crontab(hour=3, minute=0),
},
# 매일 04:00 Feedback → OpenSearch 전송 (경로 수정!)
"daily-sync-feedback-to-opensearch": {
"task": "app.tasks.migrate_task.sync_feedback_logs_to_opensearch",
"task": "app.tasks.feedback_task.sync_feedback_logs_to_opensearch",
"schedule": crontab(hour=4, minute=0),
},
# 매일 05:00 모든 로그 컬렉션 → OpenSearch
"daily-sync-opensearch-logs": {
"task": "sync_all_logs_to_opensearch",
"schedule": crontab(hour=5, minute=0),
},
}
21 changes: 4 additions & 17 deletions app/clients/__init__.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,14 @@
from .chromadb_client import ChromaClient
from .mongodb import MongoDBClient
from .mongodb import MongoClientsManager # ✅ MongoClientsManager만 사용
from .openai_client import OpenAiClient


# 클라이언트 초기화
ai_client = OpenAiClient()

chroma_client = ChromaClient(
persist_directory="chroma_store/recommend_contents",
openai_api_key=None
)

question_client = MongoDBClient()
assessment_client = MongoDBClient(db_name="assessment")
feedback_client = MongoDBClient(db_name="feedback")
recommendation_client = MongoDBClient(db_name="recommendation")
user_client = MongoDBClient(db_name="user")
interview_client = MongoDBClient(db_name="ai_interview")

db_clients = {
"ai_platform": question_client,
"assessment": assessment_client,
"feedback": feedback_client,
"recommendation": recommendation_client,
"user": user_client,
"ai_interview" : interview_client,
}
# MongoClientsManager 사용
db_clients = MongoClientsManager()
7 changes: 3 additions & 4 deletions app/clients/chromadb_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def count_documents(self):
return self.collection.count()

def get_documents_by_user(self, user_id: str, limit: int = 5):
    """Fetch at most ``limit`` documents whose ``user_id`` metadata matches.

    Args:
        user_id: Value compared against the ``user_id`` metadata field.
        limit: Maximum number of documents requested from the collection.

    Returns:
        Whatever ``self.collection.get`` returns for the query.
    """
    # ``limit`` must be forwarded; an unlimited query returned first here,
    # which made the parameter a no-op for every caller.
    return self.collection.get(where={"user_id": user_id}, limit=limit)

def delete_documents(self, user_id: str = None, source: str = None, limit: Optional[int] = None, sort_order: Optional[str] = "asc"):
filter_ = []
Expand All @@ -52,7 +52,8 @@ def delete_documents(self, user_id: str = None, source: str = None, limit: Optio
print("\n삭제 조건이 없습니다.")
return

where_clause = {"$and": filter_} if len(filter_) > 1 else filter_[0]
# 수정: Mongo 스타일 → Chroma 스타일 where_clause로 변환
where_clause = {k: v for d in filter_ for k, v in d.items()}

if limit:
docs = self.collection.get(where=where_clause)
Expand Down Expand Up @@ -86,7 +87,6 @@ def delete_all_documents(self, batch_size: int = 500):
total = len(all_ids)
print(f"\n[전체 삭제] 총 {total}개 문서 삭제 시작")

# Batch로 나누기
for i in range(0, total, batch_size):
batch_ids = all_ids[i:i + batch_size]
result = self.collection.delete(ids=batch_ids)
Expand All @@ -96,7 +96,6 @@ def delete_all_documents(self, batch_size: int = 500):
return total



if __name__ == "__main__":
chroma = ChromaClient()
chroma.delete_all_documents()
73 changes: 61 additions & 12 deletions app/clients/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,81 @@ def __init__(self, mongo_url: str | None = None, db_name: str = "ai_platform"):
self.client = AsyncIOMotorClient(url)
self.db = self.client[db_name]

# db_name = user
# 수정된 부분: .get → [] 방식으로 변경
self.user_profile = self.db["user_profile"]

# db_name = recommendation
self.recommendation_content = self.db["recommendation_content"]
self.recommendation_cache = self.db["recommendation_cache"]

# db_name = feedback
self.feedback = self.db["feedback"]

# db_name = assessment
self.pre_result = self.db["pre_result"]
self.post_result = self.db["post_result"]

# db_name = ai_platform
self.techMap = self.db["techMap"]

self.questions = self.db["questions"]
self.techMap = self.db["techMap"]
self.startup_log = self.db["startup_log"]
self.interview_feedback = self.db["interview_feedback"]
self.interview_contents = self.db["interview_contents"]

def get_category_collection(self, category: str):
return self.db[category]

async def save_explanation_log(self, log_data: dict):
    """Timestamp ``log_data`` and insert it into ``explanation_log``.

    Args:
        log_data: Document to store. It is modified in place: a
            ``created_at`` key is set to the current naive UTC time
            before the insert.
    """
    log_data["created_at"] = datetime.utcnow()
    # Reach the collection through ``self.db``: no ``explanation_log``
    # attribute is assigned in the visible ``__init__``, and the document
    # must be written exactly once.
    await self.db["explanation_log"].insert_one(log_data)


class MongoClientsManager:
    """Registry of one ``MongoDBClient`` per logical database name.

    Clients can be looked up by name (``manager["feedback"]``) and the
    commonly used collections are exposed as read-only properties that
    delegate to the client owning them.

    The object also answers the read-only ``dict`` operations (``in``,
    iteration, ``len``, ``get``, ``keys``, ``values``, ``items``) so that
    code written against a plain ``{name: client}`` dictionary keeps working.
    """

    def __init__(self):
        self._clients = {
            "ai_platform": MongoDBClient(db_name="ai_platform"),
            "assessment": MongoDBClient(db_name="assessment"),
            "feedback": MongoDBClient(db_name="feedback"),
            "recommendation": MongoDBClient(db_name="recommendation"),
            # The "user" entry targets the ai_platform database, which
            # holds the user_profile collection.
            "user": MongoDBClient(db_name="ai_platform"),
            "ai_interview": MongoDBClient(db_name="ai_interview"),
        }

    # --- dict-style access -------------------------------------------------

    def __getitem__(self, key):
        """Return the client registered under ``key``; raise ``KeyError`` if absent."""
        return self._clients[key]

    def __contains__(self, key):
        return key in self._clients

    def __iter__(self):
        return iter(self._clients)

    def __len__(self):
        return len(self._clients)

    def get(self, key, default=None):
        """Return the client registered under ``key``, or ``default`` if absent."""
        return self._clients.get(key, default)

    def keys(self):
        return self._clients.keys()

    def values(self):
        return self._clients.values()

    def items(self):
        return self._clients.items()

    # --- collection shortcuts ----------------------------------------------

    @property
    def user_profile(self):
        return self._clients["user"].user_profile

    @property
    def recommendation_content(self):
        return self._clients["recommendation"].recommendation_content

    @property
    def recommendation_cache(self):
        return self._clients["recommendation"].recommendation_cache

    @property
    def feedback(self):
        return self._clients["feedback"].feedback

    @property
    def pre_result(self):
        return self._clients["assessment"].pre_result

    @property
    def post_result(self):
        return self._clients["assessment"].post_result

    @property
    def questions(self):
        return self._clients["ai_platform"].questions

    @property
    def techMap(self):
        return self._clients["ai_platform"].techMap

    @property
    def startup_log(self):
        return self._clients["ai_platform"].startup_log

    @property
    def interview_feedback(self):
        return self._clients["ai_interview"].interview_feedback

    @property
    def interview_contents(self):
        return self._clients["ai_interview"].interview_contents
17 changes: 11 additions & 6 deletions app/clients/opensearch_client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
# app/clients/opensearch_client.py
from opensearchpy import OpenSearch
import os

opensearch_client = OpenSearch(
hosts=[os.getenv("OPENSEARCH_HOST", "http://localhost:9200")],
http_auth=(os.getenv("OPENSEARCH_USER", "admin"), os.getenv("OPENSEARCH_PASSWORD", "admin")),
use_ssl=False,
verify_certs=False
)
def get_opensearch_client():
    """Build an ``OpenSearch`` client from environment variables.

    Environment variables read:
        OPENSEARCH_HOST: Host name (default ``localhost``). A full URL such
            as ``http://localhost:9200`` is also accepted; its host part
            is used, and its port is used when OPENSEARCH_PORT is unset.
        OPENSEARCH_PORT: Port number (default ``9200``).
        OPENSEARCH_USER: User name (default ``admin``).
        OPENSEARCH_PASS: Password (default ``admin``). OPENSEARCH_PASSWORD
            is consulted when OPENSEARCH_PASS is not set.

    Returns:
        A new ``OpenSearch`` instance with SSL and certificate
        verification disabled.
    """
    # Local import keeps this function self-contained.
    from urllib.parse import urlsplit

    host = os.getenv("OPENSEARCH_HOST", "localhost")
    port_setting = os.getenv("OPENSEARCH_PORT")
    port = int(port_setting) if port_setting is not None else 9200

    # Earlier configuration stored a URL in OPENSEARCH_HOST; a URL is not a
    # valid value for the "host" entry, so split it into host and port.
    if "://" in host:
        parsed = urlsplit(host)
        if port_setting is None and parsed.port is not None:
            port = parsed.port
        host = parsed.hostname or host

    # Prefer the current variable name but keep honouring the older one.
    password = os.getenv("OPENSEARCH_PASS")
    if password is None:
        password = os.getenv("OPENSEARCH_PASSWORD", "admin")

    return OpenSearch(
        hosts=[{"host": host, "port": port}],
        http_auth=(os.getenv("OPENSEARCH_USER", "admin"), password),
        use_ssl=False,
        verify_certs=False,
    )

# 기본 객체도 생성해둠 (기본 사용 용도)
opensearch_client = get_opensearch_client()
95 changes: 95 additions & 0 deletions app/create_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from opensearchpy import OpenSearch

# OpenSearch connection settings (local node, basic auth, no TLS).
_connection_options = {
    "hosts": [{"host": "localhost", "port": 9200}],
    "http_auth": ("admin", "admin"),
    "use_ssl": False,
    "verify_certs": False,
}
client = OpenSearch(**_connection_options)

# 인덱스 매핑 정의
def get_index_mappings():
    """Return the mapping definition for each log index.

    Returns:
        A new dict keyed by index name (``feedback_logs``,
        ``interview_logs``, ``recommendation_logs``, ``startup_logs``,
        ``pre_result_logs``, ``post_result_logs``); each value is a
        ``{"properties": {...}}`` mapping body.
    """

    def field(kind, **extra):
        # Every call yields a fresh dict so no field definition is shared.
        return {"type": kind, **extra}

    def text_with_keyword():
        # Full-text field with an exact-match ``keyword`` sub-field.
        return field("text", fields={"keyword": field("keyword")})

    def assessment_result():
        # Layout shared by the pre- and post-assessment result indices.
        return {
            "properties": {
                "user_id": field("keyword"),
                "subject": field("keyword"),
                "score": field("integer"),
                "level": field("keyword"),
                "timestamp": field("date"),
            }
        }

    feedback_logs = {
        "properties": {
            "user_id": field("keyword"),
            "feedback_type": field("keyword"),
            "subject": field("keyword"),
            "final_comment": text_with_keyword(),
            "score": field("integer"),
            "embedding": field("knn_vector", dimension=768),
            "timestamp": field("date"),
        }
    }

    interview_logs = {
        "properties": {
            "user_id": field("keyword"),
            "question_id": field("keyword"),
            "question": field("text"),
            "answer": field("text"),
            "logic": field("integer"),
            "accuracy": field("integer"),
            "clarity": field("integer"),
            "terms": field("integer"),
            "overall_comment": text_with_keyword(),
            "timestamp": field("date"),
        }
    }

    recommendation_logs = {
        "properties": {
            "user_id": field("keyword"),
            "title": text_with_keyword(),
            "user_context": field("text"),
            "timestamp": field("date"),
        }
    }

    startup_logs = {
        "properties": {
            "service": field("keyword"),
            "message": field("text"),
            "timestamp": field("date"),
        }
    }

    return {
        "feedback_logs": feedback_logs,
        "interview_logs": interview_logs,
        "recommendation_logs": recommendation_logs,
        "startup_logs": startup_logs,
        "pre_result_logs": assessment_result(),
        "post_result_logs": assessment_result(),
    }

# Recreate every index from its mapping.
# NOTE: an existing index is deleted first, which discards its documents.
index_mappings = get_index_mappings()
for index_name, mapping in index_mappings.items():
    if client.indices.exists(index=index_name):
        client.indices.delete(index=index_name)
        print(f"🗑️ Deleted index: {index_name}")

    body = {"mappings": mapping}
    # An index holding a knn_vector field has to be created as a k-NN index
    # (index.knn enabled) for that field to be usable in k-NN search.
    has_knn_field = any(
        definition.get("type") == "knn_vector"
        for definition in mapping.get("properties", {}).values()
    )
    if has_knn_field:
        body["settings"] = {"index": {"knn": True}}

    client.indices.create(index=index_name, body=body)
    print(f"✅ Created index: {index_name}")
46 changes: 46 additions & 0 deletions app/insert_feedback_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from datetime import datetime
from opensearchpy import OpenSearch
import random

# Client for the local OpenSearch node (basic auth, no TLS).
client = OpenSearch(
    hosts=[{"host": "localhost", "port": 9200}],
    http_auth=("admin", "admin"),
    use_ssl=False,
    verify_certs=False,
)

# Value pools the sample documents are drawn from.
user_ids = ["user_" + str(number) for number in range(1, 8)]
subjects = ["math", "cs", "physics", "bio"]
feedback_types = ["PRE", "POST"]
comments = [
    "개념 이해가 잘 되어 있습니다.",
    "약간의 복습이 필요합니다.",
    "용어 사용이 매우 정확합니다.",
    "응답이 다소 모호했습니다.",
    "전반적으로 향상된 모습입니다.",
    "핵심 개념에 대한 이해가 부족합니다.",
    "예시 사용이 효과적이었습니다.",
    "개념 연결 능력이 향상되었습니다.",
]

# Bulk payload: an action line followed by its document, three documents
# per user.
docs = []
for uid in user_ids:
    for _ in range(3):
        doc = {
            "user_id": uid,
            "subject": random.choice(subjects),
            "feedback_type": random.choice(feedback_types),
            "score": random.randint(60, 100),
            "final_comment": random.choice(comments),
            "timestamp": datetime.utcnow().isoformat(),
        }
        docs.extend(({"index": {"_index": "feedback_logs"}}, doc))

response = client.bulk(body=docs)

# The bulk response carries an "errors" flag; report on it.
if response["errors"]:
    print("❌ 삽입 중 오류 발생:", response)
else:
    print("✅ 피드백 데이터 삽입 완료")
36 changes: 36 additions & 0 deletions app/insert_interview_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from datetime import datetime
from opensearchpy import OpenSearch
import random

# Client for the local OpenSearch node (basic auth, no TLS).
client = OpenSearch(
    hosts=[{"host": "localhost", "port": 9200}],
    http_auth=("admin", "admin"),
    use_ssl=False,
)

user_ids = [f"user_{i}" for i in range(1, 8)]
questions = [
    ("Q1", "Explain the concept of recursion."),
    ("Q2", "What is normalization in databases?"),
    ("Q3", "Define polymorphism in OOP."),
]

# Index one sample interview document per user.
for user in user_ids:
    qid, question = random.choice(questions)
    # Scores and the overall comment are written as top-level fields: the
    # interview_logs mapping declares logic / accuracy / clarity / terms /
    # overall_comment at the top level, so nesting them under another key
    # would leave the mapped fields empty.
    # NOTE(review): confirm the production sync writes the same flat shape.
    doc = {
        "user_id": user,
        "question_id": qid,
        "question": question,
        "answer": "Sample answer...",
        "logic": random.randint(1, 5),
        "accuracy": random.randint(1, 5),
        "clarity": random.randint(1, 5),
        "terms": random.randint(1, 5),
        "overall_comment": "Well explained" if random.random() > 0.5 else "Needs improvement",
        "timestamp": datetime.utcnow().isoformat(),
    }
    client.index(index="interview_logs", body=doc)

print("✅ 삽입 완료: interview_logs")
Loading