diff --git a/crawler/crawl.py b/crawler/crawl.py
index e324cd6..d35e58e 100644
--- a/crawler/crawl.py
+++ b/crawler/crawl.py
@@ -1,10 +1,12 @@
 from dataclasses import dataclass
 import os
 import datetime
+import gc
 from typing import List
 import logging
 from pathlib import Path
 from time import time
+from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError
 
 from crawler.store.konzum import KonzumCrawler
@@ -34,6 +36,10 @@
 logger = logging.getLogger(__name__)
 
+# Timeout configurations (in seconds)
+OVERALL_TIMEOUT_SECONDS = 1200  # 20 minutes for entire crawl process
+CHAIN_TIMEOUT_SECONDS = 120  # 2 minutes per individual chain
+
 CRAWLERS = {
     StudenacCrawler.CHAIN: StudenacCrawler,
     SparCrawler.CHAIN: SparCrawler,
@@ -59,6 +65,44 @@
 }
 
 
+@dataclass
+class CrawlResult:
+    elapsed_time: float = 0
+    n_stores: int = 0
+    n_products: int = 0
+    n_prices: int = 0
+
+
+def crawl_chain_with_cleanup(
+    chain: str, date: datetime.date, path: Path
+) -> CrawlResult:
+    """
+    Crawl a specific retail chain with cleanup.
+    This is a wrapper that adds garbage collection after each chain completes.
+
+    Args:
+        chain: The name of the retail chain to crawl.
+        date: The date for which to fetch the product data.
+        path: The directory path where the data will be saved.
+
+    Returns:
+        CrawlResult with crawling statistics.
+    """
+    try:
+        logger.info(f"Starting crawl for {chain} on {date:%Y-%m-%d}")
+        result = crawl_chain(chain, date, path)
+        logger.info(
+            f"Completed crawl for {chain}: {result.n_stores} stores, {result.n_products} products, {result.n_prices} prices in {result.elapsed_time:.2f}s"
+        )
+        return result
+    except Exception as e:
+        logger.error(f"Failed to crawl {chain}: {e}", exc_info=True)
+        return CrawlResult()  # Empty result for failed crawls
+    finally:
+        # Force garbage collection after each chain completes
+        gc.collect()
+
+
 def get_chains() -> List[str]:
     """
     Get the list of retail chains from the crawlers.
@@ -69,14 +113,6 @@ def get_chains() -> List[str]:
     return list(CRAWLERS.keys())
 
 
-@dataclass
-class CrawlResult:
-    elapsed_time: float = 0
-    n_stores: int = 0
-    n_products: int = 0
-    n_prices: int = 0
-
-
 def crawl_chain(chain: str, date: datetime.date, path: Path) -> CrawlResult:
     """
     Crawl a specific retail chain for product/pricing data and save it.
@@ -150,11 +186,44 @@ def crawl(
     results = {}
 
+    logger.info(f"Starting parallel crawl of {len(chains)} chains")
+
     t0 = time()
-    for chain in chains:
-        logger.info(f"Starting crawl for {chain} on {date:%Y-%m-%d}")
-        r = crawl_chain(chain, date, path / chain)
-        results[chain] = r
+
+    # Process chains in parallel using ThreadPoolExecutor
+    with ThreadPoolExecutor(max_workers=len(chains)) as executor:
+        # Submit chain crawling tasks
+        future_to_chain = {
+            executor.submit(crawl_chain_with_cleanup, chain, date, path / chain): chain
+            for chain in chains
+        }
+
+        # Collect results as they complete, with timeout handling
+        try:
+            for future in as_completed(
+                future_to_chain, timeout=OVERALL_TIMEOUT_SECONDS
+            ):
+                chain = future_to_chain[future]
+                try:
+                    result = future.result(timeout=CHAIN_TIMEOUT_SECONDS)
+                    results[chain] = result
+                except TimeoutError:
+                    logger.error(
+                        f"Chain '{chain}' timed out after {CHAIN_TIMEOUT_SECONDS // 60} minutes"
+                    )
+                    results[chain] = CrawlResult()
+                except Exception as e:
+                    logger.error(f"Failed to crawl {chain}: {e}", exc_info=True)
+                    results[chain] = CrawlResult()
+        except TimeoutError:
+            logger.error(
+                f"Overall crawling process timed out after {OVERALL_TIMEOUT_SECONDS // 60} minutes"
+            )
+            # Add empty results for any remaining chains
+            for chain in chains:
+                if chain not in results:
+                    results[chain] = CrawlResult()
+
     t1 = time()
     logger.info(f"Crawled {','.join(chains)} for {date:%Y-%m-%d} in {t1 - t0:.2f}s")
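For reviewers who want to see the timeout semantics of this pattern in isolation, below is a minimal, self-contained sketch of `as_completed` with an overall deadline. It is illustrative only: `slow_task`, the task names, and the durations are made up, and the timeout is shortened so the example runs in a few seconds.

```python
import logging
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed
from time import sleep

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

OVERALL_TIMEOUT_SECONDS = 2  # shortened for the example


def slow_task(name: str, duration: float) -> str:
    """Stand-in for a per-chain crawl; it just sleeps instead of doing real work."""
    sleep(duration)
    return f"{name} done"


def run_all() -> dict:
    tasks = {"fast": 0.5, "slow": 5.0}  # "slow" will exceed the overall timeout
    results = {}
    with ThreadPoolExecutor(max_workers=len(tasks)) as executor:
        future_to_name = {
            executor.submit(slow_task, name, duration): name
            for name, duration in tasks.items()
        }
        try:
            # as_completed() yields futures in completion order; if they have
            # not all finished within the timeout, it raises TimeoutError.
            for future in as_completed(future_to_name, timeout=OVERALL_TIMEOUT_SECONDS):
                name = future_to_name[future]
                try:
                    results[name] = future.result()
                except Exception as exc:
                    logger.error("Task %s failed: %s", name, exc)
                    results[name] = None
        except TimeoutError:
            # Record a placeholder for every task that never completed in time.
            for name in tasks:
                results.setdefault(name, None)
            logger.error("Overall timeout reached; partial results: %s", results)
    # Note: leaving the `with` block still waits for running worker threads;
    # the timeout stops the collection loop, it does not cancel in-flight work.
    return results


if __name__ == "__main__":
    print(run_all())  # expected: {'fast': 'fast done', 'slow': None}
```

Worker threads cannot be force-killed, so a task that exceeds the timeout keeps running until it returns on its own; the timeouts control how long the collection loop waits for results, not how long an individual task may run.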