# 2023-10-09 14:24:00 +02:00
#
# 99 lines
# 3.7 KiB
# Python

import multiprocessing
import os
import threading
import time
from hc_spider import common
from hc_spider.controller import Controller
from hc_spider.json_logger import JSONLogger
from hc_spider.timer import Timer
from hc_spider.watchdog import Watchdog
from hc_spider.worker import Worker
class HCSpider(threading.Thread):
_config: dict
_manager: multiprocessing.Manager
_workers: list[multiprocessing.Process]
_worker_count: int
_controller: threading.Thread
_logger: threading.Thread
_timer: threading.Thread
_watchdog: threading.Thread
_shared_objects: dict
_components: list[threading.Thread | list[threading.Thread]]
def __init__(self) -> None:
super().__init__()
self.daemon = True
self.name = "Core"
self._config = common.load_config()
self._worker_count = self._config.get("workers")
self._worker_count = self._worker_count if self._worker_count > 0 else multiprocessing.cpu_count() - 1
self._manager = multiprocessing.Manager()
self._shared_objects = {
"job_queue": multiprocessing.Queue(-1),
"log_queue": multiprocessing.Queue(-1),
"shutdown_event": multiprocessing.Event(),
"watchdog_event": multiprocessing.Event(),
"worker_finished": multiprocessing.Barrier(self._worker_count + 1), # +1 for the logger
"visited_nodes": self._manager.dict(),
"not_visited_nodes": self._manager.dict(),
"not_valid_urls": self._manager.list(),
"urls_with_error": self._manager.list(),
"config": common.load_config(),
"lock": multiprocessing.Lock()
}
self._load_components()
def _load_components(self) -> None:
self._components = [
[Worker(**self._shared_objects) for _ in range(self._worker_count)],
Controller(**self._shared_objects),
JSONLogger(**self._shared_objects),
Timer(**self._shared_objects),
Watchdog(**self._shared_objects)
]
def _component_handler(self, components: list[threading.Thread | list[threading.Thread]], action: str) -> None:
for component in components:
if isinstance(component, list) is True:
match action:
case "start":
self._component_handler(components=component, action="start")
case "join":
self._component_handler(components=component, action="join")
else:
match action:
case "start":
component.start()
case "join":
component.join()
def _shutdown_handler(self) -> None:
if self._shared_objects.get("watchdog_event").is_set() is False:
self._shutdown_watchdog()
self._component_handler(components=self._components, action="join")
def _shutdown_watchdog(self) -> None:
with open(self._shared_objects.get("config").get("watchdog_file"), "w") as f:
f.write("exit")
def run(self) -> None:
print(f"{self.name} started with pid [{os.getpid()}]")
self._component_handler(components=self._components, action="start")
while self._shared_objects.get("shutdown_event").is_set() is False:
time.sleep(1)
print(f"[{self.name}] is shutting down", flush=True)
self._shutdown_handler()
print(f"[{self.name}] Workers visited {len(self._shared_objects.get('visited_nodes'))} urls", flush=True)
def stop(self) -> None:
self._shared_objects.get("shutdown_event").set()
def __del__(self) -> None:
print(f"[{self.name}] exited", flush=True)