99 lines
3.7 KiB
Python
99 lines
3.7 KiB
Python
import multiprocessing
|
|
import os
|
|
import threading
|
|
import time
|
|
|
|
from hc_spider import common
|
|
from hc_spider.controller import Controller
|
|
from hc_spider.json_logger import JSONLogger
|
|
from hc_spider.timer import Timer
|
|
from hc_spider.watchdog import Watchdog
|
|
from hc_spider.worker import Worker
|
|
|
|
|
|
class HCSpider(threading.Thread):
|
|
_config: dict
|
|
_manager: multiprocessing.Manager
|
|
_workers: list[multiprocessing.Process]
|
|
_worker_count: int
|
|
_controller: threading.Thread
|
|
_logger: threading.Thread
|
|
_timer: threading.Thread
|
|
_watchdog: threading.Thread
|
|
_shared_objects: dict
|
|
_components: list[threading.Thread | list[threading.Thread]]
|
|
|
|
def __init__(self) -> None:
|
|
super().__init__()
|
|
self.daemon = True
|
|
self.name = "Core"
|
|
self._config = common.load_config()
|
|
self._worker_count = self._config.get("workers")
|
|
self._worker_count = self._worker_count if self._worker_count > 0 else multiprocessing.cpu_count() - 1
|
|
self._manager = multiprocessing.Manager()
|
|
self._shared_objects = {
|
|
"job_queue": multiprocessing.Queue(-1),
|
|
"log_queue": multiprocessing.Queue(-1),
|
|
"shutdown_event": multiprocessing.Event(),
|
|
"watchdog_event": multiprocessing.Event(),
|
|
"worker_finished": multiprocessing.Barrier(self._worker_count + 1), # +1 for the logger
|
|
"visited_nodes": self._manager.dict(),
|
|
"not_visited_nodes": self._manager.dict(),
|
|
"not_valid_urls": self._manager.list(),
|
|
"urls_with_error": self._manager.list(),
|
|
"config": common.load_config(),
|
|
"lock": multiprocessing.Lock()
|
|
}
|
|
self._load_components()
|
|
|
|
def _load_components(self) -> None:
|
|
self._components = [
|
|
[Worker(**self._shared_objects) for _ in range(self._worker_count)],
|
|
Controller(**self._shared_objects),
|
|
JSONLogger(**self._shared_objects),
|
|
Timer(**self._shared_objects),
|
|
Watchdog(**self._shared_objects)
|
|
]
|
|
|
|
def _component_handler(self, components: list[threading.Thread | list[threading.Thread]], action: str) -> None:
|
|
for component in components:
|
|
if isinstance(component, list) is True:
|
|
match action:
|
|
case "start":
|
|
self._component_handler(components=component, action="start")
|
|
case "join":
|
|
self._component_handler(components=component, action="join")
|
|
else:
|
|
match action:
|
|
case "start":
|
|
component.start()
|
|
case "join":
|
|
component.join()
|
|
|
|
def _shutdown_handler(self) -> None:
|
|
if self._shared_objects.get("watchdog_event").is_set() is False:
|
|
self._shutdown_watchdog()
|
|
|
|
self._component_handler(components=self._components, action="join")
|
|
|
|
def _shutdown_watchdog(self) -> None:
|
|
with open(self._shared_objects.get("config").get("watchdog_file"), "w") as f:
|
|
f.write("exit")
|
|
|
|
def run(self) -> None:
|
|
print(f"{self.name} started with pid [{os.getpid()}]")
|
|
self._component_handler(components=self._components, action="start")
|
|
|
|
while self._shared_objects.get("shutdown_event").is_set() is False:
|
|
time.sleep(1)
|
|
|
|
print(f"[{self.name}] is shutting down", flush=True)
|
|
self._shutdown_handler()
|
|
print(f"[{self.name}] Workers visited {len(self._shared_objects.get('visited_nodes'))} urls", flush=True)
|
|
|
|
def stop(self) -> None:
|
|
self._shared_objects.get("shutdown_event").set()
|
|
|
|
def __del__(self) -> None:
|
|
print(f"[{self.name}] exited", flush=True)
|