Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions src/experimaestro/connectors/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
import os
import threading
from experimaestro.launcherfinder import LauncherRegistry
import fasteners
from fasteners import InterProcessLock as FastenersInterProcessLock
import psutil

from experimaestro.locking import Lock
from asyncio import Lock

from . import (
Connector,
Expand Down Expand Up @@ -93,7 +93,7 @@ def fromspec(connector, spec):


def getstream(redirect: Redirect, write: bool):
if redirect.type == RedirectType.FILE:
if redirect.type == RedirectType.FILE and redirect.path:
return redirect.path.open("w" if write else "r")

if redirect.type == RedirectType.PIPE:
Expand Down Expand Up @@ -145,25 +145,35 @@ def start(self, task_mode=False):
return process


class InterProcessLock(fasteners.InterProcessLock, Lock):
class InterProcessLock(FastenersInterProcessLock, Lock):
def __init__(self, path, max_delay=-1):
super().__init__(path)
FastenersInterProcessLock.__init__(self, path)
self.max_delay = max_delay

def __enter__(self):
logger.debug("Locking %s", self.path)
if not super().acquire(blocking=True, max_delay=self.max_delay, timeout=None):
if not FastenersInterProcessLock.acquire(
self, blocking=True, max_delay=self.max_delay, timeout=None
):
raise threading.ThreadError("Could not acquire lock")
logger.debug("Locked %s", self.path)
return self

def __aenter__(self):
# use the synchronous __enter__ method in async context
return self.__enter__()

def __exit__(self, *args):
logger.debug("Unlocking %s", self.path)
super().__exit__(*args)

def __aexit__(self, *args):
# use the synchronous __exit__ method in async context
return self.__exit__(*args)


class LocalConnector(Connector):
INSTANCE: Connector = None
INSTANCE: Optional[Connector] = None

@staticmethod
def instance():
Expand All @@ -175,7 +185,7 @@ def instance():
def init_registry(registry: LauncherRegistry):
pass

def __init__(self, localpath: Path = None):
def __init__(self, localpath: Optional[Path] = None):
localpath = localpath
if not localpath:
localpath = Path(
Expand All @@ -200,7 +210,7 @@ def createtoken(self, name: str, total: int) -> Token:
def processbuilder(self) -> ProcessBuilder:
return LocalProcessBuilder()

def resolve(self, path: Path, basepath: Path = None) -> str:
def resolve(self, path: Path, basepath: Optional[Path] = None) -> str:
assert isinstance(path, PosixPath) or isinstance(
path, WindowsPath
), f"Unrecognized path {type(path)}"
Expand Down
2 changes: 1 addition & 1 deletion src/experimaestro/connectors/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io
import os
import re
from asyncio import Lock
from experimaestro.launcherfinder import LauncherRegistry
from urllib.parse import urlparse
from itertools import chain
Expand All @@ -19,7 +20,6 @@
RedirectType,
Redirect,
)
from experimaestro.locking import Lock
from experimaestro.tokens import Token

try:
Expand Down
76 changes: 25 additions & 51 deletions src/experimaestro/locking.py
Original file line number Diff line number Diff line change
@@ -1,68 +1,42 @@
from experimaestro.utils.asyncio import asyncThreadcheck
from asyncio import Lock
from .utils import logger


class Lock:
"""A lock"""

def __init__(self):
self._level = 0
self.detached = False

def detach(self):
self.detached = True

def acquire(self):
if self._level == 0:
self._level += 1
self._acquire()
return self

def release(self):
if not self.detached and self._level == 1:
self._level -= 1
self._release()

def __enter__(self):
self.acquire()
return self

def __exit__(self, *args):
self.release()

async def __aenter__(self):
return await asyncThreadcheck("lock (aenter)", self.__enter__)

async def __aexit__(self, *args):
return await asyncThreadcheck("lock (aexit)", self.__exit__, *args)

def _acquire(self):
raise NotImplementedError()

def _release(self):
raise NotImplementedError()


class LockError(Exception):
pass


class Locks(Lock):
"""A set of locks"""
"""A set of locks that can be acquired/released together"""

def __init__(self):
super().__init__()
self.locks = []

def append(self, lock):
"""Add a lock to the collection"""
self.locks.append(lock)

def _acquire(self):
for lock in self.locks:
lock.acquire()
async def acquire(self):
"""Acquire all locks in order"""
if not self.locked():
for lock in self.locks:
await lock.acquire()
self._acquired = True
await super().acquire()
return self

def release(self):
"""Release all locks in reverse order"""
if self.locked():
# if not self.detached and self._acquired:
logger.debug("Releasing %d locks", len(self.locks))
# Release in reverse order to prevent deadlocks
for lock in reversed(self.locks):
logger.debug("[locks] Releasing %s", lock)
lock.release()
super().release()

def _release(self):
logger.debug("Releasing %d locks", len(self.locks))
for lock in self.locks:
logger.debug("[locks] Releasing %s", lock)
lock.release()
async def __aenter__(self):
await super().__aenter__()
return self
4 changes: 2 additions & 2 deletions src/experimaestro/scheduler/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import asyncio
from enum import Enum
from ..utils import logger
from ..locking import Lock


if TYPE_CHECKING:
from . import Job
Expand Down Expand Up @@ -60,7 +60,7 @@ def __init__(self, origin):
def status(self) -> DependencyStatus:
raise NotImplementedError()

def lock(self) -> Lock:
def lock(self) -> asyncio.Lock:
raise NotImplementedError()

def __repr__(self) -> str:
Expand Down
21 changes: 16 additions & 5 deletions src/experimaestro/scheduler/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,18 @@ def __init__(self, job):
super().__init__()
self.job = job

def _acquire(self):
# def _acquire(self):
# return self.job.state == JobState.DONE

async def acquire(self):
await super().acquire()
return self.job.state == JobState.DONE

def _release(self):
# def _release(self):
# return False

def release(self):
super().release()
return False


Expand Down Expand Up @@ -299,9 +307,9 @@ async def aio_start(self, sched_dependency_lock, notification_server=None):
# We first lock the job before proceeding
assert self.launcher is not None

with Locks() as locks:
async with Locks() as locks:
logger.debug("[starting] Locking job %s", self)
async with self.launcher.connector.lock(self.lockpath):
with self.launcher.connector.lock(self.lockpath):
logger.debug("[starting] Locked job %s", self)

state = None
Expand All @@ -317,7 +325,10 @@ async def aio_start(self, sched_dependency_lock, notification_server=None):
async with sched_dependency_lock:
for dependency in self.dependencies:
try:
locks.append(dependency.lock().acquire())
lock = dependency.lock()
await lock.acquire()
# locks.append(dependency.lock().acquire())
locks.append(lock)
except LockError:
logger.warning(
"Could not lock %s, aborting start for job %s",
Expand Down
11 changes: 10 additions & 1 deletion src/experimaestro/tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
import threading
import os.path
from watchdog.events import FileSystemEventHandler
from asyncio import Lock
from typing import Dict
from experimaestro.launcherfinder.base import TokenConfiguration

from experimaestro.launcherfinder.registry import LauncherRegistry

from .ipc import ipcom
from .locking import Lock, LockError
from .locking import LockError
from .scheduler.dependencies import Dependency, DependencyStatus, Resource
import logging
import json
Expand Down Expand Up @@ -51,9 +52,17 @@ def __init__(self, dependency: "CounterTokenDependency"):
def _acquire(self):
self.dependency.token.acquire(self.dependency)

async def acquire(self):
self._acquire()
return await super().acquire()

def _release(self):
self.dependency.token.release(self.dependency)

def release(self):
self._release()
return super().release()

def __str__(self):
return "Lock(%s)" % self.dependency

Expand Down