import multiprocessing
import os
import queue
import threading
import urllib.parse

import requests

from hc_spider.model import SharedObjects, ResponseError
from hc_spider.spider import BeautifulSoupScraper


class URLProcessor:
    _job_queue: multiprocessing.Queue
    _shared_objects: SharedObjects
    _blacklisted_urls: list
    _blacklisted_domains: list
    _otc_base_url: str
    _scraper: BeautifulSoupScraper
    _current_url: str
    _response: requests.Response | ResponseError

    def __init__(self, **kwargs) -> None:
        self._shared_objects = SharedObjects(**kwargs)
        self._blacklisted_urls = self._shared_objects.config.get("blacklisted_urls", [])
        self._blacklisted_domains = self._shared_objects.config.get("blacklisted_domains", [])
        self._otc_base_url = self._shared_objects.config.get("otc_base_url", "")
        self._scraper = BeautifulSoupScraper(shared_objects=self._shared_objects)

    def _process_children(self) -> None:
        # Filter the links found on the current page and enqueue the ones
        # that still need to be crawled.
        for new_url in self._response.links:
            if new_url in self._blacklisted_urls:
                continue

            p = urllib.parse.urlparse(new_url)
            if p.hostname in self._blacklisted_domains:
                continue

            # Links outside the configured base URL are recorded but not crawled.
            if not new_url.startswith(self._otc_base_url):
                self._shared_objects.not_valid_urls.append({
                    "url": new_url,
                    "orig": self._current_url
                })
                continue

            if new_url in self._shared_objects.visited_nodes:
                continue

            if new_url in self._shared_objects.not_visited_nodes:
                continue

            with self._shared_objects.lock:
                # Re-check under the lock so concurrent workers cannot enqueue
                # the same URL twice.
                if new_url in self._shared_objects.not_visited_nodes:
                    continue

                self._shared_objects.not_visited_nodes[new_url] = self._current_url
                self._shared_objects.job_queue.put(new_url)

    def process(self, url: str) -> None:
        self._current_url = url
        if self._current_url in self._shared_objects.visited_nodes:
            return

        self._response = self._scraper.get_links(url=self._current_url)
        if isinstance(self._response, ResponseError):
            # Record the failure and mark the URL as visited so it is not retried.
            self._shared_objects.urls_with_error.append({
                "url": self._current_url,
                "orig": self._shared_objects.not_visited_nodes.get(self._current_url),
                "error": f"{self._response.exc}"
            })
            self._shared_objects.visited_nodes[self._current_url] = True
            return

        if not self._response.links:
            return

        self._process_children()


class Worker(multiprocessing.Process):
    _shutdown_event: threading.Event
    _job_queue: multiprocessing.Queue
    _shared_objects: SharedObjects
    _current_url: str
    _url_processor: URLProcessor

    def __init__(self, **kwargs) -> None:
        self._shared_objects = SharedObjects(**kwargs)
        self._url_processor = URLProcessor(**kwargs)
        super().__init__()
        self.daemon = False

    def start(self) -> None:
        print(f"[{self.name}] is starting")
        super().start()

    def run(self) -> None:
        print(f"{self.name} started with pid [{os.getpid()}]")
        while not self._shared_objects.shutdown_event.is_set():
            try:
                # Poll without blocking so the shutdown event is checked frequently.
                self._current_url = self._shared_objects.job_queue.get(block=False)
            except queue.Empty:
                continue

            print(f"[{self.name}] pid [{os.getpid()}] processing {self._current_url}")
            self._url_processor.process(url=self._current_url)
            self._shared_objects.visited_nodes[self._current_url] = True
            self._shared_objects.not_visited_nodes.pop(self._current_url, None)

        print(f"[{self.name}] is shutting down", flush=True)
        # Do not block on the queue's feeder thread while shutting down.
        self._shared_objects.job_queue.cancel_join_thread()
        self._shared_objects.worker_finished.wait(timeout=10)

    def __del__(self) -> None:
        print(f"[{self.name}] exited", flush=True)
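

# ---------------------------------------------------------------------------
# Hypothetical usage sketch (not part of the original module). It assumes
# SharedObjects accepts keyword arguments named after the attributes used
# above (job_queue, shutdown_event, worker_finished, lock, visited_nodes,
# not_visited_nodes, not_valid_urls, urls_with_error, config); the real
# constructor, config keys, and the seed URL below are placeholders and
# may differ.
if __name__ == "__main__":
    import time

    manager = multiprocessing.Manager()
    shared_kwargs = {
        # Cross-process primitives; the Queue is inherited by the workers.
        "job_queue": multiprocessing.Queue(),
        "shutdown_event": multiprocessing.Event(),
        "worker_finished": multiprocessing.Event(),
        "lock": manager.Lock(),
        # Manager proxies so all workers share the same crawl state.
        "visited_nodes": manager.dict(),
        "not_visited_nodes": manager.dict(),
        "not_valid_urls": manager.list(),
        "urls_with_error": manager.list(),
        "config": {
            "otc_base_url": "https://example.com/",  # placeholder base URL
            "blacklisted_urls": [],
            "blacklisted_domains": [],
        },
    }

    # Seed the crawl with a single start URL.
    shared_kwargs["job_queue"].put("https://example.com/")

    workers = [Worker(**shared_kwargs) for _ in range(2)]
    for w in workers:
        w.start()

    # Let the crawl run for a short while, then signal shutdown; a real
    # driver would instead stop once the job queue has drained.
    time.sleep(10)
    shared_kwargs["shutdown_event"].set()
    shared_kwargs["worker_finished"].set()
    for w in workers:
        w.join()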