feat: add Session binding capability via session_id in Request #1086

Merged: 17 commits, Mar 31, 2025
85 changes: 85 additions & 0 deletions docs/guides/code_examples/session_management/multi_sessions_http.py
@@ -0,0 +1,85 @@
import asyncio
from datetime import timedelta
from itertools import count
from typing import Callable

from crawlee import ConcurrencySettings, Request
from crawlee.crawlers import BasicCrawlingContext, HttpCrawler, HttpCrawlingContext
from crawlee.errors import RequestCollisionError
from crawlee.sessions import Session, SessionPool


# Define a function for creating sessions with simple logic for unique `id` generation.
# This is necessary if you need to specify a particular session for the first request,
# for example, during authentication.
def create_session_function() -> Callable[[], Session]:
counter = count()

def create_session() -> Session:
return Session(
id=str(next(counter)),
max_usage_count=999_999,
max_age=timedelta(hours=999_999),
max_error_score=100,
blocked_status_codes=[403],
)

return create_session


async def main() -> None:
crawler = HttpCrawler(
# Adjust request limits according to your pool size
concurrency_settings=ConcurrencySettings(max_tasks_per_minute=500),
# Requests are bound to specific sessions, no rotation needed
max_session_rotations=0,
session_pool=SessionPool(
max_pool_size=10, create_session_function=create_session_function()
),
)

@crawler.router.default_handler
async def basic_handler(context: HttpCrawlingContext) -> None:
context.log.info(f'Processing {context.request.url}')

# Initialize the session and bind the next request to this session if needed
@crawler.router.handler(label='session_init')
async def session_init(context: HttpCrawlingContext) -> None:
next_requests = []
if context.session:
context.log.info(f'Init session {context.session.id}')
next_request = Request.from_url(
'https://placeholder.dev', session_id=context.session.id
)
next_requests.append(next_request)

await context.add_requests(next_requests)

# Handle errors when a session is blocked and no longer available in the pool
# when attempting to execute requests bound to it
@crawler.failed_request_handler
async def error_processing(context: BasicCrawlingContext, error: Exception) -> None:
if isinstance(error, RequestCollisionError) and context.session:
context.log.error(
f'Request {context.request.url} failed, because the bound '
'session is unavailable'
)

# Create a pool of requests bound to their respective sessions
# Use `always_enqueue=True` if session initialization happens on a non-unique address,
# such as the site's main page
init_requests = [
Request.from_url(
'https://example.org/',
label='session_init',
session_id=str(session_id),
use_extended_unique_key=True,
)
for session_id in range(1, 11)
]

await crawler.run(init_requests)


if __name__ == '__main__':
asyncio.run(main())
56 changes: 56 additions & 0 deletions docs/guides/code_examples/session_management/one_session_http.py
@@ -0,0 +1,56 @@
import asyncio
from datetime import timedelta

from crawlee import ConcurrencySettings, Request
from crawlee.crawlers import BasicCrawlingContext, HttpCrawler, HttpCrawlingContext
from crawlee.errors import SessionError
from crawlee.sessions import SessionPool


async def main() -> None:
crawler = HttpCrawler(
# Limit requests per minute to reduce the chance of being blocked
concurrency_settings=ConcurrencySettings(max_tasks_per_minute=50),
# Disable session rotation
max_session_rotations=0,
session_pool=SessionPool(
# Only one session in the pool
max_pool_size=1,
create_session_settings={
# High value for session usage limit
'max_usage_count': 999_999,
# High value for session lifetime
'max_age': timedelta(hours=999_999),
# High score allows the session to encounter more errors
# before Crawlee decides the session is blocked.
# Make sure you know how to handle these errors
'max_error_score': 100,
# 403 status usually indicates you're already blocked
'blocked_status_codes': [403],
},
),
)

# Basic request handling logic
@crawler.router.default_handler
async def basic_handler(context: HttpCrawlingContext) -> None:
context.log.info(f'Processing {context.request.url}')

# Handler for session initialization (authentication, initial cookies, etc.)
@crawler.router.handler(label='session_init')
async def session_init(context: HttpCrawlingContext) -> None:
if context.session:
context.log.info(f'Init session {context.session.id}')

# Monitor if our session gets blocked and explicitly stop the crawler
@crawler.error_handler
async def error_processing(context: BasicCrawlingContext, error: Exception) -> None:
if isinstance(error, SessionError) and context.session:
context.log.info(f'Session {context.session.id} blocked')
crawler.stop()

await crawler.run([Request.from_url('https://example.org/', label='session_init')])


if __name__ == '__main__':
asyncio.run(main())
24 changes: 24 additions & 0 deletions docs/guides/session_management.mdx
@@ -15,6 +15,8 @@ import BeautifulSoupSource from '!!raw-loader!./code_examples/session_management
import ParselSource from '!!raw-loader!./code_examples/session_management/session_management_parsel.py';
import PlaywrightSource from '!!raw-loader!./code_examples/session_management/session_management_playwright.py';
import StandaloneSource from '!!raw-loader!./code_examples/session_management/session_management_standalone.py';
import OneSession from '!!raw-loader!./code_examples/session_management/one_session_http.py';
import MultiSessions from '!!raw-loader!./code_examples/session_management/multi_sessions_http.py';

The <ApiLink to="class/SessionPool">`SessionPool`</ApiLink> class provides a robust way to manage the rotation of proxy IP addresses, cookies, and other custom settings in Crawlee. Its primary advantage is the ability to filter out blocked or non-functional proxies, ensuring that your scraper avoids retrying requests through known problematic proxies.

@@ -68,3 +70,25 @@ Now, let's explore examples of how to use the <ApiLink to="class/SessionPool">`SessionPool`</ApiLink>
These examples demonstrate the basics of configuring and using the <ApiLink to="class/SessionPool">`SessionPool`</ApiLink>.

Please bear in mind that <ApiLink to="class/SessionPool">`SessionPool`</ApiLink> requires some time to establish a stable pool of working IPs. During the initial setup, you may encounter errors as the pool identifies and filters out blocked or non-functional IPs. This stabilization period is expected, and results will improve over time.

## Configuring a single session

In some cases, you need full control over session usage, for example when working with websites that require authentication or initialization of certain parameters such as cookies.

When working with a site that requires authentication, we typically don't want multiple sessions with different browser fingerprints or client parameters accessing the site. In this case, we need to configure the <ApiLink to="class/SessionPool">`SessionPool`</ApiLink> appropriately:

<CodeBlock language="py">
{OneSession}
</CodeBlock>

## Binding requests to specific sessions

The previous example has one obvious limitation: you're restricted to a single session.

In some cases, we need the same behavior while using multiple sessions in parallel, such as authenticating with different profiles or using different proxies.

To do this, use the `session_id` parameter for the <ApiLink to="class/Request">`Request`</ApiLink> object to bind a request to a specific session:

<CodeBlock language="py">
{MultiSessions}
</CodeBlock>
22 changes: 20 additions & 2 deletions src/crawlee/_request.py
@@ -58,6 +58,9 @@ class CrawleeRequestData(BaseModel):
crawl_depth: Annotated[int, Field(alias='crawlDepth')] = 0
"""The depth of the request in the crawl tree."""

session_id: Annotated[str | None, Field()] = None
"""ID of a session to which the request is bound."""


class UserData(BaseModel, MutableMapping[str, JsonSerializable]):
"""Represents the `user_data` part of a Request.
@@ -84,6 +87,7 @@ def __setitem__(self, key: str, value: JsonSerializable) -> None:
raise ValueError('`label` must be str or None')

self.label = value

self.__pydantic_extra__[key] = value

def __delitem__(self, key: str) -> None:
@@ -119,6 +123,7 @@ class RequestOptions(TypedDict):
headers: NotRequired[HttpHeaders | dict[str, str] | None]
payload: NotRequired[HttpPayload | str | None]
label: NotRequired[str | None]
session_id: NotRequired[str | None]
Collaborator

Shouldn't session_id be used for unique_key computation? I expect that users might get hindered by deduplication if they try to re-enqueue a failed request with a different session.

CC @vdusek - you wrote a big part of the unique key functionality.

Collaborator Author

Yes, deduplication will affect this.

But I expect that users will use the existing mechanisms to return a Request to the queue while avoiding deduplication, by passing either a custom unique_key or always_enqueue=True.
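
For illustration, a minimal sketch of those existing mechanisms; the URL, session IDs, and the custom unique key below are placeholders, and the session_id parameter is the one added in this PR:

from crawlee import Request

# Option 1: bypass deduplication explicitly when re-enqueueing with a different session.
retry_via_always_enqueue = Request.from_url(
    'https://example.org/', session_id='2', always_enqueue=True
)

# Option 2: provide a custom unique_key that encodes the session
# (always_enqueue and a custom unique_key cannot be combined).
retry_via_custom_key = Request.from_url(
    'https://example.org/', session_id='2', unique_key='https://example.org/|session:2'
)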

Collaborator

> Shouldn't session_id be used for unique_key computation? I expect that users might get hindered by deduplication if they try to re-enqueue a failed request with a different session.

Good point! Currently, it infers the unique_key from the URL, method, headers, and payload (in its extended form). You can, of course, use session_id together with always_enqueue and it will work, but that feels like a workaround to me. I believe we should include the session_id in the extended unique_key computation.
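
A rough sketch of the resulting behavior once session_id is part of the extended unique_key computation (placeholder URL and session IDs; assumes the compute_unique_key change in this PR):

from crawlee import Request

# Same URL bound to two different sessions: with extended unique keys the
# requests receive distinct unique_key values, so the queue keeps both.
first = Request.from_url(
    'https://example.org/', session_id='1', use_extended_unique_key=True
)
second = Request.from_url(
    'https://example.org/', session_id='2', use_extended_unique_key=True
)
assert first.unique_key != second.unique_key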

unique_key: NotRequired[str | None]
id: NotRequired[str | None]
keep_url_fragment: NotRequired[bool]
@@ -227,6 +232,7 @@ def from_url(
headers: HttpHeaders | dict[str, str] | None = None,
payload: HttpPayload | str | None = None,
label: str | None = None,
session_id: str | None = None,
unique_key: str | None = None,
id: str | None = None,
keep_url_fragment: bool = False,
@@ -248,14 +254,17 @@ def from_url(
payload: The data to be sent as the request body. Typically used with 'POST' or 'PUT' requests.
label: A custom label to differentiate between request types. This is stored in `user_data`, and it is
used for request routing (different requests go to different handlers).
session_id: ID of a specific `Session` to which the request will be strictly bound.
If the session becomes unavailable when the request is processed, a `RequestCollisionError` will be
raised.
unique_key: A unique key identifying the request. If not provided, it is automatically computed based on
the URL and other parameters. Requests with the same `unique_key` are treated as identical.
id: A unique identifier for the request. If not provided, it is automatically generated from the
`unique_key`.
keep_url_fragment: Determines whether the URL fragment (e.g., `#section`) should be included in
the `unique_key` computation. This is only relevant when `unique_key` is not provided.
use_extended_unique_key: Determines whether to include the HTTP method and payload in the `unique_key`
computation. This is only relevant when `unique_key` is not provided.
use_extended_unique_key: Determines whether to include the HTTP method, session ID, and payload in the
`unique_key` computation. This is only relevant when `unique_key` is not provided.
always_enqueue: If set to `True`, the request will be enqueued even if it is already present in the queue.
Using this is not allowed when a custom `unique_key` is also provided and will result in a `ValueError`.
**kwargs: Additional request properties.
@@ -274,6 +283,7 @@ def from_url(
method=method,
headers=headers,
payload=payload,
session_id=session_id,
keep_url_fragment=keep_url_fragment,
use_extended_unique_key=use_extended_unique_key,
)
@@ -296,6 +306,9 @@ def from_url(
if label is not None:
request.user_data['label'] = label

if session_id is not None:
request.crawlee_data.session_id = session_id

return request

def get_query_param_from_url(self, param: str, *, default: str | None = None) -> str | None:
@@ -308,6 +321,11 @@ def label(self) -> str | None:
"""A string used to differentiate between arbitrary request types."""
return cast('UserData', self.user_data).label

@property
def session_id(self) -> str | None:
"""The ID of the bound session, if there is any."""
return self.crawlee_data.session_id

@property
def crawlee_data(self) -> CrawleeRequestData:
"""Crawlee-specific configuration stored in the `user_data`."""
8 changes: 7 additions & 1 deletion src/crawlee/_utils/requests.py
@@ -78,6 +78,7 @@ def compute_unique_key(
method: HttpMethod = 'GET',
headers: HttpHeaders | None = None,
payload: HttpPayload | None = None,
session_id: str | None = None,
*,
keep_url_fragment: bool = False,
use_extended_unique_key: bool = False,
@@ -96,6 +97,7 @@ payload: The data to be sent as the request body.
payload: The data to be sent as the request body.
keep_url_fragment: A flag indicating whether to keep the URL fragment.
use_extended_unique_key: A flag indicating whether to include a hashed payload in the key.
session_id: The ID of a specific `Session` to which the request will be strictly bound.

Returns:
A string representing the unique key for the request.
@@ -114,9 +116,13 @@ if use_extended_unique_key:
if use_extended_unique_key:
payload_hash = _get_payload_hash(payload)
headers_hash = _get_headers_hash(headers)
normalized_session = '' if session_id is None else session_id.lower()

# Return the extended unique key. Use pipe as a separator of the different parts of the unique key.
return f'{normalized_method}|{headers_hash}|{payload_hash}|{normalized_url}'
extended_part = f'{normalized_method}|{headers_hash}|{payload_hash}'
if normalized_session:
extended_part = f'{extended_part}|{normalized_session}'
return f'{extended_part}|{normalized_url}'

# Log information if there is a non-GET request with a payload.
if normalized_method != 'GET' and payload:
41 changes: 40 additions & 1 deletion src/crawlee/crawlers/_basic/_basic_crawler.py
@@ -40,6 +40,7 @@
ContextPipelineInterruptedError,
HttpClientStatusCodeError,
HttpStatusCodeError,
RequestCollisionError,
RequestHandlerError,
SessionError,
UserDefinedErrorHandlerError,
@@ -449,6 +450,20 @@ async def _get_session(self) -> Session | None:
logger=self._logger,
)

async def _get_session_by_id(self, session_id: str | None) -> Session | None:
"""If session pool is being used, try to take a session by id from it."""
if not self._use_session_pool or not session_id:
return None

return await wait_for(
partial(self._session_pool.get_session_by_id, session_id),
timeout=self._internal_timeout,
timeout_message='Fetching a session from the pool timed out after '
f'{self._internal_timeout.total_seconds()} seconds',
max_retries=3,
logger=self._logger,
)

async def _get_proxy_info(self, request: Request, session: Session | None) -> ProxyInfo | None:
"""Retrieve a new ProxyInfo object based on crawler configuration and the current request and session."""
if not self._proxy_configuration:
@@ -1065,7 +1080,10 @@ async def __run_task_function(self) -> None:
if request is None:
return

session = await self._get_session()
if request.session_id:
session = await self._get_session_by_id(request.session_id)
else:
session = await self._get_session()
proxy_info = await self._get_proxy_info(request, session)
result = RequestHandlerRunResult(key_value_store_getter=self.get_key_value_store)

@@ -1088,6 +1106,8 @@
try:
request.state = RequestState.REQUEST_HANDLER

self._check_request_collision(context.request, context.session)

try:
await self._run_request_handler(context=context)
except asyncio.TimeoutError as e:
@@ -1110,6 +1130,10 @@

self._statistics.record_request_processing_finish(statistics_id)

except RequestCollisionError as request_error:
context.request.no_retry = True
await self._handle_request_error(context, request_error)

except RequestHandlerError as primary_error:
primary_error = cast(
'RequestHandlerError[TCrawlingContext]', primary_error
@@ -1226,3 +1250,18 @@ def _raise_for_session_blocked_status_code(self, session: Session | None, status
ignore_http_error_status_codes=self._ignore_http_error_status_codes,
):
raise SessionError(f'Assuming the session is blocked based on HTTP status code {status_code}')

def _check_request_collision(self, request: Request, session: Session | None) -> None:
"""Raise an exception if a request cannot access required resources.

Args:
request: The `Request` that might require specific resources (like a session).
session: The `Session` that was retrieved for the request, or `None` if not available.

Raises:
RequestCollisionError: If the `Session` referenced by the `Request` is not available.
"""
if self._use_session_pool and request.session_id and not session:
raise RequestCollisionError(
f'The Session (id: {request.session_id}) bound to the Request is no longer available in SessionPool'
)
6 changes: 6 additions & 0 deletions src/crawlee/errors.py
@@ -14,6 +14,7 @@
'HttpClientStatusCodeError',
'HttpStatusCodeError',
'ProxyError',
'RequestCollisionError',
'RequestHandlerError',
'ServiceConflictError',
'SessionError',
@@ -106,3 +107,8 @@ def __init__(self, wrapped_exception: Exception, crawling_context: BasicCrawling
@docs_group('Errors')
class ContextPipelineInterruptedError(Exception):
"""May be thrown in the initialization phase of a middleware to signal that the request should not be processed."""


@docs_group('Errors')
class RequestCollisionError(Exception):
"""Raised when a request cannot be processed due to a conflict with required resources."""