93 changes: 81 additions & 12 deletions crawler/crawl.py
@@ -1,10 +1,12 @@
from dataclasses import dataclass
import os
import datetime
import gc
from typing import List
import logging
from pathlib import Path
from time import time
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError


from crawler.store.konzum import KonzumCrawler
@@ -34,6 +36,10 @@

logger = logging.getLogger(__name__)

# Timeout configurations (in seconds)
OVERALL_TIMEOUT_SECONDS = 1200  # 20 minutes for entire crawl process
CHAIN_TIMEOUT_SECONDS = 120 # 2 minutes per individual chain

CRAWLERS = {
StudenacCrawler.CHAIN: StudenacCrawler,
SparCrawler.CHAIN: SparCrawler,
@@ -59,6 +65,44 @@
}


@dataclass
class CrawlResult:
elapsed_time: float = 0
n_stores: int = 0
n_products: int = 0
n_prices: int = 0
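
# Illustrative sketch only (not part of this diff): one way the per-chain
# CrawlResult values, such as the `results` dict built in crawl() below,
# could be rolled up into overall totals. The summarize() helper is
# hypothetical; the field names match the dataclass above.
def summarize(results: dict) -> CrawlResult:
    total = CrawlResult()
    for r in results.values():
        total.elapsed_time += r.elapsed_time
        total.n_stores += r.n_stores
        total.n_products += r.n_products
        total.n_prices += r.n_prices
    return total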


def crawl_chain_with_cleanup(
chain: str, date: datetime.date, path: Path
) -> CrawlResult:
"""
Crawl a specific retail chain with cleanup.
This is a wrapper that adds garbage collection after each chain completes.

Args:
chain: The name of the retail chain to crawl.
date: The date for which to fetch the product data.
path: The directory path where the data will be saved.

Returns:
CrawlResult with crawling statistics.
"""
try:
logger.info(f"Starting crawl for {chain} on {date:%Y-%m-%d}")
result = crawl_chain(chain, date, path)
logger.info(
f"Completed crawl for {chain}: {result.n_stores} stores, {result.n_products} products, {result.n_prices} prices in {result.elapsed_time:.2f}s"
)
return result
except Exception as e:
logger.error(f"Failed to crawl {chain}: {e}", exc_info=True)
return CrawlResult() # Empty result for failed crawls
finally:
# Force garbage collection after each chain completes
gc.collect()
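
# Illustrative only (not part of this diff): the wrapper can also be called
# directly, e.g. to debug a single chain; the chain name and output path
# below are hypothetical.
#
#   result = crawl_chain_with_cleanup(
#       "konzum", datetime.date.today(), Path("output/konzum")
#   )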


def get_chains() -> List[str]:
"""
Get the list of retail chains from the crawlers.
@@ -69,14 +113,6 @@ def get_chains() -> List[str]:
return list(CRAWLERS.keys())


@dataclass
class CrawlResult:
elapsed_time: float = 0
n_stores: int = 0
n_products: int = 0
n_prices: int = 0


def crawl_chain(chain: str, date: datetime.date, path: Path) -> CrawlResult:
"""
Crawl a specific retail chain for product/pricing data and save it.
@@ -150,11 +186,44 @@ def crawl(

results = {}

logger.info(f"Starting parallel crawl of {len(chains)} chains")

t0 = time()
for chain in chains:
logger.info(f"Starting crawl for {chain} on {date:%Y-%m-%d}")
r = crawl_chain(chain, date, path / chain)
results[chain] = r

# Process chains in parallel using ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=len(chains)) as executor:
# Submit chain crawling tasks
future_to_chain = {
executor.submit(crawl_chain_with_cleanup, chain, date, path / chain): chain
for chain in chains
}

# Collect results as they complete with timeout handling
try:
for future in as_completed(
future_to_chain, timeout=OVERALL_TIMEOUT_SECONDS
):
chain = future_to_chain[future]
try:
result = future.result(timeout=CHAIN_TIMEOUT_SECONDS)
results[chain] = result
except TimeoutError:
logger.error(
f"Chain '{chain}' timed out after {CHAIN_TIMEOUT_SECONDS // 60} minutes"
)
results[chain] = CrawlResult()
except Exception as e:
logger.error(f"Failed to crawl {chain}: {e}", exc_info=True)
results[chain] = CrawlResult()
except TimeoutError:
logger.error(
f"Overall crawling process timed out after {OVERALL_TIMEOUT_SECONDS // 60} minutes"
)
# Add empty results for any remaining chains
for chain in chains:
if chain not in results:
results[chain] = CrawlResult()

t1 = time()

logger.info(f"Crawled {','.join(chains)} for {date:%Y-%m-%d} in {t1 - t0:.2f}s")