Skip to content

API integration for user-submitted differential expression calculations (SCP-5767) #2212

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 5, 2025
153 changes: 151 additions & 2 deletions app/controllers/api/v1/site_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ module Api
module V1
class SiteController < ApiBaseController
before_action :set_current_api_user!
before_action :authenticate_api_user!, only: [:download_data, :stream_data, :get_study_analysis_config,
:submit_study_analysis, :get_study_submissions,
before_action :authenticate_api_user!, only: [:download_data, :stream_data, :submit_differential_expression,
:get_study_analysis_config, :submit_study_analysis, :get_study_submissions,
:get_study_submission, :sync_submission_outputs]
before_action :set_study, except: [:studies, :check_terra_tos_acceptance, :analyses, :get_analysis, :renew_user_access_token]
before_action :set_analysis_configuration, only: [:get_analysis, :get_study_analysis_config]
Expand Down Expand Up @@ -403,6 +403,151 @@ def stream_data
end
end

swagger_path '/site/studies/{accession}/differential_expression' do
operation :post do
key :tags, [
'Site'
]
key :summary, 'Submit a differential expression calculation request'
key :description, 'Request differential expression calculations for a given cluster/annotation in a study'
key :operationId, 'site_study_submit_differential_expression_path'
parameter do
key :name, :accession
key :in, :path
key :description, 'Accession of Study to use'
key :required, true
key :type, :string
end
parameter do
key :name, :de_job
key :type, :object
key :in, :body
schema do
property :cluster_name do
key :description, 'Name of cluster group. Use "_default" to use the default cluster'
key :required, true
key :type, :string
end
property :annotation_name do
key :description, 'Name of annotation'
key :required, true
key :type, :string
end
property :annotation_scope do
key :description, 'Scope of annotation. One of "study" or "cluster".'
key :type, :string
key :required, true
key :enum, Api::V1::Visualization::AnnotationsController::VALID_SCOPE_VALUES
end
property :de_type do
key :description, 'Type of differential expression analysis. Either "rest" (one-vs-rest) or "pairwise"'
key :type, :string
key :required, true
key :enum, %w[rest pairwise]
end
property :group1 do
key :description, 'First group for pairwise analysis (optional)'
key :type, :string
end
property :group2 do
key :description, 'Second group for pairwise analysis (optional)'
key :type, :string
end
end
end
response 204 do
key :description, 'Job successfully submitted'
end
response 401 do
key :description, ApiBaseController.unauthorized
end
response 403 do
key :description, ApiBaseController.forbidden('view study, study has author DE')
end
response 404 do
key :description, ApiBaseController.not_found(Study, 'Cluster', 'Annotation')
end
response 406 do
key :description, ApiBaseController.not_acceptable
end
response 409 do
key :description, "Results are processing or already exist"
end
response 410 do
key :description, ApiBaseController.resource_gone
end
response 422 do
key :description, "Job parameters failed validation"
end
response 429 do
key :description, 'Weekly user quota exceeded'
end
end
end

def submit_differential_expression
# disallow DE calculation requests for studies with author DE
if @study.differential_expression_results.where(is_author_de: true).any?
render json: {
error: 'User requests are disabled for this study as it contains author-supplied differential expression results'
}, status: 403 and return
end

# check user quota before proceeding
if DifferentialExpressionService.job_exceeds_quota?(current_api_user)
# minimal log props to help gauge overall user interest, as well as annotation/de types
log_props = {
studyAccession: @study.accession, annotationName: params[:annotation_name], de_type: params[:de_type]
}
MetricsService.log('quota-exceeded-de', log_props, current_api_user, request:)
current_quota = DifferentialExpressionService.get_weekly_user_quota
render json: { error: "You have exceeded your weekly quota of #{current_quota} requests" },
status: 429 and return
end

cluster_name = params[:cluster_name]
cluster = cluster_name == '_default' ? @study.default_cluster : @study.cluster_groups.by_name(cluster_name)
render json: { error: "Requested cluster #{cluster_name} not found" }, status: 404 and return if cluster.nil?

annotation_name = params[:annotation_name]
annotation_scope = params[:annotation_scope]
de_type = params[:de_type]
pairwise = de_type == 'pairwise'
group1 = params[:group1]
group2 = params[:group2]
annotation = AnnotationVizService.get_selected_annotation(
@study, cluster:, annot_name: annotation_name, annot_type: 'group', annot_scope: annotation_scope
)
render json: { error: 'No matching annotation found' }, status: 404 and return if annotation.nil?

de_params = { annotation_name:, annotation_scope:, de_type:, group1:, group2: }

# check if these results already exist
# for pairwise, also check if requested comparisons exist
result = DifferentialExpressionResult.find_by(
study: @study, cluster_group: cluster, annotation_name:, annotation_scope:, is_author_de: false
)
if result && (!pairwise || (pairwise && result.has_pairwise_comparison?(group1, group2)))
render json: { error: "Requested results already exist" }, status: 409 and return
end

begin
submitted = DifferentialExpressionService.run_differential_expression_job(
cluster, @study, current_api_user, **de_params
)
if submitted
DifferentialExpressionService.increment_user_quota(current_api_user)
head 204
else
# submitted: false here means that there is a matching running DE job
render json: { error: "Requested results are processing - please check back later" }, status: 409
end
rescue ArgumentError => e
# job parameters failed to validate
render json: { error: e.message}, status: 422 and return
end
end

swagger_path '/site/analyses' do
operation :get do
key :tags, [
Expand Down Expand Up @@ -1136,6 +1281,10 @@ def get_download_quota
end
end

def de_job_params
params.require(:de_job).permit(:cluster_name, :annotation_name, :annotation_scope, :de_type, :group1, :group2)
end

# update AnalysisSubmissions when loading study analysis tab
# will not backfill existing workflows to keep our submission history clean
def update_analysis_submission(submission)
Expand Down
81 changes: 61 additions & 20 deletions app/lib/differential_expression_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ class DifferentialExpressionService
ALLOWED_ANNOTS = Regexp.union(CELL_TYPE_MATCHER, CLUSTERING_MATCHER)
# specific annotations to exclude from automation
EXCLUDED_ANNOTS = /(enrichment__cell_type)/i
# default quota for user-requested DE jobs
DEFAULT_USER_QUOTA = 5

# run a differential expression job for a given study on the default cluster/annotation
#
Expand Down Expand Up @@ -163,25 +165,27 @@ def self.run_differential_expression_job(cluster_group, study, user, annotation_
params_object.machine_type = machine_type if machine_type.present? # override :machine_type if specified
return true if dry_run # exit before submission if specified as annotation was already validated

# check if there's already a job running using these parameters and exit if so
job_params = ApplicationController.batch_api_client.format_command_line(
study_file:, action: :differential_expression, user_metrics_uuid: user.metrics_uuid, params_object:
)
running = ApplicationController.batch_api_client.find_matching_jobs(
params: job_params, job_states: BatchApiClient::RUNNING_STATES
)
if running.any?
log_message "Found #{running.count} running DE jobs: #{running.map(&:name).join(', ')}"
log_message "Matching these parameters: #{job_params.join(' ')}"
log_message "Exiting without queuing new job"
false
elsif params_object.valid?
# launch DE job
job = IngestJob.new(study:, study_file:, user:, action: :differential_expression, params_object:)
job.delay.push_remote_and_launch_ingest
true
if params_object.valid?
# check if there's already a job running using these parameters and exit if so
job_params = ApplicationController.batch_api_client.format_command_line(
study_file:, action: :differential_expression, user_metrics_uuid: user.metrics_uuid, params_object:
)
running = ApplicationController.batch_api_client.find_matching_jobs(
params: job_params, job_states: BatchApiClient::RUNNING_STATES
)
if running.any?
log_message "Found #{running.count} running DE jobs: #{running.map(&:name).join(', ')}"
log_message "Matching these parameters: #{job_params.join(' ')}"
log_message "Exiting without queuing new job"
false
else
# launch DE job
job = IngestJob.new(study:, study_file:, user:, action: :differential_expression, params_object:)
job.delay.push_remote_and_launch_ingest
true
end
else
raise ArgumentError, "job parameters failed to validate: #{params_object.errors.full_messages}"
raise ArgumentError, "job parameters failed to validate: #{params_object.errors.full_messages.join(', ')}"
end
end

Expand Down Expand Up @@ -377,12 +381,14 @@ def self.validate_annotation(cluster_group, study, annotation_name, annotation_s
if !pairwise && cells_by_label.keys.count < 2
raise ArgumentError, "#{identifier} does not have enough labels represented in #{cluster_group.name}"
elsif pairwise
missing = {
missing = [group1, group2] - annotation[:values]
raise ArgumentError, "#{annotation_name} does not contain '#{missing.join(', ')}'" if missing.any?
cell_count = {
"#{group1}" => cells_by_label[group1].count,
"#{group2}" => cells_by_label[group2].count
}.keep_if { |_, c| c < 2 }
raise ArgumentError,
"#{missing.keys.join(', ')} does not have enough cells represented in #{identifier}" if missing.any?
"#{cell_count.keys.join(', ')} does not have enough cells represented in #{identifier}" if cell_count.any?
end
end

Expand Down Expand Up @@ -451,4 +457,39 @@ def self.cluster_file_url(cluster_group)
study_file.gs_url
end
end

# retrieve the weekly user quota value
#
# * *returns*
# - (Integer)
def self.get_weekly_user_quota
config = AdminConfiguration.find_by(config_type: 'Weekly User DE Quota')
config ? config.value.to_i : DEFAULT_USER_QUOTA
end

# check if a user has exceeded their weekly quota
#
# * *params*
# - +user+ (User) => user requesting DE job
#
# * *returns*
# - (Boolean)
def self.job_exceeds_quota?(user)
user.weekly_de_quota.to_i >= get_weekly_user_quota
end

# increment a given user's weekly quota
#
# * *params*
# - +user+ (User) => user requesting DE job
def self.increment_user_quota(user)
current_quota = user.weekly_de_quota.to_i
current_quota += 1
user.update(weekly_de_quota: current_quota)
end

# reset all user-requested DE quotas on a weekly basis
def self.reset_all_user_quotas
User.update_all(weekly_de_quota: 0)
end
end
5 changes: 3 additions & 2 deletions app/models/admin_configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ class AdminConfiguration
API_NOTIFIER_NAME = 'API Health Check Notifier'.freeze
INGEST_DOCKER_NAME = 'Ingest Pipeline Docker Image'.freeze
NUMERIC_VALS = %w[byte kilobyte megabyte terabyte petabyte exabyte].freeze
CONFIG_TYPES = [INGEST_DOCKER_NAME, 'Daily User Download Quota', 'Portal FireCloud User Group', 'TDR Snapshot IDs',
'Reference Data Workspace', 'Read-Only Access Control', 'QA Dev Email', API_NOTIFIER_NAME].freeze
CONFIG_TYPES = [INGEST_DOCKER_NAME, 'Daily User Download Quota', 'Weekly User DE Quota', 'Portal FireCloud User Group',
'TDR Snapshot IDs', 'Reference Data Workspace', 'Read-Only Access Control', 'QA Dev Email',
API_NOTIFIER_NAME].freeze
ALL_CONFIG_TYPES = (CONFIG_TYPES.dup << FIRECLOUD_ACCESS_NAME).freeze
VALUE_TYPES = %w[Numeric Boolean String].freeze

Expand Down
6 changes: 5 additions & 1 deletion app/models/ingest_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1011,9 +1011,13 @@ def get_job_analytics
job_props.merge!(
{
numCells: cluster&.points,
numAnnotationValues: annotation[:values]&.size
numAnnotationValues: annotation[:values]&.size,
deType: params_object.de_type
}
)
if params_object.de_type == 'pairwise'
job_props.merge!( { pairwiseGroups: [params_object.group1, params_object.group2]})
end
when :image_pipeline
data_cache_perftime = params_object.data_cache_perftime
job_props.merge!(
Expand Down
1 change: 1 addition & 0 deletions app/models/user.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def owned_by(user)
field :admin, type: Boolean
field :reporter, type: Boolean
field :daily_download_quota, type: Integer, default: 0
field :weekly_de_quota, type: Integer, default: 0 # limits number of user-requested DE jobs
field :admin_email_delivery, type: Boolean, default: true
field :registered_for_firecloud, type: Boolean, default: false
# {
Expand Down
1 change: 1 addition & 0 deletions config/routes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
get 'studies/:accession', to: 'site#view_study', as: :site_study_view
get 'studies/:accession/download', to: 'site#download_data', as: :site_study_download_data
get 'studies/:accession/stream', to: 'site#stream_data', as: :site_study_stream_data
post 'studies/:accession/differential_expression', to: 'site#submit_differential_expression', as: :site_study_submit_differential_expression
get 'studies/:accession/renew_read_only_access_token', to: 'site#renew_read_only_access_token', as: :site_renew_read_only_access_token
get 'renew_user_access_token', to: 'site#renew_user_access_token', as: :site_renew_user_access_token

Expand Down
1 change: 1 addition & 0 deletions rails_startup.bash
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ fi

echo "*** ADDING DAILY RESET OF USER DOWNLOAD QUOTAS ***"
(crontab -u app -l ; echo "@daily . /home/app/.cron_env ; cd /home/app/webapp/; /home/app/webapp/bin/rails runner -e $PASSENGER_APP_ENV \"DownloadQuotaService.reset_all_quotas\" >> /home/app/webapp/log/cron_out.log 2>&1") | crontab -u app -
(crontab -u app -l ; echo "@weekly . /home/app/.cron_env ; cd /home/app/webapp/; /home/app/webapp/bin/rails runner -e $PASSENGER_APP_ENV \"DifferentialExpressionService.reset_all_user_quotas\" >> /home/app/webapp/log/cron_out.log 2>&1") | crontab -u app -
echo "*** COMPLETED ***"

echo "*** LOCALIZING USER ASSETS ***"
Expand Down
Loading