Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 71 additions & 22 deletions adsmp/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from . import exceptions
from adsmp.models import ChangeLog, IdentifierMapping, MetricsBase, MetricsModel, Records, SitemapInfo
from adsmsg import OrcidClaims, DenormalizedRecord, FulltextUpdate, MetricsRecord, NonBibRecord, NonBibRecordList, MetricsRecordList, AugmentAffiliationResponseRecord, AugmentAffiliationRequestRecord, ClassifyRequestRecord, ClassifyRequestRecordList, ClassifyResponseRecord, ClassifyResponseRecordList, BoostRequestRecord, BoostRequestRecordList, BoostResponseRecord, BoostResponseRecordList,Status as AdsMsgStatus
from google.protobuf.json_format import ParseDict
from adsmsg.msg import Msg
from adsputils import ADSCelery, create_engine, sessionmaker, scoped_session, contextmanager
from sqlalchemy.orm import load_only as _load_only
Expand All @@ -24,6 +25,7 @@
from sqlalchemy.dialects.postgresql import insert
import csv
from SciXPipelineUtils import scix_id
import base64

class ADSMasterPipelineCelery(ADSCelery):

Expand Down Expand Up @@ -676,7 +678,7 @@ def _populate_boost_request_from_record(self, rec, metrics, classifications,
"""
Returns a dictionary with bib_data, metrics, and classifications to Boost Pipeline.
"""
bib_data = rec.get('bib_data', '')
bib_data = rec

# Create the new nested message structure that Boost Pipeline expects
message = {
Expand All @@ -686,12 +688,12 @@ def _populate_boost_request_from_record(self, rec, metrics, classifications,
'status': 'updated',

# bib_data section - primary source for paper metadata
'bib_data': bib_data.decode('utf-8') if isinstance(bib_data, bytes) else bib_data,
# metrics section - primary source for refereed status and citations
'metrics': metrics.decode('utf-8') if isinstance(metrics, bytes) else metrics,

'bib_data': base64.b64encode(json.dumps(bib_data).encode('utf-8')).decode('utf-8') if isinstance(bib_data, dict) else base64.b64encode(str(bib_data).encode('utf-8')).decode('utf-8'),

# metrics section - primary source for refereed status and citations
'metrics': base64.b64encode(json.dumps(metrics).encode('utf-8')).decode('utf-8') if isinstance(metrics, dict) else base64.b64encode(str(metrics).encode('utf-8')).decode('utf-8'),
# classifications section - primary source for collections
'classifications': list(classifications),
'classifications': classifications,

'collections': list(''),
'run_id': 0,
Expand All @@ -701,21 +703,17 @@ def _populate_boost_request_from_record(self, rec, metrics, classifications,
return message

def _get_info_for_boost_entry(self, bibcode):
self.logger.info('Getting info for boost entry for bibcode: {}'.format(bibcode))
rec = self.get_record(bibcode) or {}
metrics = {}
try:
metrics = self.get_metrics(bibcode) or {}
except Exception:
pass

collections = []
self.logger.info('Record: {}'.format(rec))
metrics = rec.get('metrics', {})

# Extract collections from classifications (primary source)
classifications = rec.get('classifications', list(''))

entry = None
if rec:
entry = (rec, metrics, collections)
entry = (rec, metrics, classifications)
return entry

def generate_boost_request_message(self, bibcode, run_id=None, output_path=None):
Expand Down Expand Up @@ -751,28 +749,79 @@ def generate_boost_request_message(self, bibcode, run_id=None, output_path=None)
# Create message for this record
message = self._populate_boost_request_from_record(rec, metrics, classifications,
run_id, output_path, None)
protobuf_format = BoostRequestRecord()

output_taskname=self._config.get('OUTPUT_TASKNAME_BOOST')
output_broker=self._config.get('OUTPUT_CELERY_BROKER_BOOST')
self.logger.debug('output_taskname: {}'.format(output_taskname))
self.logger.debug('output_broker: {}'.format(output_broker))
self.logger.debug('sending message {}'.format(message))

response_message = ParseDict(message, protobuf_format)

except Exception as e:
self.logger.error('Error retrieving record data for bibcode %s: %s', bibcode, e)
self.logger.error('Message content: %s', message)
raise

output_taskname=self._config.get('OUTPUT_TASKNAME_BOOST')
output_broker=self._config.get('OUTPUT_CELERY_BROKER_BOOST')
self.logger.debug('output_taskname: {}'.format(output_taskname))
self.logger.debug('output_broker: {}'.format(output_broker))
self.logger.debug('sending message {}'.format(message))

# Forward message to Boost Pipeline - Celery workers will handle the rest
try:
self.forward_message(message, pipeline='boost')
self.forward_message(response_message, pipeline='boost')
self.logger.info('Sent boost request for bibcode %s to Boost Pipeline', bibcode)
return True

except Exception as e:
self.logger.exception('Error sending boost request for bibcode %s: %s', bibcode, e)
return False

def generate_boost_request_message_batch(self, bib_data, metrics, classifications, run_id=None, output_path=None):
    """Build and send a boost request message to the Boost Pipeline.

    Despite the ``_batch`` suffix this sends ONE message built from one
    record's sections; batching is done by the calling Celery task.

    Parameters
    ----------
    bib_data : dict
        The bib_data section of the record.
    metrics : dict
        The metrics section of the record.
    classifications : list
        The classifications section of the record.
    run_id : int, optional
        Optional job/run identifier added to the message.
    output_path : str, optional
        Optional output path hint added to the message.

    Returns
    -------
    bool
        True if the message was forwarded successfully, False if
        forwarding to the Boost Pipeline failed.

    Raises
    ------
    Exception
        Re-raised if building the message dict or converting it to a
        BoostRequestRecord protobuf fails (i.e. failures *before* the
        send are fatal, not a False return).
    """
    try:
        # Build the nested dict the Boost Pipeline expects, then convert
        # it into the BoostRequestRecord protobuf message.
        message = self._populate_boost_request_from_record(
            bib_data, metrics, classifications, run_id, output_path, None)
        protobuf_format = BoostRequestRecord()

        output_taskname = self._config.get('OUTPUT_TASKNAME_BOOST')
        output_broker = self._config.get('OUTPUT_CELERY_BROKER_BOOST')
        # Lazy %-style args: no formatting cost unless DEBUG is enabled.
        self.logger.debug('output_taskname: %s', output_taskname)
        self.logger.debug('output_broker: %s', output_broker)
        self.logger.debug('sending message %s', message)

        response_message = ParseDict(message, protobuf_format)

    except Exception as e:
        self.logger.error('Error creating boost request message: %s', e)
        raise

    # Forward message to Boost Pipeline - Celery workers will handle the rest
    try:
        self.forward_message(response_message, pipeline='boost')
        self.logger.info('Sent boost request to Boost Pipeline')
        return True

    except Exception as e:
        self.logger.exception('Error sending boost request: %s', e)
        return False

def generate_links_for_resolver(self, record):
"""use nonbib or bib elements of database record and return links for resolver and checksum"""
# nonbib data has something like
Expand Down
25 changes: 24 additions & 1 deletion adsmp/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from collections import defaultdict
import pdb
from sqlalchemy.orm import load_only

import json

# ============================= INITIALIZATION ==================================== #

Expand All @@ -36,6 +36,7 @@
Queue('update-sitemap-files', app.exchange, routing_key='update-sitemap-files'),
Queue('update-scixid', app.exchange, routing_key='update-scixid'),
Queue('boost-request', app.exchange, routing_key='boost-request'),
Queue('boost-request-batch', app.exchange, routing_key='boost-request-batch'),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to add new celery worker

)


Expand Down Expand Up @@ -1001,12 +1002,34 @@ def task_boost_request(bibcodes):
bibcodes = [bibcodes]

for bibcode in bibcodes:
logger.info('Processing boost request for bibcode: {}'.format(bibcode))
result = app.generate_boost_request_message(bibcode)

logger.info('Boost requests for %s bibcode(s) sent to boost pipeline', len(bibcodes))

return result

@app.task(queue='boost-request-batch')
def task_boost_request_batch(bib_data, metrics, classifications):
    """Process boost requests for a batch of records.

    Each positional argument is a parallel sequence with one entry per
    record; entries may arrive JSON-serialized (strings) straight from
    the DB / Celery transport and are decoded before use.

    @param bib_data: sequence of dict (or JSON str) - bib_data sections
    @param metrics: sequence of dict (or JSON str) - metrics sections
    @param classifications: sequence of list (or JSON str) - classifications sections
    @return: list of bool - per-record success flags from
        generate_boost_request_message_batch
    """
    results = []
    logger.info('Processing boost request for batch of records')
    for bib, met, cl in zip(bib_data, metrics, classifications):
        # Normalize each section to its native type.
        bib = json.loads(bib) if isinstance(bib, str) else bib
        met = json.loads(met) if isinstance(met, str) else met
        # BUGFIX: list(cl) on a JSON string (e.g. '["astronomy"]') would
        # split it into individual characters; decode it like the other
        # sections instead.
        cl = json.loads(cl) if isinstance(cl, str) else cl
        logger.info('Processing boost request for record: %s', bib)
        results.append(app.generate_boost_request_message_batch(bib, met, cl))

    logger.info('Boost requests for %s records sent to boost pipeline', len(bib_data))
    logger.info('Successfully sent %s boost requests to boost pipeline', sum(results))
    return results


if __name__ == '__main__':
app.start()
14 changes: 9 additions & 5 deletions run.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,27 +339,31 @@ def process_all_boost(batch_size):
with app.session_scope() as session:
# load all records from RecordsDB
for rec in session.query(Records) \
.options(load_only(Records.bibcode)) \
.options(load_only(Records.bib_data, Records.metrics, Records.classifications)) \
.yield_per(batch_size):

sent += 1
if sent % 1000 == 0:
logger.debug('Sending %s records', sent)

batch.append(rec.bibcode)
batch.append((rec.bib_data, rec.metrics, rec.classifications))
if len(batch) >= batch_size:
logger.info('Sending batch of %s records to Boost Pipeline', len(batch))
t = tasks.task_boost_request.delay(batch)
# Unpack the batch into separate lists for the task
bib_data_list, metrics_list, classifications_list = zip(*batch)
t = tasks.task_boost_request_batch.delay(bib_data_list, metrics_list, classifications_list)
_tasks.append(t)
batch = []

if len(batch) > 0:
logger.info('Sending final batch of %s records to Boost Pipeline', len(batch))
t = tasks.task_boost_request.delay(batch)
# Unpack the final batch into separate lists for the task
bib_data_list, metrics_list, classifications_list = zip(*batch)
t = tasks.task_boost_request_batch.delay(bib_data_list, metrics_list, classifications_list)
_tasks.append(t)
logger.debug('Sending %s records', len(batch))

logger.info('Completed process_all_boost, sent total of %s records', sent)
logger.info('Sent total of %s records to boost pipeline', sent)


def rebuild_collection(collection_name, batch_size):
Expand Down