Path.iterdir/glob/rglob methods do not delegate file system calls to a thread #1308
To clarify, you mean that these methods return a generator object, and potentially some blocking I/O might be done in the main thread during iteration? The amount of I/O done in the method call vs. subsequent yields could depend on the pathlib implementation, the OS, and the size of the data and directories involved. Expanding all the results immediately into a list doesn't sound good. Use a memory channel to iterate results from the background thread?
Exactly, this is definitely the case for these methods.
At the moment every I/O call seems to happen during the iteration, but I agree that both the method call and the iteration should run in a thread, as this implementation might change in the future.
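The laziness being discussed can be demonstrated with a plain generator function standing in for `iterdir` (the `fake_iterdir` name and the `calls` list are illustrative, not trio or pathlib code): the simulated I/O runs at the first `next()`, not when the method is called.

```python
# Stand-in for an iterdir-style generator: the "I/O" (here just
# appending to `calls`) runs during iteration, not at call time.
calls = []

def fake_iterdir():
    calls.append("listdir")  # simulated blocking directory read
    for name in ["a.txt", "b.txt"]:
        yield name

it = fake_iterdir()
print(calls)     # [] -- nothing has run yet
print(next(it))  # "a.txt"; the simulated I/O happened during this call
print(calls)     # ["listdir"]
```

This is why wrapping only the method call in `trio.to_thread.run_sync` is not enough: the blocking work happens later, inside `__next__`.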
Apparently this has been discussed in #501 and implemented using list expansion in #502. Then PR #969 came along and got rid of the list expansion, probably by mistake (if I read this comment in #917 correctly).
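For reference, the list-expansion approach from #502 can be sketched with stdlib threads standing in for trio's thread delegation (the `iter_in_thread` helper is hypothetical, not the project's actual code): the whole iterator is drained inside the worker thread, so no filesystem call happens on the calling thread.

```python
from concurrent.futures import ThreadPoolExecutor

def iter_in_thread(fn, *args, **kwargs):
    """Call fn(*args, **kwargs) in a worker thread and expand the
    resulting iterator into a list there, so every I/O call the
    iterator performs happens off the calling thread."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(lambda: list(fn(*args, **kwargs))).result()

# Usage: drain a generator entirely in the worker thread.
items = iter_in_thread(range, 5)
print(items)  # [0, 1, 2, 3, 4]
```

The downside, which motivates the streaming approaches discussed below, is that the entire result set is buffered in memory before the caller sees the first item.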
Thank you for investigating.
Agreed, just fix everything to do that.
Now that I think about it, there are two separate issues here.

Ideally, we'd like to be able to do:

```python
async for item in path.iterdir():
    [...]
async for item in path.glob("..."):
    [...]
async for item in path.rglob("..."):
    [...]
```

This can easily be achieved with the naive implementation:

```python
@async_wraps(cls, cls._wraps, meth_name)
async def wrapper(self, *args, **kwargs):
    meth = getattr(self._wrapped, meth_name)
    func = partial(meth, *args, **kwargs)
    items_iter = await trio.to_thread.run_sync(func)
    stop_iteration = object()

    def safe_next():
        # Convert StopIteration into a sentinel, since it can't
        # propagate cleanly out of the async generator.
        try:
            return items_iter.__next__()
        except StopIteration:
            return stop_iteration

    while True:
        item = await trio.to_thread.run_sync(safe_next)
        if item is stop_iteration:
            return
        yield rewrap_path(item)
```

However, it would be quite slow, as pointed out by @njsmith in this comment. You did mention a more efficient approach:
Here's a possible implementation:

```python
@async_wraps(cls, cls._wraps, meth_name)
async def wrapper(self, *args, **kwargs):
    meth = getattr(self._wrapped, meth_name)
    send_channel, receive_channel = trio.open_memory_channel(1)

    def run_in_thread():
        # Feed items from the synchronous iterator into the memory
        # channel, then close it so the consumer loop terminates.
        try:
            for item in meth(*args, **kwargs):
                trio.from_thread.run(send_channel.send, item)
        finally:
            trio.from_thread.run(send_channel.aclose)

    async with trio.open_nursery() as nursery:
        nursery.start_soon(trio.to_thread.run_sync, run_in_thread)
        async for item in receive_channel:
            yield rewrap_path(item)
```

... except this falls into the nursery-inside-generator trap. In any case, I think it's an interesting problem for trio in general. Is there another approach that I missed?
I ended up benchmarking several ways of wrapping an iterator into an async iterator:
This made me realize that performing a context switch for each item is also a bottleneck. Maybe an acceptable compromise is to use the following implementation:

```python
import time

import trio

BATCH_TIME_LIMIT = 0.001
BATCH_LENGTH_LIMIT = 1000


async def to_aiter(fn, *args, **kwargs):
    def instantiate():
        return iter(fn(*args, **kwargs))

    # The call itself might do blocking I/O, so run it in a thread too.
    items_iter = await trio.to_thread.run_sync(instantiate)

    def run_batch():
        # Pull items until either limit is hit; record each item, or
        # the exception that terminated the batch.
        batch = []
        deadline = time.time() + BATCH_TIME_LIMIT
        while time.time() < deadline and len(batch) < BATCH_LENGTH_LIMIT:
            try:
                batch.append((next(items_iter), None))
            except Exception as exc:
                batch.append((None, exc))
                break
        return batch

    while True:
        batch = await trio.to_thread.run_sync(run_batch)
        for result, exception in batch:
            if isinstance(exception, StopIteration):
                return
            if exception is not None:
                raise exception
            yield result
```

I think it would work fine for these path methods.
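The round-trip savings from batching can be checked with a synchronous stand-in that counts executor submissions for a 1000-item iterator (the `drain_batched` helper is hypothetical, stdlib only):

```python
from concurrent.futures import ThreadPoolExecutor

def drain_batched(it, batch_size):
    """Pull items from `it` in fixed-size batches via a worker thread,
    returning (items, number_of_thread_round_trips)."""
    it = iter(it)
    round_trips = 0
    items = []
    with ThreadPoolExecutor(max_workers=1) as pool:
        while True:
            def grab():
                batch = []
                for _ in range(batch_size):
                    try:
                        batch.append(next(it))
                    except StopIteration:
                        break
                return batch

            batch = pool.submit(grab).result()
            round_trips += 1
            if not batch:
                break
            items.extend(batch)
    return items, round_trips

items, trips = drain_batched(range(1000), 100)
print(len(items), trips)  # 1000 items in 11 round trips, vs. 1001 unbatched
```

Each round trip stands in for one `trio.to_thread.run_sync` call, which is where the per-item overhead of the naive implementation comes from.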
I'd suggest not using time: 1 ms has a different meaning for mechanical vs. solid-state storage, and for a computer today vs. 20 years from now. It's also very difficult to deal with wall time in automated tests (free CI tends to run on VMs and can stall arbitrarily). As a compromise for not watching time, I'd use a batch size of 100. Being 100x faster than the naive implementation seems good enough. There might be some opportunity to use the outcome API? I haven't used it myself, but it has some ability to capture function calls and send them to a generator. https://outcome.readthedocs.io/en/latest/api.html#api-reference
That makes sense. The reason I used both the time limit and the length limit is that I wanted it to be able to handle different kinds of iterators, such as:

```python
def slow_range(*args):
    for x in range(*args):
        time.sleep(1)
        yield x
```

In this example the time-limited implementation would produce a batch of size 1 every second, instead of waiting 100 seconds before producing the first value. That being said, I agree that 1 ms is arbitrary. Ideally we'd want this threshold to be a few times the thread synchronization cost. On my machine the full round trip is about 200 µs, so a threshold of 1 ms made sense. In general it could be computed like this:

```python
start = time.time()
await trio.to_thread.run_sync(lambda: None)
threshold = 5 * (time.time() - start)
```

I agree, that should be enough for this use case.

I thought about it, but the captured result can only be used once. I'll wait for your feedback and make a PR if that's ok with you :)
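Regarding the single-use limitation: a minimal stand-in mimicking outcome's capture/unwrap semantics (this is a sketch of the behavior, not the real library's code) shows why a captured `StopIteration` can't simply be re-examined on a later pass.

```python
class AlreadyUsedError(Exception):
    pass

class Captured:
    """Minimal stand-in for outcome.Value/outcome.Error: holds either
    a return value or an exception, and can be unwrapped only once."""
    def __init__(self, value=None, error=None):
        self._value, self._error, self._used = value, error, False

    def unwrap(self):
        if self._used:
            raise AlreadyUsedError("outcome already unwrapped")
        self._used = True
        if self._error is not None:
            raise self._error
        return self._value

def capture(fn, *args):
    try:
        return Captured(value=fn(*args))
    except BaseException as exc:
        return Captured(error=exc)

it = iter([1])
assert capture(next, it).unwrap() == 1
result = capture(next, it)  # captures the StopIteration
try:
    result.unwrap()         # first unwrap re-raises it...
except StopIteration:
    pass
try:
    result.unwrap()         # ...but a second unwrap is an error
except AlreadyUsedError:
    print("captured outcome is single-use")
```

With batched results, by contrast, each `(value, exception)` tuple is plain data and can be inspected as many times as needed.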
Notice how the actual iteration is not performed in the executor thread:
trio/trio/_path.py
Lines 56 to 62 in 20da9af
Here's a possible fix: