Rejigger the cell backup so it blocks the ioloop less #2145

Merged: 5 commits, Apr 9, 2021
77 changes: 35 additions & 42 deletions synapse/lib/cell.py
@@ -702,8 +702,7 @@ class Cell(s_nexus.Pusher, s_telepath.Aware):
         }
     }
 
-    BACKUP_SPAWN_TIMEOUT = 30.0
-    BACKUP_ACQUIRE_TIMEOUT = 0.5
+    BACKUP_SPAWN_TIMEOUT = 60.0
 
     async def __anit__(self, dirn, conf=None, readonly=False):
 
@@ -1202,75 +1201,69 @@ async def _execBackupTask(self, dirn):
         mypipe, child_pipe = ctx.Pipe()
         paths = [str(slab.path) for slab in slabs]
         loglevel = logger.getEffectiveLevel()
+        proc = None
 
-        def spawnproc():
-            logger.debug('Starting multiprocessing target')
-            proc = ctx.Process(target=self._backupProc, args=(child_pipe, self.dirn, dirn, paths, loglevel))
-            proc.start()
-            hasdata = mypipe.poll(timeout=self.BACKUP_SPAWN_TIMEOUT)
-            if not hasdata:
-                raise s_exc.SynErr(mesg='backup subprocess stuck starting')
-            data = mypipe.recv()
-            assert data == 'ready'
-            return proc
-
-        proc = await s_coro.executor(spawnproc)
-
-        logger.debug('Syncing LMDB Slabs')
-
-        try:
-            while True:
-                await s_lmdbslab.Slab.syncLoopOnce()
-                if not any(slab.dirty for slab in slabs):
-                    break
-
-            logger.debug('Acquiring LMDB txns')
-            mypipe.send('proceed')
-
-            # This is technically pending the ioloop waiting for the backup process to acquire a bunch of
-            # transactions. We're effectively locking out new write requests the brute force way.
-            hasdata = mypipe.poll(timeout=self.BACKUP_ACQUIRE_TIMEOUT)
-            if not hasdata:
-                raise s_exc.SynErr(mesg='backup subprocess stuck acquiring LMDB txns')
-
-            data = mypipe.recv()
-            assert data == 'captured'
-
-            logger.debug('Acquired LMDB txns')
-
-            def waitforproc():
+        try:
+
+            async with self.nexsroot.applylock:
+
+                logger.debug('Syncing LMDB Slabs')
+
+                while True:
+                    await s_lmdbslab.Slab.syncLoopOnce()
+                    if not any(slab.dirty for slab in slabs):
+                        break
+
+                logger.debug('Starting backup process')
+
+                args = (child_pipe, self.dirn, dirn, paths, loglevel)
+
+                def waitforproc1():
+                    nonlocal proc
+                    proc = ctx.Process(target=self._backupProc, args=args)
+                    proc.start()
+                    hasdata = mypipe.poll(timeout=self.BACKUP_SPAWN_TIMEOUT)
+                    if not hasdata:
+                        raise s_exc.SynErr(mesg='backup subprocess start timed out')
+                    data = mypipe.recv()
+                    assert data == 'captured'
+
+                await s_coro.executor(waitforproc1)
+
+            def waitforproc2():
                 proc.join()
                 if proc.exitcode:
                     raise s_exc.SpawnExit(code=proc.exitcode)
 
-            retn = await s_coro.executor(waitforproc)
+            await s_coro.executor(waitforproc2)
+            proc = None
+
+            logger.info(f'Backup completed to [{dirn}]')
+            return
 
         except (asyncio.CancelledError, Exception):
             logger.exception(f'Error performing backup to [{dirn}]')
-            proc.terminate()
             raise
 
-        else:
-            logger.info(f'Backup completed to [{dirn}]')
-            return retn
+        finally:
+            if proc:
+                proc.terminate()
 
     @staticmethod
     def _backupProc(pipe, srcdir, dstdir, lmdbpaths, loglevel):
         '''
         (In a separate process) Actually do the backup
         '''
-        # This logging call is okay to run since we're executing in
-        # our own process space and no logging has been configured.
+        # This is a new process: configure logging
         s_common.setlogging(logger, loglevel)
-        pipe.send('ready')
-        data = pipe.recv()
-        assert data == 'proceed'
 
         with s_t_backup.capturelmdbs(srcdir, onlydirs=lmdbpaths) as lmdbinfo:
+            # Let parent know we have the transactions so he can resume the ioloop
            pipe.send('captured')
-            logger.debug('Acquired LMDB transactions')
             s_t_backup.txnbackup(lmdbinfo, srcdir, dstdir)
 
+        logger.debug('Backup process completed')
+
     def _reqBackConf(self):
         if self.backdirn is None:
             mesg = 'Backup APIs require the backup:dir config option is set'
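The core of the change is visible above: instead of a two-phase ready/proceed handshake with a tight 0.5 second acquire window, the parent now holds the nexus applylock only while the child process spawns and captures its LMDB transactions, then releases it for the slow copy. Below is a minimal sketch of that pattern outside the synapse codebase; the applylock argument stands in for NexsRoot.applylock, and _child, spawn_and_wait, and SPAWN_TIMEOUT are hypothetical stand-ins for _backupProc, waitforproc1, and BACKUP_SPAWN_TIMEOUT.

import asyncio
import multiprocessing

SPAWN_TIMEOUT = 60.0  # hypothetical stand-in for Cell.BACKUP_SPAWN_TIMEOUT

def _child(pipe):
    # Child process: capture the snapshot state (capturelmdbs() in the
    # real code), then tell the parent it may resume applying writes.
    pipe.send('captured')
    # ... the long-running copy (txnbackup() in the real code) runs here ...

async def backup(applylock):
    ctx = multiprocessing.get_context('spawn')
    mypipe, child_pipe = ctx.Pipe()
    loop = asyncio.get_running_loop()
    proc = None

    def spawn_and_wait():
        # Runs in a thread so the blocking poll()/recv() stay off the ioloop.
        nonlocal proc
        proc = ctx.Process(target=_child, args=(child_pipe,))
        proc.start()
        if not mypipe.poll(timeout=SPAWN_TIMEOUT):
            raise TimeoutError('backup subprocess start timed out')
        assert mypipe.recv() == 'captured'

    try:
        # Writes are blocked only for the spawn-and-capture window...
        async with applylock:
            await loop.run_in_executor(None, spawn_and_wait)

        # ...while the slow copy and the join happen with the lock released.
        await loop.run_in_executor(None, proc.join)
        proc = None
    finally:
        if proc is not None:
            proc.terminate()

async def main():
    await backup(asyncio.Lock())

if __name__ == '__main__':  # the 'spawn' start method needs an importable __main__
    asyncio.run(main())

The design point the diff makes is the same one the sketch makes: the expensive txnbackup() runs entirely after the lock is released, so the ioloop is stalled only for the brief capture handshake.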
9 changes: 6 additions & 3 deletions synapse/lib/nexus.py
@@ -83,6 +83,7 @@ async def __anit__(self, dirn: str, donexslog: bool = True, map_async=False):
         self.started = False
         self.celliden = None
         self.donexslog = donexslog
+        self.applylock = asyncio.Lock()
 
         self._mirrors: List[ChangeDist] = []
         self._nexskids: Dict[str, 'Pusher'] = {}
 
@@ -241,10 +242,12 @@ async def _apply(self, indx, mesg):
 
         nexus = self._nexskids[nexsiden]
         func, passitem = nexus._nexshands[event]
-        if passitem:
-            return await func(nexus, *args, nexsitem=(indx, mesg), **kwargs)
 
-        return await func(nexus, *args, **kwargs)
+        async with self.applylock:
+            if passitem:
+                return await func(nexus, *args, nexsitem=(indx, mesg), **kwargs)
+
+            return await func(nexus, *args, **kwargs)
 
     async def iter(self, offs: int) -> AsyncIterator[Any]:
         '''
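This lock is what the backup task grabs in cell.py: every nexus change handler now runs under applylock, so whoever holds it stalls all writes until it is released. A toy demonstration of that serialization follows; MiniNexsRoot, demo, and write are invented for illustration, and the real _apply dispatches registered nexus handlers rather than arbitrary coroutines.

import asyncio

class MiniNexsRoot:
    # Toy model of the NexsRoot change in this diff: every change handler
    # runs under applylock, so the lock holder (the backup, here) stalls
    # all writes until it releases.
    def __init__(self):
        self.applylock = asyncio.Lock()

    async def _apply(self, func, *args, **kwargs):
        async with self.applylock:
            return await func(*args, **kwargs)

async def demo():
    root = MiniNexsRoot()

    async def write(x):
        return x * 2

    async with root.applylock:            # the "backup" holds the lock...
        task = asyncio.create_task(root._apply(write, 21))
        await asyncio.sleep(0.1)
        assert not task.done()            # ...so the apply is stalled

    assert await task == 42               # lock released: apply proceeds

if __name__ == '__main__':
    asyncio.run(demo())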
7 changes: 2 additions & 5 deletions synapse/tests/test_lib_cell.py
@@ -22,11 +22,9 @@ def _sleeperProc(pipe, srcdir, dstdir, lmdbpaths, loglevel):
     time.sleep(3.0)
 
 def _sleeper2Proc(pipe, srcdir, dstdir, lmdbpaths, loglevel):
-    pipe.send('ready')
     time.sleep(2.0)
 
 def _exiterProc(pipe, srcdir, dstdir, lmdbpaths, loglevel):
-    pipe.send('ready')
     pipe.send('captured')
     sys.exit(1)
 
@@ -664,9 +662,8 @@ async def test_cell_backup(self):
             # Test runners can take an unusually long time to spawn a process
             with mock.patch.object(s_cell.Cell, 'BACKUP_SPAWN_TIMEOUT', 8.0):
 
-                with mock.patch.object(s_cell.Cell, 'BACKUP_ACQUIRE_TIMEOUT', 0.1):
-                    with mock.patch.object(s_cell.Cell, '_backupProc', staticmethod(_sleeper2Proc)):
-                        await self.asyncraises(s_exc.SynErr, proxy.runBackup())
+                with mock.patch.object(s_cell.Cell, '_backupProc', staticmethod(_sleeper2Proc)):
+                    await self.asyncraises(s_exc.SynErr, proxy.runBackup())
 
                 with mock.patch.object(s_cell.Cell, '_backupProc', staticmethod(_exiterProc)):
                     await self.asyncraises(s_exc.SpawnExit, proxy.runBackup())
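A side note on the test fixtures above: the stubs replace a staticmethod, so each patch wraps the replacement in staticmethod() to keep the descriptor type intact. A small self-contained sketch of that trick, with a trimmed-down Cell standing in for the real s_cell.Cell:

import sys
from unittest import mock

class Cell:
    # Trimmed stand-in for s_cell.Cell; only what the patch touches.
    BACKUP_SPAWN_TIMEOUT = 60.0

    @staticmethod
    def _backupProc(pipe, srcdir, dstdir, lmdbpaths, loglevel):
        pipe.send('captured')

def _exiterProc(pipe, srcdir, dstdir, lmdbpaths, loglevel):
    # Mimics a child that completes the handshake, then dies.
    pipe.send('captured')
    sys.exit(1)

# Wrapping the replacement in staticmethod() preserves the descriptor
# type, so ctx.Process(target=Cell._backupProc, ...) still receives a
# plain function rather than something bound to the class.
with mock.patch.object(Cell, '_backupProc', staticmethod(_exiterProc)):
    assert Cell._backupProc is _exiterProc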