Commit d835e73

Merge pull request #6 from pyper-dev/dev

Dev

2 parents ee80325 + 504e5de
File tree: 8 files changed, +149 -57 lines

.coveragerc (+1)

````diff
@@ -2,4 +2,5 @@
 exclude_also =
     # pragma: no cover
     if TYPE_CHECKING:
+    if t.TYPE_CHECKING:
     raise NotImplementedError
````
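For context, a minimal sketch (a hypothetical module, not part of this commit) of the pattern the new exclusion covers: code that aliases `import typing as t` guards its type-only imports behind `t.TYPE_CHECKING`, and since those blocks never execute at runtime, coverage should not count them as missed lines.

```python
import typing as t

if t.TYPE_CHECKING:  # now excluded from coverage, like the bare `if TYPE_CHECKING:` form
    from pyper import Pipeline  # type-only import; never executed at runtime


def describe(p: "Pipeline") -> str:
    return f"got {p!r}"
```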

README.md (+10 -8)

````diff
@@ -51,11 +51,12 @@ Let's simulate a pipeline that performs a series of transformations on some data
 ```python
 import asyncio
 import time
+import typing
 
 from pyper import task
 
 
-def step1(limit):
+def step1(limit: int):
     """Generate some data."""
     for i in range(limit):
         yield i
@@ -75,7 +76,7 @@ def step3(data: int):
     return 2 * data - 1
 
 
-async def print_sum(data):
+async def print_sum(data: typing.AsyncGenerator[int]):
     """Print the sum of values from a data stream."""
     total = 0
     async for output in data:
@@ -117,7 +118,7 @@ Having defined the logical operations we want to perform on our data as function
 ```python
 # Analogous to:
 # pipeline = task(step1) | task(step2) | task(step3)
-async def pipeline(limit):
+async def pipeline(limit: int):
     for data in step1(limit):
         data = await step2(data)
         data = step3(data)
@@ -126,7 +127,7 @@ async def pipeline(limit):
 
 # Analogous to:
 # run = pipeline > print_sum
-async def run(limit):
+async def run(limit: int):
     await print_sum(pipeline(limit))
 
 
@@ -152,7 +153,7 @@ Concurrent programming in Python is notoriously difficult to get right. In a con
 The basic approach to doing this is by using queues-- a simplified and very unabstracted implementation could be:
 
 ```python
-async def pipeline(limit):
+async def pipeline(limit: int):
     q1 = asyncio.Queue()
     q2 = asyncio.Queue()
     q3 = asyncio.Queue()
@@ -210,7 +211,7 @@ async def pipeline(limit):
         yield data
 
 
-async def run(limit):
+async def run(limit: int):
     await print_sum(pipeline(limit))
 
 
@@ -233,11 +234,12 @@ No-- not every program is asynchronous, so Pyper pipelines are by default synchr
 
 ```python
 import time
+import typing
 
 from pyper import task
 
 
-def step1(limit):
+def step1(limit: int):
     for i in range(limit):
         yield i
 
@@ -252,7 +254,7 @@ def step3(data: int):
     return 2 * data - 1
 
 
-def print_sum(data):
+def print_sum(data: typing.Generator[int]):
     total = 0
     for output in data:
         total += output
````
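One editorial caveat on the new hints (an observation, not part of the commit): subscripting `typing.Generator` and `typing.AsyncGenerator` with a single type argument only works on Python 3.13+, where the send and return type parameters gained `None` defaults; on earlier versions, the annotations above would need the full spellings:

```python
import typing

# Pre-3.13 equivalents of the single-argument hints used in the README:
SyncStream = typing.Generator[int, None, None]   # yield type, send type, return type
AsyncStream = typing.AsyncGenerator[int, None]   # yield type, send type
```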

docs/src/docs/UserGuide/CombiningPipelines.md (+5 -10)

````diff
@@ -12,7 +12,6 @@ permalink: /docs/UserGuide/CombiningPipelines
 1. TOC
 {:toc}
 
-
 ## Piping and the `|` Operator
 
 The `|` operator (inspired by UNIX syntax) is used to pipe one pipeline into another. This is syntactic sugar for the `Pipeline.pipe` method.
@@ -46,7 +45,6 @@ new_new_pipeline = p0 | new_pipeline | p4
 new_new_new_pipeline = new_pipeline | new_new_pipeline
 ```
 
-
 ## Consumer Functions and the `>` Operator
 
 It is often useful to define reusable functions that process the results of a pipeline, which we'll call a 'consumer'. For example:
@@ -89,11 +87,8 @@ run = step1.pipe(step2).consume(JsonFileWriter("data.json"))
 run(limit=10)
 ```
 
-
-The operator `>` is obviously not to be taken to mean 'greater than' when used in these contexts.
-
 {: .info}
-Pyper comes with fantastic IDE intellisense support which understands these operators, and will always show you what the resulting type of a variable is (including the input and output type specs for pipelines)
+Pyper comes with fantastic IDE intellisense support which understands these operators, and will always show you which variables are `Pipeline` or `AsyncPipeline` objects; this also preserves type hints from your own functions, showing you the parameter and return type specs for each pipeline or consumer
 
 ## Asynchronous Code
 
@@ -111,10 +106,10 @@ assert isinstance(task(func), AsyncPipeline)
 
 When combining pipelines, the following rule applies:
 
-* `Pipeline` > `Pipeline` = `Pipeline`
-* `Pipeline` > `AsyncPipeline` = `AsyncPipeline`
-* `AsyncPipeline` > `Pipeline` = `AsyncPipeline`
-* `AsyncPipeline` > `AsyncPipeline` = `AsyncPipeline`
+* `Pipeline` + `Pipeline` = `Pipeline`
+* `Pipeline` + `AsyncPipeline` = `AsyncPipeline`
+* `AsyncPipeline` + `Pipeline` = `AsyncPipeline`
+* `AsyncPipeline` + `AsyncPipeline` = `AsyncPipeline`
 
 In other words:
 
````
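As a quick illustration of the combination rule in this hunk, a sketch with hypothetical step functions (assuming `task` and `AsyncPipeline` are importable from `pyper`, as the surrounding docs suggest):

```python
import asyncio

from pyper import task, AsyncPipeline


def load(limit: int):
    # synchronous generator -> task(load) is a Pipeline
    yield from range(limit)


async def enrich(x: int) -> int:
    # async function -> task(enrich) is an AsyncPipeline
    await asyncio.sleep(0)
    return x + 1


# Pipeline | AsyncPipeline = AsyncPipeline, per the rule above
combined = task(load) | task(enrich)
assert isinstance(combined, AsyncPipeline)
```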

docs/src/docs/UserGuide/Considerations.md (+6 -5)

````diff
@@ -74,12 +74,13 @@ The advantage of using `daemon` threads is that they do not prevent the main pro
 Therefore, there is a simple consideration that determines whether to set `daemon=True` on a particular task:
 
 {: .info}
-Tasks can be created with `daemon=True` when they do NOT reach out to external resources.
+Tasks can be created with `daemon=True` when they do NOT reach out to external resources
 
-This includes:
-* Pure functions, which simply take an input and generate an output
-* Functions that depend on or modify some external Python state, like an `Object` or a `Class`
+This includes all **pure functions** (functions which simply take an input and generate an output, without mutating external state).
 
 Functions that should _not_ use `daemon` threads include:
 * Writing to a database
-* Reading from a file
+* Processing a file
+* Making a network request
+
+Recall that only synchronous tasks can be created with `daemon=True`.
````
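To make the guidance concrete, a hedged sketch with hypothetical tasks (`daemon` is the task parameter shown in the `decorators.py` diff below):

```python
from pyper import task


def square(x: int) -> int:
    # pure function: no external resources, safe to run on a daemon thread
    return x * x


def save(x: int) -> None:
    # writes to a file (an external resource): leave daemon=False (the default)
    with open("results.txt", "a") as f:
        f.write(f"{x}\n")


pipeline = task(square, daemon=True, concurrency=4) | task(save)
```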

docs/src/docs/UserGuide/CreatingPipelines.md (+1 -1)

````diff
@@ -45,7 +45,7 @@ In addition to functions, anything `callable` in Python can be wrapped in `task`
 from pyper import task
 
 class Doubler:
-    def __call__(self, x):
+    def __call__(self, x: int):
         return 2 * x
 
 pipeline1 = task(Doubler())
````
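Worth noting why this small change matters: with the `ParamSpec`-based overloads added in `decorators.py` (below), annotating `__call__` is what lets a type checker infer the resulting pipeline's input spec. A hedged sketch:

```python
from pyper import task


class Doubler:
    def __call__(self, x: int) -> int:
        return 2 * x


pipeline1 = task(Doubler())
# A type checker can now infer pipeline1 as (roughly) Pipeline[(x: int), int],
# so IDE intellisense shows the parameter and return types of the pipeline.
```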

docs/src/docs/UserGuide/TaskParameters.md (-1)

````diff
@@ -12,7 +12,6 @@ permalink: /docs/UserGuide/TaskParameters
 1. TOC
 {:toc}
 
-
 > For convenience, we will use the following terminology on this page:
 > * **Producer**: The _first_ task within a pipeline
 > * **Producer-consumer**: Any task after the first task within a pipeline
````

src/pyper/_core/decorators.py (+78 -14)

````diff
@@ -1,47 +1,111 @@
 from __future__ import annotations
 
 import functools
-from typing import Callable, Dict, Optional, overload, Tuple
+import typing as t
 
-from .pipeline import Pipeline
+from .pipeline import AsyncPipeline, Pipeline
 from .task import Task
 
 
+_P = t.ParamSpec('P')
+_R = t.TypeVar('R')
+_ArgsKwargs: t.TypeAlias = t.Optional[t.Tuple[t.Tuple[t.Any], t.Dict[str, t.Any]]]
+
+
 class task:
     """Decorator class to transform a function into a `Task` object, and then initialize a `Pipeline` with this task.
     A Pipeline initialized in this way consists of one Task, and can be piped into other Pipelines.
 
     The behaviour of each task within a Pipeline is determined by the parameters:
-    `join`: allows the function to take all previous results as input, instead of single results
-    `concurrency`: runs the functions with multiple (async or threaded) workers
-    `throttle`: limits the number of results the function is able to produce when all consumers are busy
+    * `join`: allows the function to take all previous results as input, instead of single results
+    * `concurrency`: runs the functions with multiple (async or threaded) workers
+    * `throttle`: limits the number of results the function is able to produce when all consumers are busy
+    * `daemon`: determines whether threaded workers are daemon threads (cannot be True for async tasks)
+    * `bind`: additional args and kwargs to bind to the function when defining a pipeline
     """
-    @overload
-    def __new__(cls, func: None = None, /, *, join: bool = False, concurrency: int = 1, throttle: int = 0, daemon: bool = False, bind: Optional[Tuple[Tuple, Dict]] = None) -> Callable[..., Pipeline]:
-        """Enable type hints for functions decorated with `@task()`."""
+    @t.overload
+    def __new__(
+        cls,
+        func: None = None,
+        /,
+        *,
+        join: bool = False,
+        concurrency: int = 1,
+        throttle: int = 0,
+        daemon: bool = False,
+        bind: _ArgsKwargs = None
+    ) -> t.Type[task]: ...
 
-    @overload
-    def __new__(cls, func: Callable, /, *, join: bool = False, concurrency: int = 1, throttle: int = 0, daemon: bool = False, bind: Optional[Tuple[Tuple, Dict]] = None) -> Pipeline:
-        """Enable type hints for functions decorated with `@task`."""
+    @t.overload
+    def __new__(
+        cls,
+        func: t.Callable[_P, t.Awaitable[_R]],
+        /,
+        *,
+        join: bool = False,
+        concurrency: int = 1,
+        throttle: int = 0,
+        daemon: bool = False,
+        bind: _ArgsKwargs = None
+    ) -> AsyncPipeline[_P, _R]: ...
 
+    @t.overload
+    def __new__(
+        cls,
+        func: t.Callable[_P, t.AsyncGenerator[_R]],
+        /,
+        *,
+        join: bool = False,
+        concurrency: int = 1,
+        throttle: int = 0,
+        daemon: bool = False,
+        bind: _ArgsKwargs = None
+    ) -> AsyncPipeline[_P, _R]: ...
+
+    @t.overload
+    def __new__(
+        cls,
+        func: t.Callable[_P, t.Generator[_R]],
+        /,
+        *,
+        join: bool = False,
+        concurrency: int = 1,
+        throttle: int = 0,
+        daemon: bool = False,
+        bind: _ArgsKwargs = None
+    ) -> Pipeline[_P, _R]: ...
+
+    @t.overload
+    def __new__(
+        cls,
+        func: t.Callable[_P, _R],
+        /,
+        *,
+        join: bool = False,
+        concurrency: int = 1,
+        throttle: int = 0,
+        daemon: bool = False,
+        bind: _ArgsKwargs = None
+    ) -> Pipeline[_P, _R]: ...
+
     def __new__(
         cls,
-        func: Optional[Callable] = None,
+        func: t.Optional[t.Callable] = None,
         /,
         *,
         join: bool = False,
         concurrency: int = 1,
         throttle: int = 0,
         daemon: bool = False,
-        bind: Optional[Tuple[Tuple, Dict]] = None
+        bind: _ArgsKwargs = None
     ):
         # Classic decorator trick: @task() means func is None, @task without parentheses means func is passed.
         if func is None:
             return functools.partial(cls, join=join, concurrency=concurrency, throttle=throttle, daemon=daemon, bind=bind)
         return Pipeline([Task(func=func, join=join, concurrency=concurrency, throttle=throttle, daemon=daemon, bind=bind)])
 
     @staticmethod
-    def bind(*args, **kwargs) -> Optional[Tuple[Tuple, Dict]]:
+    def bind(*args, **kwargs) -> _ArgsKwargs:
         """Utility method, to be used with `functools.partial`."""
         if not args and not kwargs:
             return None
````
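To illustrate what the new overloads buy, a hedged sketch of how a type checker would resolve `task` over different kinds of functions (hypothetical functions; the commented types are approximate spellings, not checker output):

```python
from pyper import task


def gen(limit: int):
    yield from range(limit)        # returns a Generator


async def agen(limit: int):
    for i in range(limit):
        yield i                    # returns an AsyncGenerator


async def fetch(x: int) -> int:
    return x + 1                   # returns an Awaitable (coroutine)


def square(x: int) -> int:
    return x * x                   # plain Callable


# Approximate inferred types under the overloads above:
# task(gen)    -> Pipeline[[int], int]       (Generator overload)
# task(agen)   -> AsyncPipeline[[int], int]  (AsyncGenerator overload)
# task(fetch)  -> AsyncPipeline[[int], int]  (Awaitable overload)
# task(square) -> Pipeline[[int], int]       (plain Callable fallback)
# task(concurrency=2) is typed as returning the class itself, enabling
# the parenthesized decorator form @task(...) from the first overload.
```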
