Unify RPS and Concurrent Scheduler Paths #233

Open · wants to merge 1 commit into base: main
9 changes: 5 additions & 4 deletions src/guidellm/request/loader.py
@@ -15,6 +15,7 @@
 from guidellm.dataset import ColumnInputTypes, load_dataset
 from guidellm.objects import StandardBaseModel
 from guidellm.request.request import GenerationRequest
+from guidellm.request.session import GenerativeRequestSession

 __all__ = [
     "GenerativeRequestLoader",
@@ -30,10 +31,10 @@ class RequestLoaderDescription(StandardBaseModel):

 class RequestLoader(Iterable):
     @abstractmethod
-    def __iter__(self): ...
+    def __iter__(self) -> Iterator: ...

     @abstractmethod
-    def __len__(self): ...
+    def __len__(self) -> int: ...

     @property
     @abstractmethod
@@ -105,14 +106,14 @@ def __init__(
         self.preserve_iter_state = iter_type == "infinite"  # ensure no caching requests
         self._preserved_iter = None

-    def __iter__(self) -> Iterator[GenerationRequest]:
+    def __iter__(self) -> Iterator[GenerativeRequestSession]:
         scope_create_count = 0

         while (dataset_iter := self._get_dataset_iter(scope_create_count)) is not None:
             scope_create_count += 1

             for item in dataset_iter:
-                yield self._create_request(item)
+                yield GenerativeRequestSession(self._create_request(item))

             self._preserved_iter = None

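With this change, iterating a GenerativeRequestLoader yields GenerativeRequestSession wrappers rather than bare GenerationRequest objects. A minimal sketch of the consumer-side contract; `run_backend()` is a stand-in for the scheduler/worker machinery, not part of this PR:

```python
# Hypothetical consumer loop for the new loader contract.
for session in loader:  # each item is a GenerativeRequestSession
    while not session.complete:
        request = session.get_next_request()  # GenerationRequest
        response = run_backend(request)       # run_backend() is a stand-in
        session.push_response(response)       # single-turn sessions complete here
```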
52 changes: 52 additions & 0 deletions src/guidellm/request/session.py
@@ -0,0 +1,52 @@
+from abc import ABC, abstractmethod
+from typing import Generic, TypeVar
+
+from guidellm.backend.response import ResponseSummary
+from guidellm.request.request import GenerationRequest
+
+__all__ = ["GenerativeRequestSession", "RequestSession"]
+
+# TODO: Replace with specific types that implement needed features
+RequestT = TypeVar("RequestT")
+ResponseT = TypeVar("ResponseT")
+
+
+class RequestSession(ABC, Generic[RequestT, ResponseT]):
+    @abstractmethod
+    def __len__(self) -> int: ...
+
+    @abstractmethod
+    def get_next_request(self) -> RequestT: ...
+
+    @abstractmethod
+    def get_next_delay(self) -> float: ...
+
+    @abstractmethod
+    def push_response(self, response: ResponseT) -> None: ...
+
+    @property
+    @abstractmethod
+    def complete(self) -> bool: ...
+
+
+# TODO: Implement multiturn support
+class GenerativeRequestSession(RequestSession[GenerationRequest, ResponseSummary]):
+    def __init__(self, request: GenerationRequest) -> None:
+        self.request = request
+        self._complete = False
+
+    def __len__(self) -> int:
+        return 1
+
+    def get_next_request(self) -> GenerationRequest:
+        return self.request
+
+    def get_next_delay(self) -> float:
+        return 0.0
+
+    def push_response(self, response: ResponseSummary) -> None:  # noqa: ARG002
+        self._complete = True
+
+    @property
+    def complete(self) -> bool:
+        return self._complete
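The RequestSession ABC is sized for the multiturn support the TODO points at: `__len__` reports how many requests the session will issue, `get_next_delay` leaves room for pacing between turns, and `push_response` feeds each response back before the next request is produced. A hedged sketch of what a multiturn implementation might look like; it is not part of this PR, and the names below (`turns`, `responses`, `delay`) are invented:

```python
from guidellm.backend.response import ResponseSummary
from guidellm.request.request import GenerationRequest
from guidellm.request.session import RequestSession


# Hypothetical multiturn session, illustrating the intended use of the ABC.
class MultiTurnSession(RequestSession[GenerationRequest, ResponseSummary]):
    def __init__(self, turns: list[GenerationRequest], delay: float = 0.0) -> None:
        self.turns = turns
        self.delay = delay
        self.responses: list[ResponseSummary] = []

    def __len__(self) -> int:
        return len(self.turns)  # total requests this session will issue

    def get_next_request(self) -> GenerationRequest:
        return self.turns[len(self.responses)]  # next unanswered turn

    def get_next_delay(self) -> float:
        return self.delay  # pacing between turns

    def push_response(self, response: ResponseSummary) -> None:
        self.responses.append(response)

    @property
    def complete(self) -> bool:
        return len(self.responses) >= len(self.turns)
```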
2 changes: 0 additions & 2 deletions src/guidellm/scheduler/__init__.py
@@ -22,7 +22,6 @@
     RequestsWorker,
     ResolveStatus,
     WorkerDescription,
-    WorkerProcessRequest,
     WorkerProcessResult,
 )

@@ -46,7 +45,6 @@
     "SynchronousStrategy",
     "ThroughputStrategy",
     "WorkerDescription",
-    "WorkerProcessRequest",
     "WorkerProcessResult",
     "strategy_display_str",
 ]
31 changes: 31 additions & 0 deletions src/guidellm/scheduler/result.py
@@ -1,18 +1,24 @@
+from dataclasses import dataclass
+from queue import Queue
 from typing import (
     Generic,
     Literal,
     Optional,
 )

 from guidellm.objects import StandardBaseModel
+from guidellm.request.session import RequestSession
 from guidellm.scheduler.strategy import SchedulingStrategy
 from guidellm.scheduler.types import RequestT, ResponseT

 __all__ = [
+    "MPQueues",
     "SchedulerRequestInfo",
     "SchedulerRequestResult",
     "SchedulerResult",
     "SchedulerRunInfo",
+    "WorkerProcessRequestTime",
+    "WorkerProcessResult",
 ]
@@ -135,3 +141,28 @@ class SchedulerRequestResult(
     request: RequestT
     request_info: SchedulerRequestInfo
     response: Optional[ResponseT] = None
+
+
+# TODO: Move dataclasses somewhere else
+
+
+@dataclass
+class WorkerProcessRequestTime:
+    start_time: float
+    timeout_time: float
+    queued_time: float
+
+
+@dataclass
+class WorkerProcessResult(Generic[RequestT, ResponseT]):
+    type_: Literal["request_scheduled", "request_start", "request_complete"]
+    request: RequestT
+    response: Optional[ResponseT]
+    info: SchedulerRequestInfo
+
+
+@dataclass
+class MPQueues(Generic[RequestT, ResponseT]):
+    requests: Queue[RequestSession[RequestT, ResponseT]]
+    times: Queue[WorkerProcessRequestTime]
+    responses: Queue[WorkerProcessResult[RequestT, ResponseT]]
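MPQueues bundles the three cross-process channels into a single handle, so worker entry points take one argument instead of one parameter per queue. A minimal wiring sketch, mirroring what `Scheduler._start_processes` does with a multiprocessing Manager; the maxsize values here are illustrative:

```python
from multiprocessing import Manager

from guidellm.scheduler.result import MPQueues

with Manager() as manager:
    # One shared bundle for all worker processes. manager.Queue() returns a
    # proxy object, so the Queue[...] annotations on MPQueues describe the
    # protocol rather than the concrete class.
    queues = MPQueues(
        requests=manager.Queue(maxsize=32),  # sessions awaiting a worker
        times=manager.Queue(maxsize=32),     # start/timeout stamps, one per request
        responses=manager.Queue(),           # unbounded: results flowing back
    )
    stop_event = manager.Event()             # cooperative shutdown signal
```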
127 changes: 63 additions & 64 deletions src/guidellm/scheduler/scheduler.py
@@ -1,10 +1,12 @@
 import asyncio
 import math
-import multiprocessing
-import multiprocessing.queues
 import time
 from collections.abc import AsyncGenerator, Iterable, Iterator
 from concurrent.futures import ProcessPoolExecutor
+from multiprocessing import Manager
+from queue import Empty as QueueEmpty
+from queue import Queue
+from threading import Event
 from typing import (
     Any,
     Generic,
@@ -15,17 +17,22 @@
 from loguru import logger

 from guidellm.config import settings
+from guidellm.request.session import RequestSession
 from guidellm.scheduler.result import (
+    MPQueues,
     SchedulerRequestResult,
     SchedulerResult,
     SchedulerRunInfo,
+    WorkerProcessRequestTime,
+    WorkerProcessResult,
 )
 from guidellm.scheduler.strategy import SchedulingStrategy
-from guidellm.scheduler.types import RequestT, ResponseT
+from guidellm.scheduler.types import (
+    RequestT,
+    ResponseT,
+)
 from guidellm.scheduler.worker import (
     RequestsWorker,
-    WorkerProcessRequest,
-    WorkerProcessResult,
 )

 __all__ = ["Scheduler"]
@@ -114,13 +121,13 @@ async def run(
             raise ValueError(f"Invalid max_duration: {max_duration}")

         with (
-            multiprocessing.Manager() as manager,
+            Manager() as manager,
             ProcessPoolExecutor(
                 max_workers=scheduling_strategy.processes_limit
             ) as executor,
         ):
             requests_iter: Optional[Iterator[Any]] = None
-            futures, requests_queue, responses_queue = await self._start_processes(
+            futures, queues, stop_event = await self._start_processes(
                 manager, executor, scheduling_strategy
             )
             run_info, requests_iter, times_iter = self._run_setup(
@@ -149,13 +156,14 @@
                 requests_iter = self._add_requests(
                     requests_iter,
                     times_iter,
-                    requests_queue,
+                    queues.requests,
+                    queues.times,
                     run_info,
                 )
                 await asyncio.sleep(0)  # enable requests to start

                 iter_result = self._check_result_ready(
-                    responses_queue,
+                    queues.responses,
                     run_info,
                 )
                 if iter_result is not None:
@@ -171,7 +179,7 @@
                     run_info=run_info,
                 )

-        await self._stop_processes(futures, requests_queue)
+        await self._stop_processes(futures, stop_event)

     async def _start_processes(
         self,
@@ -180,14 +188,18 @@ async def _start_processes(
         scheduling_strategy: SchedulingStrategy,
     ) -> tuple[
         list[asyncio.Future],
-        multiprocessing.Queue,
-        multiprocessing.Queue,
+        MPQueues[RequestT, ResponseT],
+        Event,
     ]:
         await self.worker.prepare_multiprocessing()
-        requests_queue = manager.Queue(
-            maxsize=scheduling_strategy.queued_requests_limit
-        )
-        responses_queue = manager.Queue()
+        queues: MPQueues[RequestT, ResponseT] = MPQueues(
+            requests=manager.Queue(
+                maxsize=scheduling_strategy.processing_requests_limit
+            ),
+            times=manager.Queue(maxsize=scheduling_strategy.processing_requests_limit),
+            responses=manager.Queue(),
+        )
+        stop_event = manager.Event()

         num_processes = min(
             scheduling_strategy.processes_limit,
@@ -212,36 +224,21 @@
         futures = []
         loop = asyncio.get_event_loop()
         for id_, requests_limit in zip(process_ids, process_requests_limits):
-            if scheduling_strategy.processing_mode == "sync":
-                futures.append(
-                    loop.run_in_executor(
-                        executor,
-                        self.worker.process_loop_synchronous,
-                        requests_queue,
-                        responses_queue,
-                        id_,
-                    )
-                )
-            elif scheduling_strategy.processing_mode == "async":
-                futures.append(
-                    loop.run_in_executor(
-                        executor,
-                        self.worker.process_loop_asynchronous,
-                        requests_queue,
-                        responses_queue,
-                        requests_limit,
-                        id_,
-                    )
-                )
-            else:
-                raise ValueError(
-                    f"Invalid processing mode: {scheduling_strategy.processing_mode} "
-                    f"for strategy: {scheduling_strategy}"
-                )
+            futures.append(
+                loop.run_in_executor(
+                    executor,
+                    self.worker.process_loop_asynchronous,
+                    queues,
+                    stop_event,
+                    False,  # TODO: Make configurable
+                    requests_limit,
+                    id_,
+                )
+            )

         await asyncio.sleep(0.1)  # give time for processes to start

-        return futures, requests_queue, responses_queue
+        return futures, queues, stop_event

     def _run_setup(
         self,
@@ -284,7 +281,8 @@ def _add_requests(
         self,
         requests_iter: Optional[Iterator[Any]],
         times_iter: Iterator[float],
-        requests_queue: multiprocessing.Queue,
+        requests_queue: Queue[RequestSession[RequestT, ResponseT]],
+        times_queue: Queue[WorkerProcessRequestTime],
         run_info: SchedulerRunInfo,
     ) -> Optional[Iterator[Any]]:
         if requests_iter is not None:
@@ -298,23 +296,24 @@
                     if run_info.created_requests >= run_info.end_number:
                         raise StopIteration

-                    if (
-                        request_time := next(times_iter)
-                    ) >= run_info.end_time or time.time() >= run_info.end_time:
-                        raise StopIteration
-
-                    request = next(requests_iter)
-                    work_req: WorkerProcessRequest[RequestT] = WorkerProcessRequest(
-                        request=request,
-                        start_time=request_time,
-                        timeout_time=run_info.end_time,
-                        queued_time=time.time(),
-                    )
-                    requests_queue.put(work_req)
-
-                    run_info.created_requests += 1
-                    run_info.queued_requests += 1
-                    added_count += 1
+                    session = next(requests_iter)
+                    requests_queue.put(session)
+                    for _ in range(len(session)):
+                        if (
+                            request_time := next(times_iter)
+                        ) >= run_info.end_time or time.time() >= run_info.end_time:
+                            raise StopIteration
+
+                        work_req = WorkerProcessRequestTime(
+                            start_time=request_time,
+                            timeout_time=run_info.end_time,
+                            queued_time=time.time(),
+                        )
+                        times_queue.put(work_req)
+
+                        run_info.created_requests += 1
+                        run_info.queued_requests += 1
+                        added_count += 1
             except StopIteration:
                 # we've reached the limit number, limit time, or exhausted the requests
                 # set to None to stop adding more and tell the loop no more requests
@@ -324,14 +323,14 @@

     def _check_result_ready(
         self,
-        responses_queue: multiprocessing.Queue,
+        responses_queue: Queue[WorkerProcessResult[RequestT, ResponseT]],
         run_info: SchedulerRunInfo,
     ) -> Optional[SchedulerRequestResult[RequestT, ResponseT]]:
         try:
             process_response: WorkerProcessResult[RequestT, ResponseT] = (
                 responses_queue.get_nowait()
             )
-        except multiprocessing.queues.Empty:  # type: ignore[attr-defined]
+        except QueueEmpty:
             return None

         if process_response.type_ == "request_scheduled":
@@ -374,9 +373,9 @@ def _check_result_ready(
     async def _stop_processes(
         self,
         futures: list[asyncio.Future],
-        requests_queue: multiprocessing.Queue,
+        stop_event: Event,
     ):
-        for _ in futures:
-            requests_queue.put(None)
+        # stop all processes
+        stop_event.set()

         await asyncio.gather(*futures)
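worker.py is not part of this diff, but the new call site fixes the shape of `process_loop_asynchronous`: it now receives the MPQueues bundle, the stop event, a hard-coded flag (the TODO above), a per-process concurrency limit, and a process id. A heavily simplified, invented outline of the loop that call implies; the real implementation lives in src/guidellm/scheduler/worker.py and will differ:

```python
import queue
from threading import Event

from guidellm.scheduler.result import WorkerProcessResult


# Invented outline consistent with the new call site; resolve() and
# make_info() are stand-ins for the worker's actual backend plumbing.
def process_loop(queues, stop_event: Event, process_id: int) -> None:
    while not stop_event.is_set():
        try:
            # Block briefly so stop_event is rechecked between polls.
            session = queues.requests.get(timeout=0.1)
        except queue.Empty:
            continue
        while not session.complete:
            timing = queues.times.get()  # WorkerProcessRequestTime for this request
            request = session.get_next_request()
            response = resolve(request, timing)
            session.push_response(response)
            queues.responses.put(
                WorkerProcessResult(
                    type_="request_complete",
                    request=request,
                    response=response,
                    info=make_info(timing),
                )
            )
```

Because sessions are enqueued whole while their timing stamps are queued per request, a session's requests stay pinned to one worker, which is what lets the RPS and concurrent paths share this single loop.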