Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 90 additions & 33 deletions adsmp/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from . import exceptions
from adsmp.models import ChangeLog, IdentifierMapping, MetricsBase, MetricsModel, Records, SitemapInfo
from adsmsg import OrcidClaims, DenormalizedRecord, FulltextUpdate, MetricsRecord, NonBibRecord, NonBibRecordList, MetricsRecordList, AugmentAffiliationResponseRecord, AugmentAffiliationRequestRecord, ClassifyRequestRecord, ClassifyRequestRecordList, ClassifyResponseRecord, ClassifyResponseRecordList, BoostRequestRecord, BoostRequestRecordList, BoostResponseRecord, BoostResponseRecordList,Status as AdsMsgStatus
from google.protobuf.json_format import ParseDict
from adsmsg.msg import Msg
from adsputils import ADSCelery, create_engine, sessionmaker, scoped_session, contextmanager
from sqlalchemy.orm import load_only as _load_only
Expand All @@ -24,6 +25,7 @@
import csv
from datetime import timedelta
from SciXPipelineUtils import scix_id
import base64

class ADSMasterPipelineCelery(ADSCelery):

Expand Down Expand Up @@ -739,27 +741,31 @@ def request_classify(self, bibcode=None,scix_id = None, filename=None,mode='auto
batch_idx += batch_size
batch_list = []

def _populate_boost_request_from_record(self, rec, metrics, classifications,
def _populate_boost_request_from_record(self, bib_data, metrics, classifications, scix_id,
run_id=None, output_path=None, request_type=None):
"""
Returns a dictionary with bib_data, metrics, and classifications to Boost Pipeline.
"""
bib_data = rec.get('bib_data', '')

self.logger.info('Populating boost request from bib_data: {}'.format(bib_data))
self.logger.info('Bib_data type: {}'.format(type(bib_data)))
# Create the new nested message structure that Boost Pipeline expects

bib_data = json.loads(bib_data) if isinstance(bib_data, str) else bib_data
metrics = json.loads(metrics) if isinstance(metrics, str) else metrics

message = {
# Root level fields
'bibcode': rec.get('bibcode', ''),
'scix_id': rec.get('scix_id', ''),
'bibcode': bib_data.get('bibcode', ''),
'scix_id': scix_id,
'status': 'updated',

# bib_data section - primary source for paper metadata
'bib_data': bib_data.decode('utf-8') if isinstance(bib_data, bytes) else bib_data,
# metrics section - primary source for refereed status and citations
'metrics': metrics.decode('utf-8') if isinstance(metrics, bytes) else metrics,

'bib_data': base64.b64encode(json.dumps(bib_data).encode('utf-8')).decode('utf-8') if isinstance(bib_data, dict) else base64.b64encode(str(bib_data).encode('utf-8')).decode('utf-8'),

# metrics section - primary source for refereed status and citations
'metrics': base64.b64encode(json.dumps(metrics).encode('utf-8')).decode('utf-8') if isinstance(metrics, dict) else base64.b64encode(str(metrics).encode('utf-8')).decode('utf-8'),
# classifications section - primary source for collections
'classifications': list(classifications),
'classifications': classifications,

'collections': list(''),
'run_id': 0,
Expand All @@ -769,21 +775,19 @@ def _populate_boost_request_from_record(self, rec, metrics, classifications,
return message

def _get_info_for_boost_entry(self, bibcode):
    """Collect the record fields needed to build a boost request.

    :param bibcode: bibcode of the record to look up
    :return: tuple ``(bib_data, metrics, classifications, scix_id)``;
        elements default to empty string/list when the record (or the
        individual field) is missing
    """
    self.logger.info('Getting info for boost entry for bibcode: {}'.format(bibcode))
    # get_record returns None for unknown bibcodes; fall back to an empty
    # dict so the .get() calls below are always safe
    rec = self.get_record(bibcode) or {}

    bib_data = rec.get('bib_data', '')
    metrics = rec.get('metrics', '')
    scix_id = rec.get('scix_id', '')

    # Extract collections from classifications (primary source);
    # list('') is just an obscure spelling of the empty list
    classifications = rec.get('classifications', [])

    return (bib_data, metrics, classifications, scix_id)

def generate_boost_request_message(self, bibcode, run_id=None, output_path=None):
Expand Down Expand Up @@ -811,36 +815,89 @@ def generate_boost_request_message(self, bibcode, run_id=None, output_path=None)

try:
# Get record data for this bibcode
(rec, metrics, classifications) = self._get_info_for_boost_entry(bibcode)
if not rec:
(bib_data, metrics, classifications, scix_id) = self._get_info_for_boost_entry(bibcode)
if not bib_data:
self.logger.debug('Skipping bibcode with no data: %s', bibcode)
return False

# Create message for this record
message = self._populate_boost_request_from_record(rec, metrics, classifications,
message = self._populate_boost_request_from_record(bib_data, metrics, classifications, scix_id,
run_id, output_path, None)
protobuf_format = BoostRequestRecord()

output_taskname=self._config.get('OUTPUT_TASKNAME_BOOST')
output_broker=self._config.get('OUTPUT_CELERY_BROKER_BOOST')
self.logger.debug('output_taskname: {}'.format(output_taskname))
self.logger.debug('output_broker: {}'.format(output_broker))
self.logger.debug('sending message {}'.format(message))

response_message = ParseDict(message, protobuf_format)

except Exception as e:
self.logger.error('Error retrieving record data for bibcode %s: %s', bibcode, e)
self.logger.error('Message content: %s', message)
raise

output_taskname=self._config.get('OUTPUT_TASKNAME_BOOST')
output_broker=self._config.get('OUTPUT_CELERY_BROKER_BOOST')
self.logger.debug('output_taskname: {}'.format(output_taskname))
self.logger.debug('output_broker: {}'.format(output_broker))
self.logger.debug('sending message {}'.format(message))

# Forward message to Boost Pipeline - Celery workers will handle the rest
try:
self.forward_message(message, pipeline='boost')
self.forward_message(response_message, pipeline='boost')
self.logger.info('Sent boost request for bibcode %s to Boost Pipeline', bibcode)
return True

except Exception as e:
self.logger.exception('Error sending boost request for bibcode %s: %s', bibcode, e)
return False

def generate_boost_request_message_batch(self, bibcode, bib_data, metrics, classifications, scix_id, run_id=None, output_path=None):
    """Build and send a boost request message to the Boost Pipeline.

    Parameters
    ----------
    bibcode : str
        Bibcode of the record; used for logging/traceability.
    bib_data : dict
        The bib_data section of the record.
    metrics : dict
        The metrics section of the record.
    classifications : list
        The classifications section of the record.
    scix_id : str
        The scix_id of the record.
    run_id : int, optional
        Optional job/run identifier added to each entry.
    output_path : str, optional
        Optional output path hint added to each entry.

    Returns
    -------
    bool
        True if the message was sent successfully, False otherwise.

    Raises
    ------
    Exception
        Re-raised when the message cannot be built or serialized.
    """
    try:
        # Build the plain-dict message and convert it into the protobuf
        # structure the Boost Pipeline expects.
        message = self._populate_boost_request_from_record(bib_data, metrics, classifications, scix_id,
                                                           run_id, output_path, None)
        protobuf_format = BoostRequestRecord()

        output_taskname = self._config.get('OUTPUT_TASKNAME_BOOST')
        output_broker = self._config.get('OUTPUT_CELERY_BROKER_BOOST')
        self.logger.debug('output_taskname: {}'.format(output_taskname))
        self.logger.debug('output_broker: {}'.format(output_broker))
        self.logger.debug('sending message {}'.format(message))

        response_message = ParseDict(message, protobuf_format)

    except Exception as e:
        # include the bibcode so failures in a batch can be traced to a record
        self.logger.error('Error creating boost request message for bibcode %s: %s', bibcode, e)
        raise

    # Forward message to Boost Pipeline - Celery workers will handle the rest
    try:
        self.forward_message(response_message, pipeline='boost')
        self.logger.info('Sent boost request for bibcode %s to Boost Pipeline', bibcode)
        return True

    except Exception as e:
        self.logger.exception('Error sending boost request for bibcode %s: %s', bibcode, e)
        return False

def generate_links_for_resolver(self, record):
"""use nonbib or bib elements of database record and return links for resolver and checksum"""
# nonbib data has something like
Expand Down
20 changes: 20 additions & 0 deletions adsmp/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from collections import defaultdict
import pdb
from sqlalchemy.orm import load_only
import json
from celery.exceptions import Retry


Expand All @@ -42,6 +43,7 @@
Queue('update-sitemap-files', app.exchange, routing_key='update-sitemap-files'),
Queue('update-scixid', app.exchange, routing_key='update-scixid'),
Queue('boost-request', app.exchange, routing_key='boost-request'),
Queue('boost-request-batch', app.exchange, routing_key='boost-request-batch'),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to add new celery worker

)


Expand Down Expand Up @@ -1113,12 +1115,30 @@ def task_boost_request(bibcodes):
bibcodes = [bibcodes]

for bibcode in bibcodes:
logger.info('Processing boost request for bibcode: {}'.format(bibcode))
result = app.generate_boost_request_message(bibcode)

logger.info('Boost requests for %s bibcode(s) sent to boost pipeline', len(bibcodes))

return result

@app.task(queue='boost-request-batch')
def task_boost_request_batch(bibcode_list, bib_data_list, metrics_list, classifications_list, scix_ids_list):
    """Process boost requests for a batch of records.

    The five lists are parallel: element i of each list describes the same
    record. One boost request message is generated and sent per record.

    @param bibcode_list: list of str - bibcodes of the records
    @param bib_data_list: list of dict - the bib_data section of each record
    @param metrics_list: list of dict - the metrics section of each record
    @param classifications_list: list of list - the classifications of each record
    @param scix_ids_list: list of str - the scix_id of each record
    @return: True when the batch has been processed
    """

    logger.info('Processing boost request for batch of records')
    for (bibcode, bib_data, met, cl, scix_id) in zip(bibcode_list, bib_data_list, metrics_list, classifications_list, scix_ids_list):
        logger.info('Processing boost request for record: {}'.format(bibcode))
        app.generate_boost_request_message_batch(bibcode, bib_data, met, cl, scix_id)

    logger.info('Boost requests for %s records sent to boost pipeline', len(bibcode_list))
    return True

if __name__ == '__main__':
app.start()
14 changes: 9 additions & 5 deletions run.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,27 +340,31 @@ def process_all_boost(batch_size):
with app.session_scope() as session:
    # load all records from RecordsDB; fetch only the columns needed
    # to build a boost request
    for rec in session.query(Records) \
        .options(load_only(Records.bibcode, Records.bib_data, Records.metrics, Records.classifications, Records.scix_id)) \
        .yield_per(batch_size):

        sent += 1
        if sent % 1000 == 0:
            logger.debug('Sending %s records', sent)

        batch.append((rec.bibcode, rec.bib_data, rec.metrics, rec.classifications, rec.scix_id))
        if len(batch) >= batch_size:
            logger.info('Sending batch of %s records to Boost Pipeline', len(batch))
            # Unpack the batch into separate lists for the task
            bibcode_list, bib_data_list, metrics_list, classifications_list, scix_ids_list = zip(*batch)
            t = tasks.task_boost_request_batch.delay(bibcode_list, bib_data_list, metrics_list, classifications_list, scix_ids_list)
            _tasks.append(t)
            batch = []

    if len(batch) > 0:
        logger.info('Sending final batch of %s records to Boost Pipeline', len(batch))
        # BUG FIX: the final batch was unpacked into only four names,
        # which raises ValueError on 5-tuples and would otherwise reuse a
        # stale bibcode_list from the previous full batch; include
        # bibcode_list in the unpack.
        bibcode_list, bib_data_list, metrics_list, classifications_list, scix_ids_list = zip(*batch)
        t = tasks.task_boost_request_batch.delay(bibcode_list, bib_data_list, metrics_list, classifications_list, scix_ids_list)
        _tasks.append(t)
        logger.debug('Sending %s records', len(batch))

logger.info('Sent total of %s records to boost pipeline', sent)


def rebuild_collection(collection_name, batch_size):
Expand Down