Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ repos:
rev: 'v3.19.1'
hooks:
- id: pyupgrade
args: ['--py38-plus']
args: ['--py310-plus']
- repo: https://github.com/PyCQA/flake8
rev: '7.2.0'
hooks:
Expand Down
9 changes: 5 additions & 4 deletions aiojobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@

"""

from __future__ import annotations

import warnings
from typing import Optional

from ._job import Job
from ._scheduler import ExceptionHandler, Scheduler
Expand All @@ -16,10 +17,10 @@

async def create_scheduler(
*,
close_timeout: Optional[float] = 0.1,
limit: Optional[int] = 100,
close_timeout: float | None = 0.1,
limit: int | None = 100,
pending_limit: int = 10000,
exception_handler: Optional[ExceptionHandler] = None,
exception_handler: ExceptionHandler | None = None,
) -> Scheduler:
warnings.warn("Scheduler can now be instantiated directly.", DeprecationWarning)

Expand Down
24 changes: 13 additions & 11 deletions aiojobs/_job.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from __future__ import annotations

import asyncio
import sys
import traceback
from collections.abc import Coroutine
from typing import TYPE_CHECKING, Generic, Optional, TypeVar
from typing import TYPE_CHECKING, Generic, TypeVar

if sys.version_info >= (3, 11):
from asyncio import timeout as asyncio_timeout
Expand Down Expand Up @@ -33,17 +35,17 @@ def __init__(
self,
coro: Coroutine[object, object, _T],
scheduler: Scheduler,
name: Optional[str] = None,
name: str | None = None,
):
self._coro = coro
self._scheduler: Optional[Scheduler] = scheduler
self._scheduler: Scheduler | None = scheduler
self._name = name
loop = asyncio.get_running_loop()
self._started = loop.create_future()

self._closed = False
self._explicit = False
self._task: Optional[asyncio.Task[_T]] = None
self._task: asyncio.Task[_T] | None = None

tb = traceback.extract_stack(sys._getframe(2)) if loop.get_debug() else None
self._source_traceback = tb
Expand Down Expand Up @@ -71,7 +73,7 @@ def pending(self) -> bool:
def closed(self) -> bool:
return self._closed

def get_name(self) -> Optional[str]:
def get_name(self) -> str | None:
"""Get the task name.

See https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.get_name.
Expand All @@ -84,14 +86,14 @@ def set_name(self, name: str) -> None:
if self._task is not None:
self._task.set_name(name)

async def _do_wait(self, timeout: Optional[float]) -> _T:
async def _do_wait(self, timeout: float | None) -> _T:
async with asyncio_timeout(timeout):
# TODO: add a test for waiting for a pending coro
await self._started
assert self._task is not None # Task should have been created before this.
return await self._task

async def _wait(self, *, timeout: Optional[float] = None) -> _T:
async def _wait(self, *, timeout: float | None = None) -> _T:
assert self._scheduler is not None # Only removed when not _closed.
scheduler = self._scheduler
try:
Expand All @@ -103,14 +105,14 @@ async def _wait(self, *, timeout: Optional[float] = None) -> _T:
await self._close(scheduler.close_timeout)
raise

async def wait(self, *, timeout: Optional[float] = None) -> _T:
async def wait(self, *, timeout: float | None = None) -> _T:
if self._closed:
assert self._task is not None # Task must have been created if closed.
return await self._task
self._explicit = True
return await self._wait(timeout=timeout)

async def close(self, *, timeout: Optional[float] = None) -> None:
async def close(self, *, timeout: float | None = None) -> None:
if self._closed:
return
self._explicit = True
Expand All @@ -119,7 +121,7 @@ async def close(self, *, timeout: Optional[float] = None) -> None:
timeout = self._scheduler.close_timeout
await self._close(timeout)

async def _close(self, timeout: Optional[float]) -> None:
async def _close(self, timeout: float | None) -> None:
self._closed = True
if self._task is None:
# the task is closed immediately without actual execution
Expand Down Expand Up @@ -159,7 +161,7 @@ def _start(self) -> None:
self._task.add_done_callback(self._done_callback)
self._started.set_result(None)

def _done_callback(self, task: "asyncio.Task[_T]") -> None:
def _done_callback(self, task: asyncio.Task[_T]) -> None:
assert self._scheduler is not None
scheduler = self._scheduler
scheduler._done(self)
Expand Down
56 changes: 25 additions & 31 deletions aiojobs/_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
from __future__ import annotations

import asyncio
import sys
from collections.abc import Awaitable, Collection, Coroutine, Iterator
from collections.abc import Awaitable, Callable, Collection, Coroutine, Iterator
from contextlib import suppress
from types import TracebackType
from typing import (
Any,
Callable,
Dict,
Optional,
Set,
Type,
TypeVar,
Union,
)

from ._job import Job
Expand All @@ -25,8 +21,8 @@
Self = TypeVar("Self", bound="Scheduler")

_T = TypeVar("_T")
_FutureLike = Union["asyncio.Future[_T]", Awaitable[_T]]
ExceptionHandler = Callable[["Scheduler", Dict[str, Any]], None]
_FutureLike = asyncio.Future[_T] | Awaitable[_T]
ExceptionHandler = Callable[["Scheduler", dict[str, Any]], None]


class Scheduler(Collection[Job[object]]):
Expand All @@ -47,27 +43,25 @@ class Scheduler(Collection[Job[object]]):
def __init__(
self,
*,
close_timeout: Optional[float] = 0.1,
wait_timeout: Optional[float] = 60,
limit: Optional[int] = 100,
close_timeout: float | None = 0.1,
wait_timeout: float | None = 60,
limit: int | None = 100,
pending_limit: int = 10000,
exception_handler: Optional[ExceptionHandler] = None,
exception_handler: ExceptionHandler | None = None,
):
if exception_handler is not None and not callable(exception_handler):
raise TypeError(
f"A callable object or None is expected, got {exception_handler!r}"
)

self._jobs: Set[Job[object]] = set()
self._shields: Set[asyncio.Task[object]] = set()
self._jobs: set[Job[object]] = set()
self._shields: set[asyncio.Task[object]] = set()
self._close_timeout = close_timeout
self._wait_timeout = wait_timeout
self._limit = limit
self._exception_handler = exception_handler
self._failed_tasks: asyncio.Queue[Optional[asyncio.Task[object]]] = (
asyncio.Queue()
)
self._failed_task: Optional[asyncio.Task[None]] = None
self._failed_tasks: asyncio.Queue[asyncio.Task[object] | None] = asyncio.Queue()
self._failed_task: asyncio.Task[None] | None = None
if sys.version_info < (3, 10):
self._failed_task = asyncio.create_task(self._wait_failed())
self._pending: asyncio.Queue[Job[object]] = asyncio.Queue(maxsize=pending_limit)
Expand Down Expand Up @@ -96,22 +90,22 @@ async def __aenter__(self: Self) -> Self:

async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
await self.wait_and_close()

@property
def limit(self) -> Optional[int]:
def limit(self) -> int | None:
return self._limit

@property
def pending_limit(self) -> int:
return self._pending.maxsize

@property
def close_timeout(self) -> Optional[float]:
def close_timeout(self) -> float | None:
return self._close_timeout

@property
Expand All @@ -127,7 +121,7 @@ def closed(self) -> bool:
return self._closed

async def spawn(
self, coro: Coroutine[object, object, _T], name: Optional[str] = None
self, coro: Coroutine[object, object, _T], name: str | None = None
) -> Job[_T]:
if self._closed:
raise RuntimeError("Scheduling a new job after closing")
Expand All @@ -150,7 +144,7 @@ async def spawn(
self._jobs.add(job)
return job

def shield(self, arg: _FutureLike[_T]) -> "asyncio.Future[_T]":
def shield(self, arg: _FutureLike[_T]) -> asyncio.Future[_T]:
inner = asyncio.ensure_future(arg)
if inner.done():
return inner
Expand All @@ -163,7 +157,7 @@ def shield(self, arg: _FutureLike[_T]) -> "asyncio.Future[_T]":
loop = inner.get_loop()
outer = loop.create_future()

def _inner_done_callback(inner: "asyncio.Task[object]") -> None:
def _inner_done_callback(inner: asyncio.Task[object]) -> None:
if outer.cancelled():
if not inner.cancelled():
inner.exception()
Expand All @@ -178,15 +172,15 @@ def _inner_done_callback(inner: "asyncio.Task[object]") -> None:
else:
outer.set_result(inner.result())

def _outer_done_callback(outer: "asyncio.Future[object]") -> None:
def _outer_done_callback(outer: asyncio.Future[object]) -> None:
if not inner.done():
inner.remove_done_callback(_inner_done_callback)

inner.add_done_callback(_inner_done_callback)
outer.add_done_callback(_outer_done_callback)
return outer

async def wait_and_close(self, timeout: Optional[float] = None) -> None:
async def wait_and_close(self, timeout: float | None = None) -> None:
if timeout is None:
timeout = self._wait_timeout
with suppress(asyncio.TimeoutError):
Expand Down Expand Up @@ -225,14 +219,14 @@ async def close(self) -> None:
self._failed_tasks.put_nowait(None)
await self._failed_task

def call_exception_handler(self, context: Dict[str, Any]) -> None:
def call_exception_handler(self, context: dict[str, Any]) -> None:
if self._exception_handler is None:
asyncio.get_running_loop().call_exception_handler(context)
else:
self._exception_handler(self, context)

@property
def exception_handler(self) -> Optional[ExceptionHandler]:
def exception_handler(self) -> ExceptionHandler | None:
return self._exception_handler

def _done(self, job: Job[object]) -> None:
Expand Down
17 changes: 8 additions & 9 deletions aiojobs/aiohttp.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator, Awaitable, Coroutine
from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine
from functools import wraps
from typing import (
Any,
Callable,
Optional,
TypeVar,
Union,
)

from aiohttp import web
Expand All @@ -17,8 +16,8 @@
__all__ = ("setup", "spawn", "get_scheduler", "get_scheduler_from_app", "atomic")

_T = TypeVar("_T")
_FutureLike = Union["asyncio.Future[_T]", Awaitable[_T]]
_RequestView = TypeVar("_RequestView", bound=Union[web.Request, web.View])
_FutureLike = asyncio.Future[_T] | Awaitable[_T]
_RequestView = TypeVar("_RequestView", web.Request, web.View)


AIOJOBS_SCHEDULER = web.AppKey("AIOJOBS_SCHEDULER", Scheduler)
Expand All @@ -31,19 +30,19 @@ def get_scheduler(request: web.Request) -> Scheduler:
return scheduler


def get_scheduler_from_app(app: web.Application) -> Optional[Scheduler]:
def get_scheduler_from_app(app: web.Application) -> Scheduler | None:
return app.get(AIOJOBS_SCHEDULER)


def get_scheduler_from_request(request: web.Request) -> Optional[Scheduler]:
def get_scheduler_from_request(request: web.Request) -> Scheduler | None:
return request.config_dict.get(AIOJOBS_SCHEDULER)


async def spawn(request: web.Request, coro: Coroutine[object, object, _T]) -> Job[_T]:
return await get_scheduler(request).spawn(coro)


def shield(request: web.Request, arg: _FutureLike[_T]) -> "asyncio.Future[_T]":
def shield(request: web.Request, arg: _FutureLike[_T]) -> asyncio.Future[_T]:
return get_scheduler(request).shield(arg)


Expand Down
6 changes: 3 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import asyncio
from collections.abc import AsyncIterator, Awaitable
from typing import Any, Callable, Dict
from collections.abc import AsyncIterator, Awaitable, Callable
from typing import Any, Dict

Check notice

Code scanning / CodeQL

Unused import Note test

Import of 'Dict' is not used.
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed

import pytest

from aiojobs import Scheduler

PARAMS: Dict[str, Any] = {
PARAMS: dict[str, Any] = {
"close_timeout": 1.0,
"limit": 100,
"pending_limit": 0,
Expand Down
3 changes: 1 addition & 2 deletions tests/test_aiohttp.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
from collections.abc import Awaitable
from typing import Callable
from collections.abc import Awaitable, Callable

import pytest
from aiohttp import ClientSession, web
Expand Down
6 changes: 3 additions & 3 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import sys
from collections.abc import Awaitable
from typing import Callable, List, NoReturn
from collections.abc import Awaitable, Callable
from typing import List, NoReturn

Check notice

Code scanning / CodeQL

Unused import Note test

Import of 'List' is not used.
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
from unittest import mock

import pytest
Expand Down Expand Up @@ -366,7 +366,7 @@
async def coro(fut: "asyncio.Future[object]") -> None:
await fut

futures: List[asyncio.Future[object]] = [asyncio.Future() for _ in range(3)]
futures: list[asyncio.Future[object]] = [asyncio.Future() for _ in range(3)]
jobs = []

async def spawn() -> None:
Expand Down
Loading