diff --git a/hc_spider/spider.py b/hc_spider/spider.py
new file mode 100644
index 0000000..3e68918
--- /dev/null
+++ b/hc_spider/spider.py
@@ -0,0 +1,133 @@
+import re
+import urllib.parse
+
+import requests
+from bs4 import BeautifulSoup
+
+from hc_spider.abstract import ISpider
+from hc_spider.model import Response, ResponseError
+
+
+class ScrapySpider(ISpider):
+    def get_links(self, url: str) -> Response | ResponseError:
+        pass
+
+
+class BeautifulSoupSpiderBase(ISpider):
+    _response: requests.Response
+    _raw_urls: set[str]
+    _scraper_url: str
+
+    def _get_raw_page(self) -> requests.Response | ResponseError:
+        try:
+            resp = requests.get(url=self._scraper_url, timeout=2)
+            resp.raise_for_status()
+
+        except Exception as e:
+            return ResponseError(exc=e)
+
+        return resp
+
+    def _load_raw_urls(self) -> None:
+        soup = BeautifulSoup(self._response.content, "lxml")
+        self._raw_urls = {i.get("href") for i in soup.find_all('a', href=True)}
+
+    def get_links(self, url: str) -> Response | ResponseError:
+        self._scraper_url = url
+        resp = self._get_raw_page()
+
+        if isinstance(resp, ResponseError):
+            return resp
+        self._response = resp
+        self._load_raw_urls()
+
+        return Response(links=self._raw_urls)
+
+
+class BeautifulSoupScraper(BeautifulSoupSpiderBase):
+    _scraper_url: str
+    _absolute_urls: set
+    _relative_urls: set
+
+    def get_links(self, url: str) -> Response | ResponseError:
+        self._scraper_url = url
+
+        if isinstance((resp := super().get_links(url=url)), ResponseError):
+            return resp
+
+        if not resp.links:
+            return resp
+
+        self._process_response()
+
+        return Response(links=set.union(self._absolute_urls, self._relative_urls))
+
+    def _clean_url_list(self) -> None:
+        result = [url for url in self._raw_urls if
+                  # any(re.findall(r'api-ref|operation_guide|umn|#|mailto', url, re.IGNORECASE))]
+                  any(re.findall(r'#|mailto|API%20Documents', url, re.IGNORECASE))]
+
+        self._raw_urls = self._raw_urls - set(result)
+
+    def _process_absolute_urls(self) -> None:
+        self._absolute_urls = {i for i in self._raw_urls if i.startswith("https://") or i.startswith("http://")}
+
+    def _process_relative_urls(self) -> None:
+        base_url: str
+        final_url: str
+        results = set()
+
+        if not self._raw_urls:
+            self._relative_urls = set()
+            return
+
+        # If the original url ends with .html e.g.: https://docs.otc.t-systems.com/developer/api.html
+        if self._scraper_url.endswith(".html"):
+            # Removing /xy.html part
+            base_url = self._scraper_url.rsplit("/", 1)[0]
+
+        elif self._scraper_url.endswith("/"):
+            base_url = self._scraper_url.rstrip("/")
+
+        else:
+            base_url = self._scraper_url
+
+        # Creating an url parse instance
+        p = urllib.parse.urlparse(base_url)
+
+        for url in self._raw_urls:
+            # If we found a relative path in the url e.g.: href="../services.html">Services
+            # we should resolve the absolute path
+            if url.startswith(".."):
+                res = list()
+                complete_url = f"{base_url}/{url}"
+                # https://example.com/1/2/../../index.html
+                for i in complete_url.split("/"):
+                    # ['https:', '', 'example.com', '1', '2', '..', '..', 'index.html']
+                    if i and i != "..":
+                        res.append(i)
+                    else:
+                        res.pop()
+                # res = ['example.com', 'index.html']
+                final_url = f"{p.scheme}://{'/'.join(res)}"
+
+            elif url.startswith("/"):
+                final_url = f"{self._shared_objects.config.get('otc_base_url')}{url}"
+
+            else:
+                final_url = f"{base_url}/{url}"
+
+            results.add(final_url)
+
+        self._relative_urls = results
+
+    def _process_response(self) -> None:
+        self._clean_url_list()
+        self._process_absolute_urls()
+        self._raw_urls = self._raw_urls - self._absolute_urls
+        self._process_relative_urls()
+
+
+class RequestsSpider(ISpider):
+    def get_links(self, url: str) -> Response | ResponseError:
+        pass
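Side note, not part of the patch: the manual ".." collapsing in _process_relative_urls can be cross-checked against the standard library. A minimal standalone sketch, using the example URL from the code comments; urllib.parse.urljoin is used here only as a reference for the expected result, while the loop mirrors the logic in the patch:

import urllib.parse

# Example from the code comments in _process_relative_urls.
base_url = "https://example.com/1/2"
href = "../../index.html"

# Manual collapse, mirroring the loop in the patch: append normal segments,
# pop on "" or ".." (the leading "https:" segment is popped by the empty
# string after the scheme and re-added below).
segments = []
for part in f"{base_url}/{href}".split("/"):
    if part and part != "..":
        segments.append(part)
    else:
        segments.pop()
manual = f"https://{'/'.join(segments)}"

# Standard-library resolution; urljoin needs a trailing slash so the last
# path segment of the base is treated as a directory.
stdlib = urllib.parse.urljoin(f"{base_url}/", href)

assert manual == stdlib == "https://example.com/index.html"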