Skip to content

Commit

Permalink
Merge pull request scrapy#4259 from scrapy/asyncio-mw
Browse files Browse the repository at this point in the history
Asyncio support in downloader middlewares
  • Loading branch information
kmike authored Jan 3, 2020
2 parents 14d4428 + b2dd379 commit ce618fb
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ matrix:
- env: TOXENV=pinned
python: 3.5
- env: TOXENV=py35-asyncio
python: 3.5
python: 3.5.2
- env: TOXENV=py36
python: 3.6
- env: TOXENV=py37
Expand Down
8 changes: 4 additions & 4 deletions scrapy/core/downloader/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from scrapy.exceptions import _InvalidOutput
from scrapy.http import Request, Response
from scrapy.middleware import MiddlewareManager
from scrapy.utils.defer import mustbe_deferred
from scrapy.utils.defer import mustbe_deferred, deferred_from_coro
from scrapy.utils.conf import build_component_list


Expand All @@ -33,7 +33,7 @@ def download(self, download_func, request, spider):
@defer.inlineCallbacks
def process_request(request):
for method in self.methods['process_request']:
response = yield method(request=request, spider=spider)
response = yield deferred_from_coro(method(request=request, spider=spider))
if response is not None and not isinstance(response, (Response, Request)):
raise _InvalidOutput('Middleware %s.process_request must return None, Response or Request, got %s' % \
(method.__self__.__class__.__name__, response.__class__.__name__))
Expand All @@ -48,7 +48,7 @@ def process_response(response):
defer.returnValue(response)

for method in self.methods['process_response']:
response = yield method(request=request, response=response, spider=spider)
response = yield deferred_from_coro(method(request=request, response=response, spider=spider))
if not isinstance(response, (Response, Request)):
raise _InvalidOutput('Middleware %s.process_response must return Response or Request, got %s' % \
(method.__self__.__class__.__name__, type(response)))
Expand All @@ -60,7 +60,7 @@ def process_response(response):
def process_exception(_failure):
exception = _failure.value
for method in self.methods['process_exception']:
response = yield method(request=request, exception=exception, spider=spider)
response = yield deferred_from_coro(method(request=request, exception=exception, spider=spider))
if response is not None and not isinstance(response, (Response, Request)):
raise _InvalidOutput('Middleware %s.process_exception must return None, Response or Request, got %s' % \
(method.__self__.__class__.__name__, type(response)))
Expand Down
26 changes: 26 additions & 0 deletions scrapy/utils/defer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
"""
Helper functions for dealing with Twisted deferreds
"""
import asyncio
import inspect

from twisted.internet import defer, task
from twisted.python import failure

from scrapy.exceptions import IgnoreRequest
from scrapy.utils.asyncio import is_asyncio_reactor_installed


def defer_fail(_failure):
Expand Down Expand Up @@ -114,3 +118,25 @@ def iter_errback(iterable, errback, *a, **kw):
break
except Exception:
errback(failure.Failure(), *a, **kw)


def _isfuture(o):
# workaround for Python before 3.5.3 not having asyncio.isfuture
if hasattr(asyncio, 'isfuture'):
return asyncio.isfuture(o)
return isinstance(o, asyncio.Future)


def deferred_from_coro(o):
"""Converts a coroutine into a Deferred, or returns the object as is if it isn't a coroutine"""
if isinstance(o, defer.Deferred):
return o
if _isfuture(o) or inspect.isawaitable(o):
if not is_asyncio_reactor_installed():
# wrapping the coroutine directly into a Deferred, this doesn't work correctly with coroutines
# that use asyncio, e.g. "await asyncio.sleep(1)"
return defer.ensureDeferred(o)
else:
# wrapping the coroutine into a Future and then into a Deferred, this requires AsyncioSelectorReactor
return defer.Deferred.fromFuture(asyncio.ensure_future(o))
return o
9 changes: 8 additions & 1 deletion scrapy/utils/test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
This module contains some assorted functions used in tests
"""

import asyncio
import os

from importlib import import_module
Expand Down Expand Up @@ -96,3 +96,10 @@ def assert_samelines(testcase, text1, text2, msg=None):
line endings between platforms
"""
testcase.assertEqual(text1.splitlines(), text2.splitlines(), msg)


def get_from_asyncio_queue(value):
q = asyncio.Queue()
getter = q.get()
q.put_nowait(value)
return getter
49 changes: 48 additions & 1 deletion tests/test_downloadermiddleware.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import asyncio
from unittest import mock

from pytest import mark
from twisted.internet import defer
from twisted.internet.defer import Deferred
from twisted.trial.unittest import TestCase
from twisted.python.failure import Failure
Expand All @@ -8,7 +11,7 @@
from scrapy.spiders import Spider
from scrapy.exceptions import _InvalidOutput
from scrapy.core.downloader.middleware import DownloaderMiddlewareManager
from scrapy.utils.test import get_crawler
from scrapy.utils.test import get_crawler, get_from_asyncio_queue
from scrapy.utils.python import to_bytes


Expand Down Expand Up @@ -206,3 +209,47 @@ def process_request(self, request, spider):

self.assertIs(results[0], resp)
self.assertFalse(download_func.called)


class MiddlewareUsingCoro(ManagerTestCase):
"""Middlewares using asyncio coroutines should work"""

def test_asyncdef(self):
resp = Response('http://example.com/index.html')

class CoroMiddleware:
async def process_request(self, request, spider):
await defer.succeed(42)
return resp

self.mwman._add_middleware(CoroMiddleware())
req = Request('http://example.com/index.html')
download_func = mock.MagicMock()
dfd = self.mwman.download(download_func, req, self.spider)
results = []
dfd.addBoth(results.append)
self._wait(dfd)

self.assertIs(results[0], resp)
self.assertFalse(download_func.called)

@mark.only_asyncio()
def test_asyncdef_asyncio(self):
resp = Response('http://example.com/index.html')

class CoroMiddleware:
async def process_request(self, request, spider):
await asyncio.sleep(0.1)
result = await get_from_asyncio_queue(resp)
return result

self.mwman._add_middleware(CoroMiddleware())
req = Request('http://example.com/index.html')
download_func = mock.MagicMock()
dfd = self.mwman.download(download_func, req, self.spider)
results = []
dfd.addBoth(results.append)
self._wait(dfd)

self.assertIs(results[0], resp)
self.assertFalse(download_func.called)

0 comments on commit ce618fb

Please sign in to comment.