Skip to content

Commit cfc7ae2

Browse files
committed
fix: plug thread + session leaks in close() and scrape_batch()
close() now shuts down async_executor and guards against double-close. scrape_batch() wraps the streaming request in try/finally so the per-batch requests.Session is closed even when the generator raises mid-stream or is abandoned before exhaustion.
1 parent ce680e4 commit cfc7ae2

1 file changed

Lines changed: 94 additions & 80 deletions

File tree

scrapfly/client.py

Lines changed: 94 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -604,8 +604,16 @@ def open(self):
604604
self.http_session.headers['user-agent'] = self.ua
605605

606606
def close(self):
607-
self.http_session.close()
608-
self.http_session = None
607+
if self.http_session is not None:
608+
self.http_session.close()
609+
self.http_session = None
610+
# The executor is created in __init__ and owns worker threads that
611+
# outlive the HTTP session; shutting it down here prevents thread
612+
# leaks for callers that reuse the client across open()/close()
613+
# cycles or rely on GC to reclaim it.
614+
if self.async_executor is not None:
615+
self.async_executor.shutdown(wait=False)
616+
self.async_executor = None
609617

610618
def __enter__(self) -> 'ScrapflyClient':
611619
self.open()
@@ -1008,99 +1016,105 @@ def scrape_batch(
10081016
"stream": True,
10091017
}
10101018

1019+
# Own the session for the life of the streaming batch so its
1020+
# connection pool closes whether the generator is fully consumed,
1021+
# errors mid-stream, or is abandoned (the generator's finally block runs when it is closed or garbage-collected).
10111022
batch_session = requests.Session()
10121023
batch_session.verify = self.verify
10131024

1014-
response = batch_session.request(
1015-
method=request["method"],
1016-
url=request["url"],
1017-
params=request["params"],
1018-
data=request["data"],
1019-
headers=request["headers"],
1020-
timeout=request["timeout"],
1021-
stream=request["stream"],
1022-
)
1025+
try:
1026+
response = batch_session.request(
1027+
method=request["method"],
1028+
url=request["url"],
1029+
params=request["params"],
1030+
data=request["data"],
1031+
headers=request["headers"],
1032+
timeout=request["timeout"],
1033+
stream=request["stream"],
1034+
)
10231035

1024-
if response.status_code != 200:
1025-
# Batch-level error (plan gate, validation, insufficient
1026-
# concurrency, etc.). Response is a single JSON body, not
1027-
# multipart.
1028-
try:
1029-
body = response.json()
1030-
except Exception:
1031-
body = {"message": response.text, "code": "ERR::API::INTERNAL_ERROR"}
1032-
err_code = body.get("code", "ERR::API::INTERNAL_ERROR")
1033-
err_msg = body.get("message", "") or body.get("reason", "")
1034-
retry_after = None
1036+
if response.status_code != 200:
1037+
# Batch-level error (plan gate, validation, insufficient
1038+
# concurrency, etc.). Response is a single JSON body, not
1039+
# multipart.
1040+
try:
1041+
body = response.json()
1042+
except Exception:
1043+
body = {"message": response.text, "code": "ERR::API::INTERNAL_ERROR"}
1044+
err_code = body.get("code", "ERR::API::INTERNAL_ERROR")
1045+
err_msg = body.get("message", "") or body.get("reason", "")
1046+
retry_after = None
10351047

1036-
try:
1037-
retry_after = int(response.headers.get("Retry-After", "0")) or None
1038-
except (TypeError, ValueError):
1039-
pass
1048+
try:
1049+
retry_after = int(response.headers.get("Retry-After", "0")) or None
1050+
except (TypeError, ValueError):
1051+
pass
10401052

1041-
raise HttpError(
1042-
request=response.request,
1043-
response=response,
1044-
code=err_code,
1045-
http_status_code=response.status_code,
1046-
message=err_msg,
1047-
is_retryable=body.get("retryable", False),
1048-
retry_delay=retry_after,
1049-
)
1053+
raise HttpError(
1054+
request=response.request,
1055+
response=response,
1056+
code=err_code,
1057+
http_status_code=response.status_code,
1058+
message=err_msg,
1059+
is_retryable=body.get("retryable", False),
1060+
retry_delay=retry_after,
1061+
)
1062+
1063+
for part_headers, part_body in iter_batch_parts(response):
1064+
correlation_id = part_headers.get("x-scrapfly-correlation-id", "")
1065+
cfg = config_by_correlation.get(correlation_id, scrape_configs[0])
1066+
1067+
# Proxified-response parts: the part body is the raw
1068+
# upstream bytes, not a JSON envelope. Surface a native
1069+
# requests.Response synthesized from the part headers +
1070+
# body so callers get the same shape as a single
1071+
# proxified scrape.
1072+
if part_headers.get("x-scrapfly-proxified") == "true":
1073+
try:
1074+
prox_response = _build_proxified_response_from_part(
1075+
part_headers,
1076+
part_body,
1077+
originating_request=response.request,
1078+
)
1079+
except Exception as prox_err:
1080+
yield correlation_id, ScrapflyError(
1081+
f"scrape_batch: failed to build proxified response for correlation_id={correlation_id!r}: {prox_err}",
1082+
code="ERR::API::INTERNAL_ERROR",
1083+
http_status_code=500,
1084+
)
10501085

1051-
for part_headers, part_body in iter_batch_parts(response):
1052-
correlation_id = part_headers.get("x-scrapfly-correlation-id", "")
1053-
cfg = config_by_correlation.get(correlation_id, scrape_configs[0])
1086+
continue
1087+
1088+
yield correlation_id, prox_response
1089+
1090+
continue
10541091

1055-
# Proxified-response parts: the part body is the raw
1056-
# upstream bytes, not a JSON envelope. Surface a native
1057-
# requests.Response synthesized from the part headers +
1058-
# body so callers get the same shape as a single
1059-
# proxified scrape.
1060-
if part_headers.get("x-scrapfly-proxified") == "true":
10611092
try:
1062-
prox_response = _build_proxified_response_from_part(
1063-
part_headers,
1064-
part_body,
1065-
originating_request=response.request,
1066-
)
1067-
except Exception as prox_err:
1093+
parsed = decode_part_body(part_headers, part_body, self.body_handler)
1094+
except Exception as decode_err:
10681095
yield correlation_id, ScrapflyError(
1069-
f"scrape_batch: failed to build proxified response for correlation_id={correlation_id!r}: {prox_err}",
1096+
f"scrape_batch: failed to decode part for correlation_id={correlation_id!r}: {decode_err}",
10701097
code="ERR::API::INTERNAL_ERROR",
1071-
http_status_code=500,
10721098
)
10731099

10741100
continue
10751101

1076-
yield correlation_id, prox_response
1077-
1078-
continue
1079-
1080-
try:
1081-
parsed = decode_part_body(part_headers, part_body, self.body_handler)
1082-
except Exception as decode_err:
1083-
yield correlation_id, ScrapflyError(
1084-
f"scrape_batch: failed to decode part for correlation_id={correlation_id!r}: {decode_err}",
1085-
code="ERR::API::INTERNAL_ERROR",
1086-
)
1087-
1088-
continue
1089-
1090-
try:
1091-
api_response = ScrapeApiResponse(
1092-
response=response,
1093-
request=response.request,
1094-
api_result=parsed,
1095-
scrape_config=cfg,
1096-
large_object_handler=self._handle_scrape_large_objects,
1097-
)
1098-
# Don't auto-raise on upstream error — per-part errors
1099-
# are surfaced via the yielded tuple, not exceptions.
1100-
api_response.raise_for_result(raise_on_upstream_error=False)
1101-
yield correlation_id, api_response
1102-
except ScrapflyError as scrape_err:
1103-
yield correlation_id, scrape_err
1102+
try:
1103+
api_response = ScrapeApiResponse(
1104+
response=response,
1105+
request=response.request,
1106+
api_result=parsed,
1107+
scrape_config=cfg,
1108+
large_object_handler=self._handle_scrape_large_objects,
1109+
)
1110+
# Don't auto-raise on upstream error — per-part errors
1111+
# are surfaced via the yielded tuple, not exceptions.
1112+
api_response.raise_for_result(raise_on_upstream_error=False)
1113+
yield correlation_id, api_response
1114+
except ScrapflyError as scrape_err:
1115+
yield correlation_id, scrape_err
1116+
finally:
1117+
batch_session.close()
11041118

11051119
def save_screenshot(self, screenshot_api_response:ScreenshotApiResponse, name:str, path:Optional[str]=None):
11061120
"""

0 commit comments

Comments
 (0)