feat: add ReactionMiner backend job, skip duplicate emails, fix casing for novoStoic in emails #71

Merged · 22 commits · Jan 29, 2025

Commits (22)
c1ba9dc
chore: update pvc-explorer to mount the full NFS share for MMLI
bodom0015 Jan 22, 2025
a0aa133
feat: first-pass at adding ReactionMiner to mmli-backend
bodom0015 Jan 23, 2025
30837ee
fix: casing in novoStoic emails, attempt at skipping sending duplicat…
bodom0015 Jan 23, 2025
13ceda9
feat: support for secrets in kubejob_service, include env for Reactio…
bodom0015 Jan 23, 2025
b07258a
fix: syntax error in kubejob_service
bodom0015 Jan 23, 2025
89a114a
feat: include huggingface-token SealedSecret for staging/prod, add co…
bodom0015 Jan 23, 2025
e36c5df
fix: experiment with reactionminer output as a directory
bodom0015 Jan 23, 2025
dffebe9
fix: print stacktrace when catching generic Exception while calling k…
bodom0015 Jan 23, 2025
b14e95b
fix: workaround for bug with iterating over built-in function names (…
bodom0015 Jan 23, 2025
e7d8d0b
fix: change name of "items" to avoid conflict with built-in .items fu…
bodom0015 Jan 24, 2025
5f54177
fix: consistency is important
bodom0015 Jan 24, 2025
9064a27
fix: one more try at handling secrets[].items (renamed to itemList)
bodom0015 Jan 24, 2025
1a0ef5f
fix: fixed typos
bodom0015 Jan 24, 2025
c532c50
fix: place secrets at the end of volume section, instead of interming…
bodom0015 Jan 24, 2025
76ea2d0
fix: typo - sec->sec.name
bodom0015 Jan 24, 2025
e15b01e
feat: attempt to iterate over all result files and return them
bodom0015 Jan 24, 2025
4d491d9
fix: fixed syntax/loop for getting result files from ReactionMiner
bodom0015 Jan 24, 2025
09f6795
fix: remove bad log statement
bodom0015 Jan 24, 2025
d2dbd7e
feat: support uploading file_contents as string to MinIO
bodom0015 Jan 24, 2025
3c45e97
fix: add coments, remove unused code
bodom0015 Jan 24, 2025
f73b0e4
fix: use bytes, not byte stream
bodom0015 Jan 24, 2025
71b7f28
chore: slight refactoring, handle case of invalid novoStoic subjob
bodom0015 Jan 24, 2025
41 changes: 41 additions & 0 deletions app/cfg/config.yaml
@@ -108,6 +108,47 @@ kubernetes_jobs:
subPath: 'weights'
claimName: 'mmli-clean-job-weights'

# Config for running ReactionMiner job
reactionminer:
image: "moleculemaker/reactionminer:latest"
imagePullPolicy: "Always"
env:
- name: HF_TOKEN_PATH
value: '/root/.cache/token'
- name: GROBID_SERVER
value: 'localhost'
volumes:
- name: 'shared-storage'
mountPath: '/workspace/10test/'
subPath: 'uws/jobs/reactionminer/JOB_ID/in'
claimName: 'mmli-clean-job-weights'
- name: 'shared-storage'
mountPath: '/workspace/extraction/results_filtered/'
subPath: 'uws/jobs/reactionminer/JOB_ID/out'
claimName: 'mmli-clean-job-weights'
- mountPath: '/root/.cache/huggingface/hub'
name: 'shared-storage'
subPath: 'uws/jobs/reactionminer/.cache'
claimName: 'mmli-clean-job-weights'

# Create a Secret named "huggingface-token" containing the HuggingFace API token
# Format:
# apiVersion: v1
# kind: Secret
# data:
# token: YOUR_API_TOKEN
secrets:
- mountPath: '/root/.cache/token'
name: 'token'
subPath: 'token'
readOnly: 'true'
secretName: huggingface-token
# "itemList" takes the place of "items"
# This should avoid overlap with the built-in items() function
itemList:
- key: token
path: token

# Config for running SOMN job
somn:
image: "ianrinehart/somn:1.1"
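
The new reactionminer entry in config.yaml mounts a Secret named huggingface-token at /root/.cache/token, the path the job reads via HF_TOKEN_PATH. The PR ships a SealedSecret for staging/prod; purely as a rough sketch (not part of this PR), an equivalent plain Secret could be created with the Kubernetes Python client — the namespace and token value below are placeholders:

from kubernetes import client, config

config.load_kube_config()  # or load_incluster_config() when running inside the cluster
secret = client.V1Secret(
    metadata=client.V1ObjectMeta(name="huggingface-token"),
    string_data={"token": "hf_placeholder_value"},  # surfaced to the job at /root/.cache/token
)
client.CoreV1Api().create_namespaced_secret(namespace="your-namespace", body=secret)
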
1 change: 1 addition & 0 deletions app/models/enums.py
@@ -10,6 +10,7 @@ class JobType(str, Enum):
NOVOSTOIC_PATHWAYS = 'novostoic-pathways'
NOVOSTOIC_ENZRANK = 'novostoic-enzrank'
NOVOSTOIC_DGPREDICTOR = 'novostoic-dgpredictor'
REACTIONMINER = 'reactionminer'
SOMN = 'somn'
DEFAULT = 'defaults'

4 changes: 4 additions & 0 deletions app/routers/files.py
@@ -23,6 +23,7 @@
from services.minio_service import MinIOService
from services.chemscraper_service import ChemScraperService
from services.aceretro_service import ACERetroService
from services.reactionminer_service import ReactionMinerService


from typing import Optional
@@ -83,6 +84,9 @@ async def get_results(bucket_name: str, job_id: str, service: MinIOService = Dep
print("Getting novostoic-dgpredictor job result")
return await NovostoicService.dgPredictorResultPostProcess(bucket_name, job_id, service, db)

elif bucket_name == JobType.REACTIONMINER:
return await ReactionMinerService.resultPostProcess(bucket_name, job_id, service, db)

elif bucket_name == JobType.SOMN:
return await SomnService.resultPostProcess(bucket_name, job_id, service, db)

10 changes: 9 additions & 1 deletion app/routers/job.py
@@ -6,6 +6,7 @@
import uuid
import csv
import io
import traceback

from typing import List

@@ -71,6 +72,10 @@ async def create_job(
command = ''
environment = []

# Mount in secrets/volumes at runtime
volumes = []
secrets = []

if job_type == JobType.DEFAULT:
command = app_config['kubernetes_jobs'][job_type]['command']
#command = f'ls -al /uws/jobs/{job_type}/{job_id}'
@@ -115,7 +120,9 @@

command = f"python entrypoint.py --job_id {job_id}"
# Job is created at end of function

elif job_type == JobType.REACTIONMINER:
log.debug(f'Running ReactionMiner job: {job_id}')
environment = app_config['kubernetes_jobs']['reactionminer']['env']
elif job_type == JobType.SOMN:
# Build up example_request.csv from user input, upload to MinIO?
job_config = json.loads(job_info.replace('\"', '"'))
@@ -231,6 +238,7 @@ async def create_job(
kubejob_service.create_job(job_type=job_type, job_id=job_id, run_id=run_id, image_name=image_name, command=command, environment=environment)
except Exception as ex:
log.error("Failed to create Job: " + str(ex))
log.error(traceback.format_exc())
raise HTTPException(status_code=400, detail="Failed to create Job: " + str(ex))

else:
51 changes: 45 additions & 6 deletions app/services/kubejob_service.py
@@ -24,6 +24,7 @@
import sqlalchemy as db

from services.email_service import EmailService
from services.minio_service import MinIOService

log = get_logger(__name__)

@@ -118,7 +119,9 @@ def __init__(self):
#self.connection.run_sync(SQLModel.metadata.create_all)
self.metadata = db.MetaData()
self.jobs = []

self.email_service = EmailService()
self.minio_service = MinIOService()

self.stream = None
self.logger.info('Starting KubeWatcher')
@@ -132,7 +135,7 @@ def send_notification_email(self, updated_job, new_phase):
novostoic_frontend_url = app_config['novostoic_frontend_url']
if job_type == JobType.NOVOSTOIC_PATHWAYS:
results_url = f'{novostoic_frontend_url}/pathway-search/result/{updated_job.job_id}'
job_type_name = 'NovoStoic'
job_type_name = 'novoStoic'
elif job_type == JobType.NOVOSTOIC_OPTSTOIC:
results_url = f'{novostoic_frontend_url}/overall-stoichiometry/result/{updated_job.job_id}'
job_type_name = 'OptStoic'
@@ -142,6 +145,8 @@
elif job_type == JobType.NOVOSTOIC_DGPREDICTOR:
results_url = f'{novostoic_frontend_url}/thermodynamical-feasibility/result/{updated_job.job_id}'
job_type_name = 'dGPredictor'
else:
raise ValueError(f"Unrecognized novoStoic subjob type {job_type} not in existing Job Types {JobType}")
elif job_type == JobType.SOMN:
somn_frontend_url = app_config['somn_frontend_url']
results_url = f'{somn_frontend_url}/results/{updated_job.job_id}'
@@ -157,26 +162,54 @@
elif job_type == JobType.ACERETRO:
aceretro_frontend_url = app_config['aceretro_frontend_url']
results_url = f'{aceretro_frontend_url}/results/{updated_job.job_id}'
job_type_name = 'ACERETRO'
job_type_name = 'ACERetro'
elif job_type == JobType.REACTIONMINER:
reactionminer_frontend_url = app_config['reactionminer_frontend_url']
results_url = f'{reactionminer_frontend_url}/results/{updated_job.job_id}'
job_type_name = 'ReactionMiner'
else:
raise ValueError(f"Unrecognized job type {job_type} not in existing Job Types {JobType}")

job_id = updated_job.job_id

# Send email notification about success/failure
if new_phase == JobStatus.COMPLETED and updated_job.email:
if new_phase == JobStatus.COMPLETED and updated_job.email and self.should_send_email(job_type, job_id):
try:
self.email_service.send_email(updated_job.email,
f'''Result for your {job_type_name} Job ({updated_job.job_id}) is ready''',
f'''Result for your {job_type_name} Job ({job_id}) is ready''',
f'''The result for your {job_type_name} Job is available at {results_url}''')
self.mark_email_as_sent(job_type, job_id, success=True)
except Exception as e:
log.error(f'Failed to send email notification on success: {str(e)}')
elif new_phase == JobStatus.ERROR and updated_job.email:
elif new_phase == JobStatus.ERROR and updated_job.email and self.should_send_email(job_type, job_id):
try:
self.email_service.send_email(updated_job.email,
f'''{job_type_name} Job ({updated_job.job_id}) failed''',
f'''{job_type_name} Job ({job_id}) failed''',
f'''An error occurred in computing the result for your {job_type_name} job.''')
self.mark_email_as_sent(job_type, job_id, success=False)
except Exception as e:
log.error(f'Failed to send email notification on failure: {str(e)}')

def should_send_email(self, job_type, job_id):
# Check if email has already been sent
# if so, file should exist in MinIO
minio_bucket_name = job_type
minio_check_path = f'{job_id}/email-sent'
if self.minio_service.check_file_exists(minio_bucket_name, minio_check_path):
log.debug(f'Skipped sending email for {job_id}: email has already been sent for this job')
return False
return True

def mark_email_as_sent(self, job_type, job_id, success):
minio_bucket_name = job_type
minio_check_path = f'{job_id}/email-sent'
if success:
# Job completed successfully, email was sent indicating success
self.minio_service.upload_file(minio_bucket_name, minio_check_path, 'success')
else:
# Job failed with an error, email was sent indicating error
self.minio_service.upload_file(minio_bucket_name, minio_check_path, 'error')

def run(self):
# Ignore kube-system namespace
# TODO: Parameterize this?
@@ -528,6 +561,11 @@ def create_job(job_type, job_id, run_id=None, image_name=None, command=None, own
for volume in app_config['kubernetes_jobs'][job_type]['volumes']:
all_volumes.append(volume)

# Include secrets, if necessary (e.g. ReactionMiner for HuggingFace API token)
secrets = []
if 'secrets' in app_config['kubernetes_jobs'][job_type]:
secrets = app_config['kubernetes_jobs'][job_type]['secrets']

jobCompleteApiUrl = f'''{app_config['server']['protocol']}://{os.path.join(
app_config['server']['hostName'],
app_config['server']['basePath'],
@@ -572,6 +610,7 @@
securityContext=app_config['kubernetes_jobs'][job_type]['securityContext'] if 'securityContext' in app_config['kubernetes_jobs'][job_type] else None,
workingVolume=app_config['kubernetes_jobs']['defaults']['workingVolume'],
volumes=all_volumes,
secrets=secrets,
resources=app_config['kubernetes_jobs'][job_type]['resources'] if 'resources' in app_config['kubernetes_jobs'][job_type] else app_config['kubernetes_jobs']['defaults']['resources'],
# apiToken=config['jwt']['hs256Secret'],
apiToken='dummy',
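
The duplicate-email guard in send_notification_email works by writing a small marker object ({job_id}/email-sent) to the job's MinIO bucket the first time a notification goes out; any later phase update for the same job finds the marker and skips the send. A minimal sketch of that flow using the same MinIOService calls — JOB_ID is a placeholder for a real job id:

from services.minio_service import MinIOService

service = MinIOService()
marker = "JOB_ID/email-sent"

# First COMPLETED/ERROR event for the job: no marker yet, so the email goes out
if not service.check_file_exists("reactionminer", marker):
    # ... send the notification email here ...
    service.upload_file("reactionminer", marker, "success")   # or "error" on failure

# Any later event for the same job finds the marker and skips re-sending
assert service.check_file_exists("reactionminer", marker)
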
7 changes: 7 additions & 0 deletions app/services/minio_service.py
@@ -30,6 +30,10 @@ def get_file(self, bucket_name, object_name):
log.error("Error: ", err)

def upload_file(self, bucket_name, object_name, file_content):
# If file_content is a string, first encode it to bytes
if isinstance(file_content, str):
file_content = file_content.encode('utf-8')

try:
# Upload file to the specified bucket
result = self.client.put_object(
@@ -44,6 +48,9 @@ def upload_file(self, bucket_name, object_name, file_content):
except S3Error as err:
log.error("Error: ", err)
return False

def list_files(self, bucket_name, path, recursive=False):
return self.client.list_objects(bucket_name, prefix=path, recursive=recursive)

def get_file_urls(self, bucket_name, path):
try:
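
With these changes, upload_file accepts either str (encoded to UTF-8 bytes before upload) or bytes, and the new list_files wraps MinIO's prefix listing. A rough usage sketch — the bucket and object names are illustrative only:

from services.minio_service import MinIOService

service = MinIOService()

# str payloads are now encoded automatically; bytes pass through unchanged
service.upload_file("reactionminer", "JOB_ID/out/paper.json", b'{"reactions": []}')
service.upload_file("reactionminer", "JOB_ID/status.txt", "completed")  # hypothetical object, str payload

# Shallow listing of a job's output prefix (recursive=True would also descend into subfolders)
for obj in service.list_files("reactionminer", "JOB_ID/out/"):
    print(obj.object_name)
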
45 changes: 45 additions & 0 deletions app/services/reactionminer_service.py
@@ -0,0 +1,45 @@
import json
import os
import time

from fastapi import HTTPException
from sqlmodel.ext.asyncio.session import AsyncSession

from config import log
from models.enums import JobStatus
from services.minio_service import MinIOService


class ReactionMinerService:
def __init__(self, db) -> None:
self.db = db

async def update_job_phase(self, jobObject, phase: JobStatus):
jobObject.phase = phase
if phase == JobStatus.PROCESSING:
jobObject.time_start = int(time.time())
else:
jobObject.time_end = int(time.time())
self.db.add(jobObject)
await self.db.commit()

@staticmethod
async def resultPostProcess(bucket_name: str, job_id: str, service: MinIOService, db: AsyncSession):
"""
Inputs stored in Minio: /{job_id}/in/[name].pdf Bucket name: reactionminer
Outputs stored in Minio: /{job_id}/out/[name].json Bucket name: reactionminer
"""
folder_path = f"/{job_id}/out/"
objects = service.list_files(bucket_name, folder_path)

# Iterate over folder and add all contents to a dictionary
content = {}
for obj in objects:
file_name = os.path.basename(obj.object_name).split('/')[-1]
content[file_name] = json.loads(service.get_file(bucket_name=bucket_name, object_name=obj.object_name))

# Raise 404 if no output files were found; otherwise return the dictionary
if not content:
raise HTTPException(status_code=404, detail="No output files were found")

return content
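
resultPostProcess gathers every JSON file under {job_id}/out/ in the reactionminer bucket into one dict keyed by file name; this is what the new dispatch in app/routers/files.py returns for reactionminer results. A rough illustration — the job id is a placeholder, and db is passed as None only because this static method does not touch it:

import asyncio

from services.minio_service import MinIOService
from services.reactionminer_service import ReactionMinerService

content = asyncio.run(
    ReactionMinerService.resultPostProcess(
        bucket_name="reactionminer",
        job_id="JOB_ID",
        service=MinIOService(),
        db=None,
    )
)
# e.g. {"paper.json": {...}, "supplement.json": {...}} -- one entry per output file
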
18 changes: 18 additions & 0 deletions app/services/templates/job.tpl.yaml
@@ -56,6 +56,18 @@ spec:
- name: "shared-storage"
persistentVolumeClaim:
claimName: "{{ workingVolume.claimName }}"
{%- for sec in secrets %}
- name: "{{ sec.name }}"
secret:
secretName: "{{ sec.secretName }}"
{%- if sec.itemList %}
items:
{%- for secretItem in sec.itemList %}
- key: "{{ secretItem.key }}"
path: "{{ secretItem.path }}"
{%- endfor %}
{%- endif %}
{%- endfor %}
initContainers:
- name: init
image: moleculemaker/mmli-backend:kubejob
@@ -157,6 +169,12 @@ spec:
subPath: "{{ volume.subPath | replace('JOB_ID',jobId) }}"
readOnly: {{ volume.readOnly }}
{%- endfor %}
{%- for sec in secrets %}
- name: "{{ sec.name }}"
mountPath: "{{ sec.mountPath }}"
readOnly: true
subPath: "{{ sec.subPath }}"
{%- endfor %}
containers:
- name: post-job
image: "moleculemaker/mmli-backend:kubejob"
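
The template changes render each entry of secrets twice: once as a pod volume backed by the named Secret (optionally narrowed to specific keys via itemList), and once as a read-only volumeMount on the job container. A small stand-in render, not the full job template, just to show how the volumes loop expands:

from jinja2 import Template

snippet = Template(
    """{% for sec in secrets %}- name: "{{ sec.name }}"
  secret:
    secretName: "{{ sec.secretName }}"
{% endfor %}"""
)
print(snippet.render(secrets=[{"name": "token", "secretName": "huggingface-token"}]))
# Prints:
# - name: "token"
#   secret:
#     secretName: "huggingface-token"
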
6 changes: 6 additions & 0 deletions chart/pvc-explorer.pod.yaml
@@ -12,6 +12,8 @@ spec:
volumeMounts:
- mountPath: "/app/data"
name: data
- mountPath: "/nfs"
name: nfs
volumes:
- name: data
persistentVolumeClaim:
@@ -23,3 +25,7 @@

# staging
claimName: mmli-clean-job-weights
- name: nfs
nfs:
path: /taiga/ncsa/radiant/bbfp/mmli1/
server: taiga-nfs.ncsa.illinois.edu
4 changes: 4 additions & 0 deletions chart/values.local.yaml
@@ -18,6 +18,10 @@ controller:
novostoic_frontend_url: "http://localhost:4200"
somn_frontend_url: "http://localhost:4200"
chemscraper_frontend_url: "http://localhost:4200"
clean_frontend_url: "http://localhost:4200"
molli_frontend_url: "http://localhost:4200"
aceretro_frontend_url: "http://localhost:4200"
reactionminer_frontend_url: "http://localhost:4200"

postgresql:
enabled: true