hc-spider/hc_spider/worker.py
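# Worker-side crawling logic for hc-spider: URLProcessor filters the links
# scraped from a page and feeds new in-scope URLs to the shared job queue;
# Worker is the multiprocessing.Process that consumes that queue.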

import copy
import multiprocessing
import os
import queue
import threading
import urllib.parse

import requests

from hc_spider.model import SharedObjects, ResponseError
from hc_spider.spider import BeautifulSoupScraper


class URLProcessor:
    """Filters the links scraped from a page and enqueues new in-scope URLs."""

    _job_queue: multiprocessing.Queue
    _shared_objects: SharedObjects
    _blacklisted_urls: list
    _blacklisted_domains: list
    _otc_base_url: str
    _scraper: BeautifulSoupScraper
    _current_url: str
    _response: requests.Response | ResponseError

    def __init__(self, **kwargs) -> None:
        self._shared_objects = SharedObjects(**kwargs)
        self._blacklisted_urls = self._shared_objects.config.get("blacklisted_urls", [])
        self._blacklisted_domains = self._shared_objects.config.get("blacklisted_domains", [])
        self._otc_base_url = self._shared_objects.config.get("otc_base_url", "")
        self._scraper = BeautifulSoupScraper(shared_objects=self._shared_objects)

    def _process_children(self) -> None:
        # Walk every link found on the current page and enqueue it unless it
        # is blacklisted, out of scope, or already known.
        for new_url in self._response.links:
            if new_url in self._blacklisted_urls:
                continue

            p = urllib.parse.urlparse(new_url)
            if p.hostname in self._blacklisted_domains:
                continue

            # Links pointing outside the crawled base URL are recorded
            # together with the page they were found on.
            if not new_url.startswith(self._otc_base_url):
                self._shared_objects.not_valid_urls.append({
                    "url": new_url,
                    "orig": self._current_url
                })
                continue

            if new_url in self._shared_objects.visited_nodes:
                continue

            if new_url in self._shared_objects.not_visited_nodes:
                continue

            # Re-check under the lock so two workers cannot enqueue the same
            # URL between the unlocked check above and the insert below.
            with self._shared_objects.lock:
                if new_url in self._shared_objects.not_visited_nodes:
                    continue
                self._shared_objects.not_visited_nodes[new_url] = self._current_url
                self._shared_objects.job_queue.put(new_url)

    def process(self, url: str) -> None:
        self._current_url = url
        if self._current_url in self._shared_objects.visited_nodes:
            return

        self._response = self._scraper.get_links(url=self._current_url)
        if isinstance(self._response, ResponseError):
            # Record the failing URL together with its referrer and the error,
            # then mark it visited so it is not retried.
            self._shared_objects.urls_with_error.append({
                "url": self._current_url,
                "orig": self._shared_objects.not_visited_nodes.get(self._current_url),
                "error": f"{self._response.exc}"
            })
            self._shared_objects.visited_nodes[self._current_url] = True
            return

        if not self._response.links:
            return

        self._process_children()


class Worker(multiprocessing.Process):
    """Crawler process that drains the shared job queue until shutdown is requested."""

    _shutdown_event: threading.Event
    _job_queue: multiprocessing.Queue
    _shared_objects: SharedObjects
    _current_url: str
    _url_processor: URLProcessor

    def __init__(self, **kwargs) -> None:
        self._shared_objects = SharedObjects(**kwargs)
        self._url_processor = URLProcessor(**kwargs)
        super().__init__()
        self.daemon = False

    def start(self) -> None:
        print(f"[{self.name}] is starting")
        super().start()

    def run(self) -> None:
        print(f"{self.name} started with pid [{os.getpid()}]")

        # Poll the shared job queue until the shutdown event is set.
        while not self._shared_objects.shutdown_event.is_set():
            try:
                self._current_url = self._shared_objects.job_queue.get(block=False)
            except queue.Empty:
                continue

            print(f"[{self.name}] pid [{os.getpid()}] processing {self._current_url}")
            self._url_processor.process(url=self._current_url)

            # Mark the URL as done so other workers skip it from now on.
            self._shared_objects.visited_nodes[self._current_url] = True
            self._shared_objects.not_visited_nodes.pop(self._current_url, None)

        print(f"[{self.name}] is shutting down", flush=True)
        # Let the process exit without flushing the queue's feeder thread, then
        # wait up to 10 seconds for the worker_finished event before returning.
        self._shared_objects.job_queue.cancel_join_thread()
        self._shared_objects.worker_finished.wait(timeout=10)

    def __del__(self) -> None:
        print(f"[{self.name}] exited", flush=True)
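
# Illustrative wiring (hypothetical, not part of the original module). Worker and
# URLProcessor forward their **kwargs straight to SharedObjects, so the spawning
# process is expected to supply the shared primitives referenced above, for
# example:
#
#     manager = multiprocessing.Manager()
#     shared = dict(
#         config={"otc_base_url": "https://docs.example.com",
#                 "blacklisted_urls": [], "blacklisted_domains": []},
#         job_queue=multiprocessing.Queue(),
#         visited_nodes=manager.dict(),
#         not_visited_nodes=manager.dict(),
#         not_valid_urls=manager.list(),
#         urls_with_error=manager.list(),
#         lock=manager.Lock(),
#         shutdown_event=manager.Event(),
#         worker_finished=manager.Event(),
#     )
#     workers = [Worker(**shared) for _ in range(os.cpu_count())]
#
# Field names are inferred from the attribute accesses in this file and may not
# match hc_spider.model.SharedObjects exactly.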