diff --git a/adsmp/app.py b/adsmp/app.py index b507106..79b95c4 100644 --- a/adsmp/app.py +++ b/adsmp/app.py @@ -5,6 +5,7 @@ from . import exceptions from adsmp.models import ChangeLog, IdentifierMapping, MetricsBase, MetricsModel, Records, SitemapInfo from adsmsg import OrcidClaims, DenormalizedRecord, FulltextUpdate, MetricsRecord, NonBibRecord, NonBibRecordList, MetricsRecordList, AugmentAffiliationResponseRecord, AugmentAffiliationRequestRecord, ClassifyRequestRecord, ClassifyRequestRecordList, ClassifyResponseRecord, ClassifyResponseRecordList, BoostRequestRecord, BoostRequestRecordList, BoostResponseRecord, BoostResponseRecordList,Status as AdsMsgStatus +from google.protobuf.json_format import ParseDict from adsmsg.msg import Msg from adsputils import ADSCelery, create_engine, sessionmaker, scoped_session, contextmanager from sqlalchemy.orm import load_only as _load_only @@ -24,6 +25,7 @@ import csv from datetime import timedelta from SciXPipelineUtils import scix_id +import base64 class ADSMasterPipelineCelery(ADSCelery): @@ -739,27 +741,31 @@ def request_classify(self, bibcode=None,scix_id = None, filename=None,mode='auto batch_idx += batch_size batch_list = [] - def _populate_boost_request_from_record(self, rec, metrics, classifications, + def _populate_boost_request_from_record(self, bib_data, metrics, classifications, scix_id, run_id=None, output_path=None, request_type=None): """ Returns a dictionary with bib_data, metrics, and classifications to Boost Pipeline. 
""" - bib_data = rec.get('bib_data', '') - + self.logger.info('Populating boost request from bib_data: {}'.format(bib_data)) + self.logger.info('Bib_data type: {}'.format(type(bib_data))) # Create the new nested message structure that Boost Pipeline expects + + bib_data = json.loads(bib_data) if isinstance(bib_data, str) else bib_data + metrics = json.loads(metrics) if isinstance(metrics, str) else metrics + message = { # Root level fields - 'bibcode': rec.get('bibcode', ''), - 'scix_id': rec.get('scix_id', ''), + 'bibcode': bib_data.get('bibcode', ''), + 'scix_id': scix_id, 'status': 'updated', # bib_data section - primary source for paper metadata - 'bib_data': bib_data.decode('utf-8') if isinstance(bib_data, bytes) else bib_data, - # metrics section - primary source for refereed status and citations - 'metrics': metrics.decode('utf-8') if isinstance(metrics, bytes) else metrics, - + 'bib_data': base64.b64encode(json.dumps(bib_data).encode('utf-8')).decode('utf-8') if isinstance(bib_data, dict) else base64.b64encode(str(bib_data).encode('utf-8')).decode('utf-8'), + + # metrics section - primary source for refereed status and citations + 'metrics': base64.b64encode(json.dumps(metrics).encode('utf-8')).decode('utf-8') if isinstance(metrics, dict) else base64.b64encode(str(metrics).encode('utf-8')).decode('utf-8'), # classifications section - primary source for collections - 'classifications': list(classifications), + 'classifications': classifications, 'collections': list(''), 'run_id': 0, @@ -769,21 +775,19 @@ def _populate_boost_request_from_record(self, rec, metrics, classifications, return message def _get_info_for_boost_entry(self, bibcode): + self.logger.info('Getting info for boost entry for bibcode: {}'.format(bibcode)) rec = self.get_record(bibcode) or {} - metrics = {} - try: - metrics = self.get_metrics(bibcode) or {} - except Exception: - pass - - collections = [] - - # Extract collections from classifications (primary source) - classifications = 
rec.get('classifications', list('')) - entry = None + entry = ('', '', [], '') if rec: - entry = (rec, metrics, collections) + bib_data = rec.get('bib_data', '') + metrics = rec.get('metrics', '') + scix_id = rec.get('scix_id', '') + + # Extract collections from classifications (primary source) + classifications = rec.get('classifications', list('')) + + entry = (bib_data, metrics, classifications, scix_id) return entry def generate_boost_request_message(self, bibcode, run_id=None, output_path=None): @@ -811,29 +815,31 @@ def generate_boost_request_message(self, bibcode, run_id=None, output_path=None) try: # Get record data for this bibcode - (rec, metrics, classifications) = self._get_info_for_boost_entry(bibcode) - if not rec: + (bib_data, metrics, classifications, scix_id) = self._get_info_for_boost_entry(bibcode) + if not bib_data: self.logger.debug('Skipping bibcode with no data: %s', bibcode) return False # Create message for this record - message = self._populate_boost_request_from_record(rec, metrics, classifications, + message = self._populate_boost_request_from_record(bib_data, metrics, classifications, scix_id, run_id, output_path, None) + protobuf_format = BoostRequestRecord() + + output_taskname=self._config.get('OUTPUT_TASKNAME_BOOST') + output_broker=self._config.get('OUTPUT_CELERY_BROKER_BOOST') + self.logger.debug('output_taskname: {}'.format(output_taskname)) + self.logger.debug('output_broker: {}'.format(output_broker)) + self.logger.debug('sending message {}'.format(message)) + + response_message = ParseDict(message, protobuf_format) except Exception as e: self.logger.error('Error retrieving record data for bibcode %s: %s', bibcode, e) - self.logger.error('Message content: %s', message) raise - output_taskname=self._config.get('OUTPUT_TASKNAME_BOOST') - output_broker=self._config.get('OUTPUT_CELERY_BROKER_BOOST') - self.logger.debug('output_taskname: {}'.format(output_taskname)) - self.logger.debug('output_broker: {}'.format(output_broker)) - self.logger.debug('sending 
message {}'.format(message)) - # Forward message to Boost Pipeline - Celery workers will handle the rest try: - self.forward_message(message, pipeline='boost') + self.forward_message(response_message, pipeline='boost') self.logger.info('Sent boost request for bibcode %s to Boost Pipeline', bibcode) return True @@ -841,6 +847,57 @@ def generate_boost_request_message(self, bibcode, run_id=None, output_path=None) self.logger.exception('Error sending boost request for bibcode %s: %s', bibcode, e) return False + def generate_boost_request_message_batch(self, bibcode, bib_data, metrics, classifications, scix_id, run_id=None, output_path=None): + """Build and send boost request message to Boost Pipeline. + + Parameters + ---------- + bib_data : dictionary + The bib_data section of the record + metrics : dictionary + The metrics section of the record + classifications : list + The classifications section of the record + scix_id : str + The scix_id section of the record + run_id : int, optional + Optional job/run identifier added to each entry. + output_path : str, optional + Optional output path hint added to each entry. + + Returns + ------- + bool + True if message was sent successfully, False otherwise. 
+ """ + try: + # Create message for this record + message = self._populate_boost_request_from_record(bib_data, metrics, classifications, scix_id, + run_id, output_path, None) + protobuf_format = BoostRequestRecord() + + output_taskname=self._config.get('OUTPUT_TASKNAME_BOOST') + output_broker=self._config.get('OUTPUT_CELERY_BROKER_BOOST') + self.logger.debug('output_taskname: {}'.format(output_taskname)) + self.logger.debug('output_broker: {}'.format(output_broker)) + self.logger.debug('sending message {}'.format(message)) + + response_message = ParseDict(message, protobuf_format) + + except Exception as e: + self.logger.error('Error creating boost request message: %s', e) + raise + + # Forward message to Boost Pipeline - Celery workers will handle the rest + try: + self.forward_message(response_message, pipeline='boost') + self.logger.info('Sent boost request to Boost Pipeline') + return True + + except Exception as e: + self.logger.exception('Error sending boost request: %s', e) + return False + def generate_links_for_resolver(self, record): """use nonbib or bib elements of database record and return links for resolver and checksum""" # nonbib data has something like diff --git a/adsmp/tasks.py b/adsmp/tasks.py index 7620cc9..e92e9e1 100644 --- a/adsmp/tasks.py +++ b/adsmp/tasks.py @@ -18,6 +18,7 @@ from collections import defaultdict import pdb from sqlalchemy.orm import load_only +import json from celery.exceptions import Retry @@ -42,6 +43,7 @@ Queue('update-sitemap-files', app.exchange, routing_key='update-sitemap-files'), Queue('update-scixid', app.exchange, routing_key='update-scixid'), Queue('boost-request', app.exchange, routing_key='boost-request'), + Queue('boost-request-batch', app.exchange, routing_key='boost-request-batch'), ) @@ -1113,12 +1115,30 @@ def task_boost_request(bibcodes): bibcodes = [bibcodes] for bibcode in bibcodes: + logger.info('Processing boost request for bibcode: {}'.format(bibcode)) result = 
app.generate_boost_request_message(bibcode) logger.info('Boost requests for %s bibcode(s) sent to boost pipeline', len(bibcodes)) return result +@app.task(queue='boost-request-batch') +def task_boost_request_batch(bibcode_list, bib_data_list, metrics_list, classifications_list, scix_ids_list): + """Process boost requests for a batch of records + + @param bib_data: dictionary - the bib_data section of the record + @param metrics: dictionary - the metrics section of the record + @param classifications: list - the classifications section of the record + @param scix_ids: list - the scix_ids section of the record + """ + + logger.info('Processing boost request for batch of records') + for (bibcode, bib_data, met, cl, scix_id) in zip(bibcode_list, bib_data_list, metrics_list, classifications_list, scix_ids_list): + logger.info('Processing boost request for record: {}'.format(bibcode)) + app.generate_boost_request_message_batch(bibcode, bib_data, met, cl, scix_id) + + logger.info('Boost requests for %s records sent to boost pipeline', len(bibcode_list)) + return True if __name__ == '__main__': app.start() diff --git a/run.py b/run.py index 15b4978..6cb3f5a 100755 --- a/run.py +++ b/run.py @@ -340,27 +340,31 @@ def process_all_boost(batch_size): with app.session_scope() as session: # load all records from RecordsDB for rec in session.query(Records) \ - .options(load_only(Records.bibcode)) \ + .options(load_only(Records.bibcode,Records.bib_data, Records.metrics, Records.classifications, Records.scix_id)) \ .yield_per(batch_size): sent += 1 if sent % 1000 == 0: logger.debug('Sending %s records', sent) - batch.append(rec.bibcode) + batch.append((rec.bibcode, rec.bib_data, rec.metrics, rec.classifications, rec.scix_id)) if len(batch) >= batch_size: logger.info('Sending batch of %s records to Boost Pipeline', len(batch)) - t = tasks.task_boost_request.delay(batch) + # Unpack the batch into separate lists for the task + bibcode_list, bib_data_list, metrics_list, 
classifications_list, scix_ids_list = zip(*batch) + t = tasks.task_boost_request_batch.delay(bibcode_list, bib_data_list, metrics_list, classifications_list, scix_ids_list) _tasks.append(t) batch = [] if len(batch) > 0: logger.info('Sending final batch of %s records to Boost Pipeline', len(batch)) - t = tasks.task_boost_request.delay(batch) + # Unpack the final batch into separate lists for the task + bibcode_list, bib_data_list, metrics_list, classifications_list, scix_ids_list = zip(*batch) + t = tasks.task_boost_request_batch.delay(bibcode_list, bib_data_list, metrics_list, classifications_list, scix_ids_list) _tasks.append(t) logger.debug('Sending %s records', len(batch)) - logger.info('Completed process_all_boost, sent total of %s records', sent) + logger.info('Sent total of %s records to boost pipeline', sent)