diff --git a/synapse/lib/cell.py b/synapse/lib/cell.py index 6a92a3b738c..18f5b276120 100644 --- a/synapse/lib/cell.py +++ b/synapse/lib/cell.py @@ -702,8 +702,7 @@ class Cell(s_nexus.Pusher, s_telepath.Aware): } } - BACKUP_SPAWN_TIMEOUT = 30.0 - BACKUP_ACQUIRE_TIMEOUT = 0.5 + BACKUP_SPAWN_TIMEOUT = 60.0 async def __anit__(self, dirn, conf=None, readonly=False): @@ -1202,75 +1201,69 @@ async def _execBackupTask(self, dirn): mypipe, child_pipe = ctx.Pipe() paths = [str(slab.path) for slab in slabs] loglevel = logger.getEffectiveLevel() + proc = None - def spawnproc(): - logger.debug('Starting multiprocessing target') - proc = ctx.Process(target=self._backupProc, args=(child_pipe, self.dirn, dirn, paths, loglevel)) - proc.start() - hasdata = mypipe.poll(timeout=self.BACKUP_SPAWN_TIMEOUT) - if not hasdata: - raise s_exc.SynErr(mesg='backup subprocess stuck starting') - data = mypipe.recv() - assert data == 'ready' - return proc + try: - proc = await s_coro.executor(spawnproc) + async with self.nexsroot.applylock: - logger.debug('Syncing LMDB Slabs') - while True: - await s_lmdbslab.Slab.syncLoopOnce() - if not any(slab.dirty for slab in slabs): - break + logger.debug('Syncing LMDB Slabs') - try: + while True: + await s_lmdbslab.Slab.syncLoopOnce() + if not any(slab.dirty for slab in slabs): + break - logger.debug('Acquiring LMDB txns') - mypipe.send('proceed') + logger.debug('Starting backup process') - # This is technically pending the ioloop waiting for the backup process to acquire a bunch of - # transactions. We're effectively locking out new write requests the brute force way. - hasdata = mypipe.poll(timeout=self.BACKUP_ACQUIRE_TIMEOUT) - if not hasdata: - raise s_exc.SynErr(mesg='backup subprocess stuck acquiring LMDB txns') + args = (child_pipe, self.dirn, dirn, paths, loglevel) - data = mypipe.recv() - assert data == 'captured' + def waitforproc1(): + nonlocal proc + proc = ctx.Process(target=self._backupProc, args=args) + proc.start() + hasdata = mypipe.poll(timeout=self.BACKUP_SPAWN_TIMEOUT) + if not hasdata: + raise s_exc.SynErr(mesg='backup subprocess start timed out') + data = mypipe.recv() + assert data == 'captured' - logger.debug('Acquired LMDB txns') + await s_coro.executor(waitforproc1) - def waitforproc(): + def waitforproc2(): proc.join() if proc.exitcode: raise s_exc.SpawnExit(code=proc.exitcode) - retn = await s_coro.executor(waitforproc) + await s_coro.executor(waitforproc2) + proc = None + + logger.info(f'Backup completed to [{dirn}]') + return except (asyncio.CancelledError, Exception): logger.exception(f'Error performing backup to [{dirn}]') - proc.terminate() raise - else: - logger.info(f'Backup completed to [{dirn}]') - return retn + finally: + if proc: + proc.terminate() @staticmethod def _backupProc(pipe, srcdir, dstdir, lmdbpaths, loglevel): ''' (In a separate process) Actually do the backup ''' - # This logging call is okay to run since we're executing in - # our own process space and no logging has been configured. + # This is a new process: configure logging s_common.setlogging(logger, loglevel) - pipe.send('ready') - data = pipe.recv() - assert data == 'proceed' + with s_t_backup.capturelmdbs(srcdir, onlydirs=lmdbpaths) as lmdbinfo: - # Let parent know we have the transactions so he can resume the ioloop pipe.send('captured') - + logger.debug('Acquired LMDB transactions') s_t_backup.txnbackup(lmdbinfo, srcdir, dstdir) + logger.debug('Backup process completed') + def _reqBackConf(self): if self.backdirn is None: mesg = 'Backup APIs require the backup:dir config option is set' diff --git a/synapse/lib/nexus.py b/synapse/lib/nexus.py index 0267f8f038d..6b7945300db 100644 --- a/synapse/lib/nexus.py +++ b/synapse/lib/nexus.py @@ -83,6 +83,7 @@ async def __anit__(self, dirn: str, donexslog: bool = True, map_async=False): # self.started = False self.celliden = None self.donexslog = donexslog + self.applylock = asyncio.Lock() self._mirrors: List[ChangeDist] = [] self._nexskids: Dict[str, 'Pusher'] = {} @@ -241,10 +242,12 @@ async def _apply(self, indx, mesg): nexus = self._nexskids[nexsiden] func, passitem = nexus._nexshands[event] - if passitem: - return await func(nexus, *args, nexsitem=(indx, mesg), **kwargs) - return await func(nexus, *args, **kwargs) + async with self.applylock: + if passitem: + return await func(nexus, *args, nexsitem=(indx, mesg), **kwargs) + + return await func(nexus, *args, **kwargs) async def iter(self, offs: int) -> AsyncIterator[Any]: ''' diff --git a/synapse/tests/test_lib_cell.py b/synapse/tests/test_lib_cell.py index 2bb0b450479..26b9801aac5 100644 --- a/synapse/tests/test_lib_cell.py +++ b/synapse/tests/test_lib_cell.py @@ -22,11 +22,9 @@ def _sleeperProc(pipe, srcdir, dstdir, lmdbpaths, loglevel): time.sleep(3.0) def _sleeper2Proc(pipe, srcdir, dstdir, lmdbpaths, loglevel): - pipe.send('ready') time.sleep(2.0) def _exiterProc(pipe, srcdir, dstdir, lmdbpaths, loglevel): - pipe.send('ready') pipe.send('captured') sys.exit(1) @@ -664,9 +662,8 @@ async def test_cell_backup(self): # Test runners can take an unusually long time to spawn a process with mock.patch.object(s_cell.Cell, 'BACKUP_SPAWN_TIMEOUT', 8.0): - with mock.patch.object(s_cell.Cell, 'BACKUP_ACQUIRE_TIMEOUT', 0.1): - with mock.patch.object(s_cell.Cell, '_backupProc', staticmethod(_sleeper2Proc)): - await self.asyncraises(s_exc.SynErr, proxy.runBackup()) + with mock.patch.object(s_cell.Cell, '_backupProc', staticmethod(_sleeper2Proc)): + await self.asyncraises(s_exc.SynErr, proxy.runBackup()) with mock.patch.object(s_cell.Cell, '_backupProc', staticmethod(_exiterProc)): await self.asyncraises(s_exc.SpawnExit, proxy.runBackup())