import multiprocessing
import multiprocessing.managers
import os
import threading
import time

from hc_spider import common
from hc_spider.controller import Controller
from hc_spider.json_logger import JSONLogger
from hc_spider.timer import Timer
from hc_spider.watchdog import Watchdog
from hc_spider.worker import Worker


class HCSpider(threading.Thread):
    """Core orchestrator thread of the spider.

    Construction loads the configuration and creates the objects shared by
    every component:
    - job/log queues;
    - shutdown/watchdog events;
    - a "worker finished" barrier;
    - manager-backed dicts/lists for crawl results;
    - a lock and the config itself.

    It then instantiates the components: ``worker_count`` ``Worker``
    instances plus one ``Controller``, ``JSONLogger``, ``Timer`` and
    ``Watchdog``, each receiving the shared objects as keyword arguments.

    ``run()`` starts every component, waits until ``shutdown_event`` is set
    (for example by ``stop()``), then joins every component and prints how
    many URLs were visited.
    """

    _config: dict
    _manager: multiprocessing.managers.SyncManager
    _worker_count: int
    _shared_objects: dict
    # First entry is the list of workers; the remaining entries are single components.
    _components: list[threading.Thread | list[threading.Thread]]

    def __init__(self) -> None:
        super().__init__()
        self.daemon = True
        self.name = "Core"

        self._config = common.load_config()
        self._worker_count = self._resolve_worker_count(self._config.get("workers"))

        self._manager = multiprocessing.Manager()
        self._shared_objects = {
            "job_queue": multiprocessing.Queue(-1),
            "log_queue": multiprocessing.Queue(-1),
            "shutdown_event": multiprocessing.Event(),
            "watchdog_event": multiprocessing.Event(),
            "worker_finished": multiprocessing.Barrier(self._worker_count + 1),  # +1 for the logger
            "visited_nodes": self._manager.dict(),
            "not_visited_nodes": self._manager.dict(),
            "not_valid_urls": self._manager.list(),
            "urls_with_error": self._manager.list(),
            # Reuse the already-loaded config instead of reading it a second time.
            "config": self._config,
            "lock": multiprocessing.Lock(),
        }
        self._load_components()

    @staticmethod
    def _resolve_worker_count(configured: int | None) -> int:
        """Return the configured worker count, or a CPU-based default.

        A missing (``None``) or non-positive value falls back to
        ``cpu_count() - 1``, clamped to at least 1 so single-core hosts
        still get one worker (and a barrier with more than one party).
        """
        if configured is not None and configured > 0:
            return configured
        return max(1, multiprocessing.cpu_count() - 1)

    def _load_components(self) -> None:
        """Instantiate all components, passing each the shared objects."""
        self._components = [
            [Worker(**self._shared_objects) for _ in range(self._worker_count)],
            Controller(**self._shared_objects),
            JSONLogger(**self._shared_objects),
            Timer(**self._shared_objects),
            Watchdog(**self._shared_objects),
        ]

    def _component_handler(self, components: list[threading.Thread | list[threading.Thread]], action: str) -> None:
        """Apply ``action`` ("start" or "join") to every component, recursing into nested lists.

        Raises:
            ValueError: if ``action`` is neither "start" nor "join".
        """
        if action not in ("start", "join"):
            raise ValueError(f"Unknown component action: {action!r}")

        for component in components:
            if isinstance(component, list):
                self._component_handler(components=component, action=action)
            elif action == "start":
                component.start()
            else:
                component.join()

    def _shutdown_handler(self) -> None:
        """Stop the watchdog if it has not signalled yet, then join all components."""
        if not self._shared_objects["watchdog_event"].is_set():
            self._shutdown_watchdog()
        self._component_handler(components=self._components, action="join")

    def _shutdown_watchdog(self) -> None:
        """Write "exit" into the configured watchdog file.

        NOTE(review): presumably the Watchdog component reacts to this file
        content by terminating — confirm in hc_spider.watchdog.
        """
        with open(self._shared_objects["config"].get("watchdog_file"), "w", encoding="utf-8") as f:
            f.write("exit")

    def run(self) -> None:
        """Start all components, wait for shutdown_event, then shut everything down."""
        print(f"{self.name} started with pid [{os.getpid()}]", flush=True)
        self._component_handler(components=self._components, action="start")

        shutdown_event = self._shared_objects["shutdown_event"]
        # wait() returns True as soon as the event is set, so shutdown is not
        # delayed by a fixed sleep; the timeout keeps a periodic wake-up.
        while not shutdown_event.wait(timeout=1):
            pass

        print(f"[{self.name}] is shutting down", flush=True)
        self._shutdown_handler()
        print(f"[{self.name}] Workers visited {len(self._shared_objects['visited_nodes'])} urls", flush=True)

    def stop(self) -> None:
        """Request shutdown by setting the shared shutdown_event."""
        self._shared_objects["shutdown_event"].set()

    def __del__(self) -> None:
        print(f"[{self.name}] exited", flush=True)