Skip to content

Commit c2a0c75

Browse files
author
Nic Watson
authored
Rejigger the cell backup so it blocks the ioloop less (#2145)
1 parent 96521d9 commit c2a0c75

File tree

3 files changed

+43
-50
lines changed

3 files changed

+43
-50
lines changed

synapse/lib/cell.py

+35-42
Original file line numberDiff line numberDiff line change
@@ -702,8 +702,7 @@ class Cell(s_nexus.Pusher, s_telepath.Aware):
702702
}
703703
}
704704

705-
BACKUP_SPAWN_TIMEOUT = 30.0
706-
BACKUP_ACQUIRE_TIMEOUT = 0.5
705+
BACKUP_SPAWN_TIMEOUT = 60.0
707706

708707
async def __anit__(self, dirn, conf=None, readonly=False):
709708

@@ -1202,75 +1201,69 @@ async def _execBackupTask(self, dirn):
12021201
mypipe, child_pipe = ctx.Pipe()
12031202
paths = [str(slab.path) for slab in slabs]
12041203
loglevel = logger.getEffectiveLevel()
1204+
proc = None
12051205

1206-
def spawnproc():
1207-
logger.debug('Starting multiprocessing target')
1208-
proc = ctx.Process(target=self._backupProc, args=(child_pipe, self.dirn, dirn, paths, loglevel))
1209-
proc.start()
1210-
hasdata = mypipe.poll(timeout=self.BACKUP_SPAWN_TIMEOUT)
1211-
if not hasdata:
1212-
raise s_exc.SynErr(mesg='backup subprocess stuck starting')
1213-
data = mypipe.recv()
1214-
assert data == 'ready'
1215-
return proc
1206+
try:
12161207

1217-
proc = await s_coro.executor(spawnproc)
1208+
async with self.nexsroot.applylock:
12181209

1219-
logger.debug('Syncing LMDB Slabs')
1220-
while True:
1221-
await s_lmdbslab.Slab.syncLoopOnce()
1222-
if not any(slab.dirty for slab in slabs):
1223-
break
1210+
logger.debug('Syncing LMDB Slabs')
12241211

1225-
try:
1212+
while True:
1213+
await s_lmdbslab.Slab.syncLoopOnce()
1214+
if not any(slab.dirty for slab in slabs):
1215+
break
12261216

1227-
logger.debug('Acquiring LMDB txns')
1228-
mypipe.send('proceed')
1217+
logger.debug('Starting backup process')
12291218

1230-
# This is technically pending the ioloop waiting for the backup process to acquire a bunch of
1231-
# transactions. We're effectively locking out new write requests the brute force way.
1232-
hasdata = mypipe.poll(timeout=self.BACKUP_ACQUIRE_TIMEOUT)
1233-
if not hasdata:
1234-
raise s_exc.SynErr(mesg='backup subprocess stuck acquiring LMDB txns')
1219+
args = (child_pipe, self.dirn, dirn, paths, loglevel)
12351220

1236-
data = mypipe.recv()
1237-
assert data == 'captured'
1221+
def waitforproc1():
1222+
nonlocal proc
1223+
proc = ctx.Process(target=self._backupProc, args=args)
1224+
proc.start()
1225+
hasdata = mypipe.poll(timeout=self.BACKUP_SPAWN_TIMEOUT)
1226+
if not hasdata:
1227+
raise s_exc.SynErr(mesg='backup subprocess start timed out')
1228+
data = mypipe.recv()
1229+
assert data == 'captured'
12381230

1239-
logger.debug('Acquired LMDB txns')
1231+
await s_coro.executor(waitforproc1)
12401232

1241-
def waitforproc():
1233+
def waitforproc2():
12421234
proc.join()
12431235
if proc.exitcode:
12441236
raise s_exc.SpawnExit(code=proc.exitcode)
12451237

1246-
retn = await s_coro.executor(waitforproc)
1238+
await s_coro.executor(waitforproc2)
1239+
proc = None
1240+
1241+
logger.info(f'Backup completed to [{dirn}]')
1242+
return
12471243

12481244
except (asyncio.CancelledError, Exception):
12491245
logger.exception(f'Error performing backup to [{dirn}]')
1250-
proc.terminate()
12511246
raise
12521247

1253-
else:
1254-
logger.info(f'Backup completed to [{dirn}]')
1255-
return retn
1248+
finally:
1249+
if proc:
1250+
proc.terminate()
12561251

12571252
@staticmethod
12581253
def _backupProc(pipe, srcdir, dstdir, lmdbpaths, loglevel):
12591254
'''
12601255
(In a separate process) Actually do the backup
12611256
'''
1262-
# This logging call is okay to run since we're executing in
1263-
# our own process space and no logging has been configured.
1257+
# This is a new process: configure logging
12641258
s_common.setlogging(logger, loglevel)
1265-
pipe.send('ready')
1266-
data = pipe.recv()
1267-
assert data == 'proceed'
1259+
12681260
with s_t_backup.capturelmdbs(srcdir, onlydirs=lmdbpaths) as lmdbinfo:
1269-
# Let parent know we have the transactions so he can resume the ioloop
12701261
pipe.send('captured')
1271-
1262+
logger.debug('Acquired LMDB transactions')
12721263
s_t_backup.txnbackup(lmdbinfo, srcdir, dstdir)
12731264

1265+
logger.debug('Backup process completed')
1266+
12741267
def _reqBackConf(self):
12751268
if self.backdirn is None:
12761269
mesg = 'Backup APIs require the backup:dir config option is set'

synapse/lib/nexus.py

+6-3
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ async def __anit__(self, dirn: str, donexslog: bool = True, map_async=False): #
8383
self.started = False
8484
self.celliden = None
8585
self.donexslog = donexslog
86+
self.applylock = asyncio.Lock()
8687

8788
self._mirrors: List[ChangeDist] = []
8889
self._nexskids: Dict[str, 'Pusher'] = {}
@@ -241,10 +242,12 @@ async def _apply(self, indx, mesg):
241242

242243
nexus = self._nexskids[nexsiden]
243244
func, passitem = nexus._nexshands[event]
244-
if passitem:
245-
return await func(nexus, *args, nexsitem=(indx, mesg), **kwargs)
246245

247-
return await func(nexus, *args, **kwargs)
246+
async with self.applylock:
247+
if passitem:
248+
return await func(nexus, *args, nexsitem=(indx, mesg), **kwargs)
249+
250+
return await func(nexus, *args, **kwargs)
248251

249252
async def iter(self, offs: int) -> AsyncIterator[Any]:
250253
'''

synapse/tests/test_lib_cell.py

+2-5
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,9 @@ def _sleeperProc(pipe, srcdir, dstdir, lmdbpaths, loglevel):
2222
time.sleep(3.0)
2323

2424
def _sleeper2Proc(pipe, srcdir, dstdir, lmdbpaths, loglevel):
25-
pipe.send('ready')
2625
time.sleep(2.0)
2726

2827
def _exiterProc(pipe, srcdir, dstdir, lmdbpaths, loglevel):
29-
pipe.send('ready')
3028
pipe.send('captured')
3129
sys.exit(1)
3230

@@ -664,9 +662,8 @@ async def test_cell_backup(self):
664662
# Test runners can take an unusually long time to spawn a process
665663
with mock.patch.object(s_cell.Cell, 'BACKUP_SPAWN_TIMEOUT', 8.0):
666664

667-
with mock.patch.object(s_cell.Cell, 'BACKUP_ACQUIRE_TIMEOUT', 0.1):
668-
with mock.patch.object(s_cell.Cell, '_backupProc', staticmethod(_sleeper2Proc)):
669-
await self.asyncraises(s_exc.SynErr, proxy.runBackup())
665+
with mock.patch.object(s_cell.Cell, '_backupProc', staticmethod(_sleeper2Proc)):
666+
await self.asyncraises(s_exc.SynErr, proxy.runBackup())
670667

671668
with mock.patch.object(s_cell.Cell, '_backupProc', staticmethod(_exiterProc)):
672669
await self.asyncraises(s_exc.SpawnExit, proxy.runBackup())

0 commit comments

Comments
 (0)