Skip to content

Commit c82037f

Browse files
committed
Solr sync batching
1 parent 5ec00b9 commit c82037f

File tree

2 files changed

+130
-65
lines changed

2 files changed

+130
-65
lines changed

CHANGES.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ Changelog
1818
[sgeulette]
1919
- Used session from imio.esign.
2020
[sgeulette]
21+
- Solr sync batching.
22+
[chris-adam]
2123

2224
3.0 (2021-09-30)
2325
----------------

imio/dms/mail/browser/overrides.py

Lines changed: 128 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -347,9 +347,6 @@ class DocsImportSecondStepView(ImportSecondStepView):
347347
# collective.solr maintenance view
348348

349349
try:
350-
from collective.solr.browser.maintenance import checkpointIterator
351-
from collective.solr.browser.maintenance import MAX_ROWS
352-
from collective.solr.browser.maintenance import notimeout
353350
from collective.solr.browser.maintenance import SolrMaintenanceView
354351
from collective.solr.browser.maintenance import timer
355352
from collective.solr.indexer import SolrIndexProcessor
@@ -358,7 +355,15 @@ class DocsImportSecondStepView(ImportSecondStepView):
358355
from collective.solr.parser import parse_date_as_datetime
359356
from collective.solr.parser import SolrResponse
360357
from collective.solr.parser import unmarshallers
361-
358+
from imio.helpers.batching import batch_delete_files
359+
from imio.helpers.batching import batch_get_keys
360+
from imio.helpers.batching import batch_globally_finished
361+
from imio.helpers.batching import batch_handle_key
362+
from imio.helpers.batching import batch_hashed_filename
363+
from imio.helpers.batching import batch_loop_else
364+
from imio.helpers.batching import batch_skip_key
365+
from imio.helpers.batching import can_delete_batch_files
366+
from Products.ZCatalog.ProgressHandler import ZLogHandler
362367
BaseMaintenanceView = SolrMaintenanceView
363368
except ImportError:
364369
BaseMaintenanceView = BrowserView
@@ -384,17 +389,20 @@ def sync(self, batch=1000, preImportDeleteQuery="*:*"):
384389
proc = SolrIndexProcessor(manager)
385390
conn = manager.getConnection()
386391
key = queryUtility(ISolrConnectionManager).getSchema().uniqueKey
387-
zodb_conn = self.context._p_jar
388392
catalog = getToolByName(self.context, "portal_catalog")
389393
getIndex = catalog._catalog.getIndex
390394
modified_index = getIndex("modified")
391395
uid_index = getIndex(key)
392396
log = self.mklog()
393397
real = timer() # real time
394-
lap = timer() # real lap time (for intermediate commits)
395398
cpu = timer(clock) # cpu time
399+
396400
# get Solr status
397-
response = conn.search(q=preImportDeleteQuery, rows=MAX_ROWS, fl="%s modified" % key)
401+
response = conn.search(
402+
q=preImportDeleteQuery,
403+
rows=10000000,
404+
fl="%s modified" % key,
405+
)
398406
# avoid creating DateTime instances
399407
simple_unmarshallers = unmarshallers.copy()
400408
simple_unmarshallers["date"] = parse_date_as_datetime
@@ -411,36 +419,23 @@ def _utc_convert(value):
411419
uid = flare[key]
412420
solr_uids.add(uid)
413421
solr_results[uid] = _utc_convert(flare["modified"])
422+
414423
# get catalog status
415424
cat_results = {}
416425
cat_uids = set()
417426
for uid, rid in uid_index._index.items():
418427
cat_uids.add(uid)
419428
cat_results[uid] = rid
429+
420430
# differences
421431
index = cat_uids.difference(solr_uids)
422-
solr_uids.difference_update(cat_uids)
423-
unindex = solr_uids
424-
processed = 0
425-
flush = notimeout(lambda: conn.flush())
426-
427-
def checkPoint():
428-
msg = "intermediate commit (%d items processed, " "last batch in %s)...\n" % (processed, lap.next())
429-
log(msg)
430-
logger.info(msg)
431-
flush()
432-
zodb_conn.cacheGC()
433-
if self.batch_value and processed >= self.batch_value:
434-
logger.info("EXITED following BATCH env value {}".format(self.batch_value))
435-
conn.commit()
436-
sys.exit(0)
437-
438-
cpi = checkpointIterator(checkPoint, batch)
432+
unindex = solr_uids.difference(cat_uids)
433+
self._processed = 0
434+
439435
# Look up objects
440436
uid_rid_get = cat_results.get
441437
rid_path_get = catalog._catalog.paths.get
442438
catalog_traverse = catalog.unrestrictedTraverse
443-
444439
def lookup(
445440
uid, rid=None, uid_rid_get=uid_rid_get, rid_path_get=rid_path_get, catalog_traverse=catalog_traverse
446441
):
@@ -459,51 +454,119 @@ def lookup(
459454
return None
460455
return obj
461456

462-
log('processing %d "unindex" operations next...\n' % len(unindex))
463-
op = notimeout(lambda uid: conn.delete(id=uid))
464-
for uid in unindex:
465-
obj = lookup(uid)
466-
if obj is None:
467-
op(uid)
468-
processed += 1
469-
cpi.next()
470-
else:
471-
log("not unindexing existing object %r.\n" % uid)
472-
log('processing %d "index" operations next...\n' % len(index))
473-
op = notimeout(lambda obj: proc.index(obj))
474-
for uid in index:
475-
obj = lookup(uid)
476-
if ICheckIndexable(obj)():
477-
op(obj)
478-
processed += 1
479-
cpi.next()
457+
# Unindex items in Solr but not in Plone catalog
458+
def batch_unindex(unindex):
459+
pghandler = ZLogHandler(steps=batch)
460+
i = 0
461+
pghandler.init('sync', len(unindex))
462+
pklfile = batch_hashed_filename('collective.solr.sync.unindex.pkl')
463+
batch_keys, batch_config = batch_get_keys(pklfile, loop_length=len(unindex))
464+
for uid in unindex:
465+
if batch_skip_key(uid, batch_keys, batch_config):
466+
continue
467+
i += 1
468+
if pghandler:
469+
pghandler.report(i)
470+
obj = lookup(uid)
471+
if obj is None:
472+
conn.delete(id=uid)
473+
self._processed += 1
474+
else:
475+
log("not unindexing existing object %r.\n" % uid)
476+
if batch_handle_key(uid, batch_keys, batch_config):
477+
break
480478
else:
481-
log("not indexing unindexable object %r.\n" % uid)
482-
if obj is not None:
483-
obj._p_deactivate()
484-
log('processing "reindex" operations next...\n')
485-
op = notimeout(lambda obj: proc.reindex(obj))
486-
cat_mod_get = modified_index._unindex.get
487-
solr_mod_get = solr_results.get
488-
done = unindex.union(index)
489-
for uid, rid in cat_results.items():
490-
if uid in done:
491-
continue
492-
if isinstance(rid, IITreeSet):
493-
rid = rid.keys()[0]
494-
if cat_mod_get(rid) != solr_mod_get(uid):
495-
obj = lookup(uid, rid=rid)
479+
batch_loop_else(batch_keys, batch_config)
480+
conn.commit()
481+
if can_delete_batch_files(batch_keys, batch_config):
482+
batch_delete_files(batch_keys, batch_config)
483+
if pghandler:
484+
pghandler.finish()
485+
return batch_globally_finished(batch_keys, batch_config)
486+
487+
log('processing %d "unindex" operations next...\n' % len(unindex))
488+
finished_unindex = batch_unindex(unindex)
489+
490+
# Index items in Plone catalog but not in Solr
491+
def batch_index(index):
492+
pghandler = ZLogHandler(steps=batch)
493+
i = 0
494+
pghandler.init('sync', len(index))
495+
pklfile = batch_hashed_filename('collective.solr.sync.index.pkl')
496+
batch_keys, batch_config = batch_get_keys(pklfile, loop_length=len(index))
497+
for uid in index:
498+
if batch_skip_key(uid, batch_keys, batch_config):
499+
continue
500+
i += 1
501+
if pghandler:
502+
pghandler.report(i)
503+
obj = lookup(uid)
496504
if ICheckIndexable(obj)():
497-
op(obj)
498-
processed += 1
499-
cpi.next()
505+
proc.index(obj)
506+
self._processed += 1
500507
else:
501-
log("not reindexing unindexable object %r.\n" % uid)
508+
log("not indexing unindexable object %r.\n" % uid)
502509
if obj is not None:
503510
obj._p_deactivate()
504-
conn.commit()
511+
if batch_handle_key(uid, batch_keys, batch_config):
512+
break
513+
else:
514+
batch_loop_else(batch_keys, batch_config)
515+
conn.commit()
516+
if can_delete_batch_files(batch_keys, batch_config):
517+
batch_delete_files(batch_keys, batch_config)
518+
if pghandler:
519+
pghandler.finish()
520+
return batch_globally_finished(batch_keys, batch_config)
521+
522+
finished_index = False
523+
if finished_unindex:
524+
log('processing %d "index" operations next...\n' % len(index))
525+
finished_index = batch_index(index)
526+
527+
# Reindex items modified in Plone catalog since last indexing in Solr
528+
def batch_reindex(reindex):
529+
pghandler = ZLogHandler(steps=batch)
530+
i = 0
531+
pghandler.init('sync', len(reindex))
532+
pklfile = batch_hashed_filename('collective.solr.sync.reindex.pkl')
533+
batch_keys, batch_config = batch_get_keys(pklfile, loop_length=len(reindex))
534+
for uid, rid in reindex.items():
535+
if batch_skip_key(uid, batch_keys, batch_config):
536+
continue
537+
i += 1
538+
if pghandler:
539+
pghandler.report(i)
540+
if isinstance(rid, IITreeSet):
541+
rid = rid.keys()[0]
542+
if modified_index._unindex.get(rid) != solr_results.get(uid):
543+
obj = lookup(uid, rid=rid)
544+
if ICheckIndexable(obj)():
545+
proc.reindex(obj)
546+
self._processed += 1
547+
else:
548+
log("not reindexing unindexable object %r.\n" % uid)
549+
if obj is not None:
550+
obj._p_deactivate()
551+
if batch_handle_key(uid, batch_keys, batch_config):
552+
break
553+
else:
554+
batch_loop_else(batch_keys, batch_config)
555+
conn.commit()
556+
if can_delete_batch_files(batch_keys, batch_config):
557+
batch_delete_files(batch_keys, batch_config)
558+
if pghandler:
559+
pghandler.finish()
560+
return batch_globally_finished(batch_keys, batch_config)
561+
562+
if finished_index:
563+
log('processing "reindex" operations next...\n')
564+
done = unindex.union(index)
565+
cat_results = {uid: rid for uid, rid in cat_results.items() if uid not in done}
566+
batch_reindex(cat_results)
567+
505568
log("solr index synced.\n")
506-
msg = "processed %d object(s) in %s (%s cpu time)."
507-
msg = msg % (processed, real.next(), cpu.next())
569+
msg = "processed %d object(s) in %s (%s cpu time)."
570+
msg = msg % (self._processed, real.next(), cpu.next())
508571
log(msg)
509572
logger.info(msg)

0 commit comments

Comments
 (0)