Skip to content

Commit 98f7a6d

Browse files
authored
Merge branch 'v2' into feature/damian/v2/factor_out_transformation_utils
2 parents 4f248dd + e1b7f37 commit 98f7a6d

12 files changed

+863
-0
lines changed

src/deepsparse/v2/operators/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# flake8: noqa
2+
# isort: skip_file
23

34
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
45
#
@@ -14,3 +15,4 @@
1415
# See the License for the specific language governing permissions and
1516
# limitations under the License.
1617
from .operator import *
18+
from .engine_operator import *

src/deepsparse/v2/operators/engine_operator.py

+32
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,45 @@ class EngineOperatorInputs(BaseModel):
3939
default=None,
4040
)
4141

42+
@classmethod
def join(cls, inputs: List["EngineOperatorInputs"]) -> "EngineOperatorInputs":
    """
    Merge several separate inputs into one multi-batch input

    :param inputs: EngineOperatorInputs objects to merge, every one of them
        must have a batch size of 1
    :return: a single EngineOperatorInputs built from the joined engine inputs
    :raises RuntimeError: if the first array of any input has a leading
        dimension other than 1
    """
    # collect the raw engine inputs of every request before validating any
    per_request_arrays = []
    for request in inputs:
        per_request_arrays.append(request.engine_inputs)

    # the leading dimension of the first array is treated as the batch size
    for arrays in per_request_arrays:
        found_batch_size = arrays[0].shape[0]
        if found_batch_size == 1:
            continue
        raise RuntimeError(
            "join requires all inputs to have batch size 1, found input with "
            f"batch size {found_batch_size}"
        )

    # join_engine_outputs is reused for inputs since the dtype is the same
    target_batch_size = len(per_request_arrays)
    merged = join_engine_outputs(per_request_arrays, target_batch_size)

    return cls(engine_inputs=merged)
63+
4264
# NOTE(review): presumably the pydantic model configuration of the enclosing
# BaseModel subclass; arbitrary_types_allowed lets the model declare fields
# whose types pydantic has no built-in validation for — confirm against the
# fields declared above this hunk
class Config:
4365
arbitrary_types_allowed = True
4466

4567

4668
class EngineOperatorOutputs(BaseModel):
    # raw outputs produced by the engine
    engine_outputs: List = Field(description="engine outputs")

    def split(self) -> List["EngineOperatorOutputs"]:
        """
        Break these outputs up into pieces of batch size 1

        :return: list with one instance of this class per piece returned by
            split_engine_inputs for a batch size of 1
        """
        # split_engine_inputs is reused for outputs because inputs and
        # outputs share the same dtype (List[ndarray])
        per_sample_outputs, _ = split_engine_inputs(
            self.engine_outputs, batch_size=1
        )

        output_cls = self.__class__
        results = []
        for sample_outputs in per_sample_outputs:
            results.append(output_cls(engine_outputs=sample_outputs))
        return results
80+
4981

5082
class EngineOperator(Operator):
5183
input_schema = EngineOperatorInputs

src/deepsparse/v2/schedulers/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# flake8: noqa
2+
# isort: skip_file
23

34
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
45
#
@@ -16,3 +17,4 @@
1617

1718
from .scheduler import *
1819
from .scheduler_group import *
20+
from .continuous_batching_scheduler import *
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing,
10+
# software distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
from concurrent.futures import Future
17+
from threading import Lock
18+
from typing import List
19+
20+
from deepsparse.v2.operators import EngineOperator, Operator
21+
from deepsparse.v2.schedulers.scheduler import OperatorScheduler
22+
from deepsparse.v2.schedulers.utils import (
23+
ContinuousBatchingExecutorThread,
24+
ContinuousBatchingQueues,
25+
)
26+
27+
28+
__all__ = ["ContinuousBatchingScheduler"]
29+
30+
31+
_GLOBAL_SCHEDULER = None
32+
33+
34+
class ContinuousBatchingScheduler(OperatorScheduler):
    """
    Manages EngineOperator jobs that should be run with continuous batching.
    Groups requests for the same engine into larger batches and returns
    the result to the respective request threads after scheduled completion

    Example code for getting or creating a shared instance for scheduling
    between pipelines and adding an engine operator to the scheduler
    within a pipeline

    ```python

    class MyPipeline(Pipeline):

        def __init__(self):
            ...
            engine_operator = EngineOperator(...)
            ...
            continuous_batching_scheduler = ContinuousBatchingScheduler.get_instance()
            continuous_batching_scheduler.add_engine_operator(
                engine_operator, batch_sizes=[1, 2, 4]
            )

            super().__init__(...)
    ```

    :param max_workers: maximum number of threads to execute at once, default 1
    """

    def __init__(self, max_workers: int = 1):
        self._max_workers = max_workers

        # guards registration of new operators in add_engine_operator
        self._mutex = Lock()

        # EngineOperator -> Dict[batch_size, Engine]
        # the same dict object is handed to every worker thread below
        self._operators_to_engines = {}
        self._queues = ContinuousBatchingQueues()

        # create and start max number of worker threads
        self._threads = [
            ContinuousBatchingExecutorThread(self._queues, self._operators_to_engines)
            for _ in range(self.max_workers)
        ]
        for worker_thread in self._threads:
            worker_thread.start()

    @classmethod
    def get_instance(cls) -> "ContinuousBatchingScheduler":
        """
        :return: global instance of the continuous batching scheduler. If one
            does not exist yet, a scheduler with a single worker thread to
            schedule all jobs is created and started
        """
        # without this declaration the assignment below makes the name local
        # to the method and the first read raises UnboundLocalError
        global _GLOBAL_SCHEDULER

        if _GLOBAL_SCHEDULER is None:
            _GLOBAL_SCHEDULER = cls(max_workers=1)
        return _GLOBAL_SCHEDULER

    @property
    def max_workers(self) -> int:
        """
        :return: maximum number of threads to execute at once
        """
        return self._max_workers

    def submit(self, *args, operator: Operator, **kwargs) -> Future:
        """
        :param operator: operator to run
        :param args: the first positional argument is the operator input, it
            must be an instance of the operator's input schema
        :return: future referencing the asynchronously run output of the operator
        :raises ValueError: if the input is not an instance of
            operator.input_schema
        """
        inputs = args[0]
        if not isinstance(inputs, operator.input_schema):
            raise ValueError(
                "Inputs to ContinuousBatchingScheduler must be the specific "
                f"input schema to the given operator. Expected {operator.input_schema} "
                f"found {type(inputs)}"
            )

        # the future is queued together with the input, it is returned to
        # the caller unresolved
        future = Future()
        self._queues.add_queue_item(key=operator, item=inputs, future=future)

        return future

    def can_process(self, *args, operator: Operator, **kwargs) -> bool:
        """
        :param operator: operator to check
        :param args: operator input, not inspected by this check
        :return: True if the operator has both engines and a queue registered
            with this scheduler (see add_engine_operator), False otherwise
        """
        return operator in self._operators_to_engines and operator in self._queues

    def add_engine_operator(
        self, engine_operator: EngineOperator, batch_sizes: List[int]
    ):
        """
        Adds tracking for an engine operator to this scheduler
        with continuous batching for the given sizes. Adding an operator
        that is already tracked is a no-op

        :param engine_operator: an EngineOperator, must be compiled with
            batch_size=1
        :param batch_sizes: batch sizes to use for continuous batching
        :raises ValueError: if engine_operator is not an EngineOperator or
            its batch size is not 1
        """
        # lock updates to _operators_to_engines while updating; the context
        # manager releases the lock on the early return and on every raise
        with self._mutex:
            # validation
            if engine_operator in self._operators_to_engines:
                # operator already added
                return

            if not isinstance(engine_operator, EngineOperator):
                raise ValueError(
                    "Expected an EngineOperator instance, "
                    f"found {type(engine_operator)}"
                )
            if engine_operator.batch_size != 1:
                raise ValueError(
                    "For continuous batching, EngineOperator must have "
                    f"batch_size=1. found batch_size={engine_operator.batch_size}"
                )

            # build the batch_size -> Engine dict for this operator,
            # starting with the base engine (expected batch size is 1)
            operator_engines = {engine_operator.batch_size: engine_operator.engine}

            # compile auxiliary engines for continuous batching
            for batch_size in batch_sizes:
                if batch_size == 1:
                    continue  # already added
                # engines are created by the operator, not by the dict
                operator_engines[batch_size] = engine_operator.create_engine(
                    batch_size=batch_size
                )

            # register the engines before the queue so an operator with a
            # queue always has its engines in place
            self._operators_to_engines[engine_operator] = operator_engines
            self._queues.add_queue(
                key=engine_operator,
                batch_sizes=list(operator_engines.keys()),
            )
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# flake8: noqa
2+
# isort: skip_file
3+
4+
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
18+
from .continuous_batching_queues import *
19+
from .continuous_batching_executor import *
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing,
10+
# software distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from threading import Thread
16+
from typing import Dict
17+
18+
from deepsparse import Engine
19+
from deepsparse.v2.operators import EngineOperator
20+
from deepsparse.v2.schedulers.utils.continuous_batching_queues import (
21+
ContinuousBatchingQueues,
22+
)
23+
24+
25+
__all__ = [
26+
"ContinuousBatchingExecutorThread",
27+
]
28+
29+
30+
class ContinuousBatchingExecutorThread(Thread):
    """
    Thread that when started runs indefinitely, grabbing a valid batch from
    the queues when possible and running them in the correct engine

    :param queues: ContinuousBatchingQueues object containing a queue for
        each valid engine
    :param operators_to_engines: dictionary mapping valid engine operators
        to a dictionary of its valid batch sizes mapped to an engine compiled
        for that batch size
    """

    def __init__(
        self,
        queues: ContinuousBatchingQueues,
        operators_to_engines: Dict[EngineOperator, Dict[int, Engine]],
    ):
        self._queues = queues
        self._operators_to_engines = operators_to_engines
        # checked by the working loop between batches; nothing in this class
        # sets it to True
        self._should_stop = False

        super().__init__(target=self._working_loop)
        self.daemon = True  # worker thread should exit when main thread exits

    def _working_loop(self):
        """
        Wait for a batch, run it, then split the result and resolve the
        futures of the batch. If joining, running or splitting a batch
        raises, the exception is set on every unresolved future of that
        batch and the loop moves on to the next batch, so callers are not
        left waiting on a dead worker
        """
        while not self._should_stop:
            # wait for next batch to be available
            engine_operator, batch = self._queues.pop_batch(block=True)

            # unpack batch of QueueEntry objects
            engine_inputs, futures, _ = list(zip(*batch))

            try:
                batch_size = len(engine_inputs)

                # type is EngineOperatorInputs
                joined_inputs = engine_operator.input_schema.join(engine_inputs)

                # get engine for this operator compiled to the popped batch
                # size and set the inputs to execute with it
                joined_inputs.engine = self._operators_to_engines[engine_operator][
                    batch_size
                ]

                # run the engine operator with the given engine at the
                # joined batch size
                joined_outputs = engine_operator(joined_inputs)

                split_outputs = joined_outputs.split()
            except Exception as err:
                # hand the failure to the waiting callers instead of letting
                # it kill this worker thread
                for future in futures:
                    if not future.done():
                        future.set_exception(err)
                continue

            # return the split results to their respective futures
            for output, future in zip(split_outputs, futures):
                future.set_result(output)

0 commit comments

Comments
 (0)