Source code for web2vec.crawlers.extractors
import logging
import os
import time
from dataclasses import asdict
from typing import List
from requests import Response as ReqResponse
from scrapy.http import Response
from web2vec.config import config
from web2vec.extractors.dns_features import (
DNSFeatures,
get_dns_features_cached,
)
from web2vec.extractors.external_api.google_index_features import (
GoogleIndexFeatures,
get_google_index_features,
)
from web2vec.extractors.external_api.open_pagerank_features import (
OpenPageRankFeatures,
get_open_page_rank_features_cached,
)
from web2vec.extractors.external_api.open_phish_features import (
OpenPhishFeatures,
get_open_phish_features_cached,
)
from web2vec.extractors.external_api.phish_tank_features import (
PhishTankFeatures,
get_phishtank_features_cached,
)
from web2vec.extractors.external_api.similar_web_features import (
SimilarWebFeatures,
get_similar_web_features_cached,
)
from web2vec.extractors.external_api.url_haus_features import (
URLHausFeatures,
get_url_haus_features_cached,
)
from web2vec.extractors.html_body_features import (
HtmlBodyFeatures,
get_html_body_features,
)
from web2vec.extractors.http_response_features import (
HttpResponseFeatures,
get_http_response_features,
)
from web2vec.extractors.ssl_certification_features import (
CertificateFeatures,
get_certificate_features_cached,
)
from web2vec.extractors.url_geo_features import (
URLGeoFeatures,
get_url_geo_features_cached,
)
from web2vec.extractors.url_lexical_features import (
URLLexicalFeatures,
get_url_lexical_features_cached,
)
from web2vec.extractors.whois_features import (
WhoisFeatures,
get_whois_features_cached,
)
from web2vec.utils import (
fetch_url,
get_domain_from_url,
is_numerical_type,
sanitize_filename,
transform_value,
)
# Module-level logger, named after this module; the extractors below use it
# to report non-fatal failures as warnings.
logger = logging.getLogger(__name__)
[docs]
class Extractor:
    """Base class for feature extractors that work on a fetched response.

    Subclasses set ``FEATURE_CLASS`` (the type their ``extract_features``
    returns) and ``FEATURE_TYPE`` (a short label; ``process_extractors`` in
    this module uses it as the prefix of result keys) and override
    ``extract_features``.
    """

    FEATURE_CLASS = None
    FEATURE_TYPE = None

    def features_name(self) -> str:
        """Return a printable name for this extractor.

        ``process_extractors`` calls this when it logs a failed extractor.
        The name is ``FEATURE_TYPE`` when a subclass has set it, otherwise
        the class name, so the result is always a non-empty string.
        """
        return self.FEATURE_TYPE or type(self).__name__

    def extract_features(self, response: Response | ReqResponse) -> object:
        """Compute this extractor's features for ``response``.

        Args:
            response: The fetched response (scrapy or requests flavour).

        Returns:
            In subclasses, an instance of that subclass's ``FEATURE_CLASS``.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        # The return annotation is ``object`` rather than ``FEATURE_CLASS``:
        # inside the class body that name is bound to ``None`` here, so it
        # never described the real return type.
        raise NotImplementedError
[docs]
class DNSExtractor(Extractor):
    """Extractor that yields ``DNSFeatures`` for the response's domain."""

    FEATURE_TYPE = "DNS"
    FEATURE_CLASS = DNSFeatures

    def extract_features(
        self, response: Response | ReqResponse
    ) -> DNSFeatures:
        """Return DNS features for the domain taken from ``response.url``.

        The domain comes from ``get_domain_from_url`` and is handed to
        ``get_dns_features_cached``.
        """
        return get_dns_features_cached(get_domain_from_url(response.url))
[docs]
class HtmlBodyExtractor(Extractor):
    """Extractor that yields ``HtmlBodyFeatures`` from a page's HTML.

    By default the HTML is the response text. When JS rendering is enabled
    the page is re-loaded in headless Chrome through Selenium and the
    rendered source is used instead, falling back to the response text if
    rendering yields nothing. The HTML that was analysed can optionally be
    written to disk as a snapshot.
    """

    FEATURE_TYPE = "HTML"
    FEATURE_CLASS = HtmlBodyFeatures

    def __init__(
        self,
        enable_js_render: bool = False,
        save_html_snapshot: bool = False,
        snapshot_output_dir: str | None = None,
        render_wait_seconds: float = 2.0,
    ) -> None:
        """Store the rendering and snapshot options.

        Args:
            enable_js_render: Render the page with Selenium before analysis.
            save_html_snapshot: Write the analysed HTML to a file.
            snapshot_output_dir: Directory for snapshots; when falsy,
                ``<config.crawler_output_path>/html_snapshots`` is used.
            render_wait_seconds: Seconds to sleep after loading the page in
                the browser; no sleep happens unless the value is positive.
        """
        self.render_wait_seconds = render_wait_seconds
        self.snapshot_output_dir = snapshot_output_dir
        self.save_html_snapshot = save_html_snapshot
        self.enable_js_render = enable_js_render

    def _snapshot_dir(self) -> str:
        """Return the directory snapshots are written to."""
        if self.snapshot_output_dir:
            return self.snapshot_output_dir
        return os.path.join(config.crawler_output_path, "html_snapshots")

    def _save_snapshot(self, html: str, url: str, rendered: bool) -> str | None:
        """Write ``html`` to a snapshot file and return its path.

        The file name is the sanitized URL plus ``_rendered`` or ``_raw``.
        Any failure is logged as a warning and reported as ``None``.
        """
        try:
            if rendered:
                suffix = "_rendered"
            else:
                suffix = "_raw"
            file_name = f"{sanitize_filename(url)}{suffix}.html"
            target_dir = self._snapshot_dir()
            os.makedirs(target_dir, exist_ok=True)
            target_path = os.path.join(target_dir, file_name)
            with open(target_path, "w", encoding="utf-8") as snapshot_file:
                snapshot_file.write(html)
        except Exception as exc:  # noqa
            logger.warning(f"Could not save HTML snapshot for {url}: {exc}")
            return None
        else:
            return target_path

    def _render_with_selenium(self, url: str) -> str | None:
        """Load ``url`` in headless Chrome and return the page source.

        Returns ``None`` (after logging a warning) when the Selenium stack
        cannot be imported or when anything fails while rendering.
        """
        # Imported lazily so the extractor works without Selenium installed
        # as long as JS rendering is not requested.
        try:
            from selenium import webdriver
            from selenium.webdriver.chrome.options import Options
            from selenium.webdriver.chrome.service import Service
            from webdriver_manager.chrome import ChromeDriverManager
        except Exception as exc:  # noqa
            logger.warning(f"Selenium rendering not available for {url}: {exc}")
            return None

        chrome_flags = (
            "--headless=new",
            "--disable-gpu",
            "--no-sandbox",
            "--disable-dev-shm-usage",
            "--ignore-certificate-errors",
            "--allow-insecure-localhost",
        )
        driver = None
        try:
            options = Options()
            for flag in chrome_flags:
                options.add_argument(flag)
            driver = webdriver.Chrome(
                service=Service(ChromeDriverManager().install()),
                options=options,
            )
            driver.get(url)
            if self.render_wait_seconds > 0:
                # Give client-side scripts time to run before reading the DOM.
                time.sleep(self.render_wait_seconds)
            return driver.page_source
        except Exception as exc:  # noqa
            logger.warning(f"Selenium rendering failed for {url}: {exc}")
            return None
        finally:
            # Always release the browser, including on failure paths.
            if driver:
                driver.quit()

    def extract_features(
        self, response: Response | ReqResponse
    ) -> HtmlBodyFeatures:
        """Return HTML body features for ``response``.

        Uses the Selenium-rendered source when rendering is enabled and
        produces a non-empty page, otherwise ``response.text``. When
        snapshots are enabled, the analysed HTML is saved and its path is
        passed along to ``get_html_body_features``.
        """
        body = response.text
        was_js_rendered = False
        if self.enable_js_render:
            rendered_body = self._render_with_selenium(response.url)
            if rendered_body:
                body, was_js_rendered = rendered_body, True
        source_mode = "selenium_rendered" if was_js_rendered else "raw_http"

        if self.save_html_snapshot:
            html_snapshot_path = self._save_snapshot(
                html=body, url=response.url, rendered=was_js_rendered
            )
        else:
            html_snapshot_path = None

        return get_html_body_features(
            body=body,
            url=response.url,
            source_mode=source_mode,
            was_js_rendered=was_js_rendered,
            html_snapshot_path=html_snapshot_path,
        )
[docs]
class HttpResponseExtractor(Extractor):
    """Extractor that yields ``HttpResponseFeatures`` for a response."""

    FEATURE_CLASS = HttpResponseFeatures
    FEATURE_TYPE = "HTTP"

    def extract_features(
        self, response: Response | ReqResponse
    ) -> HttpResponseFeatures:
        """Return HTTP response features for ``response``.

        Before delegating to ``get_http_response_features`` the response is
        given a ``status_code`` attribute: the value of ``response.status``
        when that attribute exists, otherwise its existing ``status_code``.
        Note that this sets an attribute on the object passed in.

        Raises:
            AttributeError: If the response has neither ``status`` nor
                ``status_code``.
        """
        # The fallback must be resolved lazily. Written as the default of
        # ``getattr(response, "status", response.status_code)`` it is
        # evaluated before the lookup, so a response exposing only ``status``
        # (NOTE(review): the scrapy shape -- confirm) raised AttributeError.
        try:
            status = response.status
        except AttributeError:
            status = response.status_code
        response.status_code = status
        return get_http_response_features(response=response, url=response.url)
[docs]
class CertificateExtractor(Extractor):
    """Extractor that yields ``CertificateFeatures`` for the response's host."""

    FEATURE_TYPE = "SSL"
    FEATURE_CLASS = CertificateFeatures

    def extract_features(
        self, response: Response | ReqResponse
    ) -> CertificateFeatures:
        """Return certificate features for the domain of ``response.url``.

        The domain from ``get_domain_from_url`` is passed as ``hostname`` to
        ``get_certificate_features_cached``.
        """
        hostname = get_domain_from_url(response.url)
        return get_certificate_features_cached(hostname=hostname)
[docs]
class UrlGeoExtractor(Extractor):
    """Extractor that yields ``URLGeoFeatures`` for the response's URL."""

    FEATURE_TYPE = "GEO"
    FEATURE_CLASS = URLGeoFeatures

    def extract_features(
        self, response: Response | ReqResponse
    ) -> URLGeoFeatures:
        """Return geo features by passing ``response.url`` to
        ``get_url_geo_features_cached``."""
        target_url = response.url
        return get_url_geo_features_cached(url=target_url)
[docs]
class UrlLexicalExtractor(Extractor):
    """Extractor that yields ``URLLexicalFeatures`` for the response's URL."""

    FEATURE_TYPE = "LEXICAL"
    FEATURE_CLASS = URLLexicalFeatures

    def extract_features(
        self, response: Response | ReqResponse
    ) -> URLLexicalFeatures:
        """Return lexical features by passing ``response.url`` to
        ``get_url_lexical_features_cached``."""
        target_url = response.url
        return get_url_lexical_features_cached(url=target_url)
[docs]
class WhoisExtractor(Extractor):
    """Extractor that yields ``WhoisFeatures`` for the response's domain."""

    FEATURE_TYPE = "WHOIS"
    FEATURE_CLASS = WhoisFeatures

    def extract_features(
        self, response: Response | ReqResponse
    ) -> WhoisFeatures:
        """Return WHOIS features for the domain of ``response.url``.

        The domain from ``get_domain_from_url`` is passed to
        ``get_whois_features_cached``.
        """
        domain = get_domain_from_url(response.url)
        return get_whois_features_cached(domain=domain)
[docs]
class GoogleIndexExtractor(Extractor):
    """Extractor that yields ``GoogleIndexFeatures`` for the response's URL."""

    FEATURE_TYPE = "GOOGLE_INDEX"
    FEATURE_CLASS = GoogleIndexFeatures

    def extract_features(
        self, response: Response | ReqResponse
    ) -> GoogleIndexFeatures:
        """Return Google-index features by passing ``response.url`` to
        ``get_google_index_features``."""
        target_url = response.url
        return get_google_index_features(url=target_url)
[docs]
class OpenPageRankExtractor(Extractor):
    """Extractor that yields ``OpenPageRankFeatures`` for the response's domain."""

    FEATURE_TYPE = "OPEN_PAGE_RANK"
    FEATURE_CLASS = OpenPageRankFeatures

    def extract_features(
        self, response: Response | ReqResponse
    ) -> OpenPageRankFeatures:
        """Return Open PageRank features for the domain of ``response.url``.

        The domain from ``get_domain_from_url`` is passed to
        ``get_open_page_rank_features_cached``.
        """
        domain = get_domain_from_url(response.url)
        return get_open_page_rank_features_cached(domain=domain)
[docs]
class OpenPhishExtractor(Extractor):
    """Extractor that yields ``OpenPhishFeatures`` for the response's URL."""

    FEATURE_TYPE = "OPEN_PHISH"
    FEATURE_CLASS = OpenPhishFeatures

    def extract_features(
        self, response: Response | ReqResponse
    ) -> OpenPhishFeatures:
        """Return OpenPhish features by passing ``response.url`` to
        ``get_open_phish_features_cached``."""
        target_url = response.url
        return get_open_phish_features_cached(url=target_url)
[docs]
class PhishTankExtractor(Extractor):
    """Extractor that yields ``PhishTankFeatures`` for the response's domain."""

    FEATURE_TYPE = "PHISH_TANK"
    FEATURE_CLASS = PhishTankFeatures

    def extract_features(
        self, response: Response | ReqResponse
    ) -> PhishTankFeatures:
        """Return PhishTank features for the domain of ``response.url``.

        The domain from ``get_domain_from_url`` is passed to
        ``get_phishtank_features_cached``.
        """
        domain = get_domain_from_url(response.url)
        return get_phishtank_features_cached(domain=domain)
[docs]
class SimilarWebExtractor(Extractor):
    """Extractor that yields ``SimilarWebFeatures`` for the response's domain."""

    FEATURE_TYPE = "SIMILAR_WEB"
    FEATURE_CLASS = SimilarWebFeatures

    def extract_features(
        self, response: Response | ReqResponse
    ) -> SimilarWebFeatures:
        """Return SimilarWeb features for the domain of ``response.url``.

        The domain from ``get_domain_from_url`` is passed to
        ``get_similar_web_features_cached``.
        """
        domain = get_domain_from_url(response.url)
        return get_similar_web_features_cached(domain=domain)
[docs]
class UrlHausExtractor(Extractor):
    """Extractor that yields ``URLHausFeatures`` for the response's domain."""

    FEATURE_TYPE = "URL_HAUS"
    FEATURE_CLASS = URLHausFeatures

    def extract_features(
        self, response: Response | ReqResponse
    ) -> URLHausFeatures:
        """Return URLhaus features for the domain of ``response.url``.

        The domain from ``get_domain_from_url`` is passed to
        ``get_url_haus_features_cached``.
        """
        domain = get_domain_from_url(response.url)
        return get_url_haus_features_cached(domain=domain)
# One default-constructed instance of every extractor class defined above,
# in declaration order.
ALL_EXTRACTORS = [
    extractor_cls()
    for extractor_cls in (
        DNSExtractor,
        HtmlBodyExtractor,
        HttpResponseExtractor,
        CertificateExtractor,
        UrlGeoExtractor,
        UrlLexicalExtractor,
        WhoisExtractor,
        GoogleIndexExtractor,
        OpenPageRankExtractor,
        OpenPhishExtractor,
        PhishTankExtractor,
        SimilarWebExtractor,
        UrlHausExtractor,
    )
]
[docs]
def process_extractors(
    url: str, extractors: List[Extractor], use_only_numerical: bool = False
) -> dict:
    """Fetch ``url`` once and run every extractor on the response.

    Args:
        url: Address passed to ``fetch_url``.
        extractors: Extractors to run, in order, on the fetched response.
        use_only_numerical: When true, keep only the fields whose value
            satisfies ``is_numerical_type``.

    Returns:
        A flat dict mapping ``"<FEATURE_TYPE>_<field>"`` to the field value
        after ``transform_value``, merged across all extractors. A failing
        extractor is logged and contributes nothing. The dict is empty when
        the URL cannot be fetched. This function does not raise for fetch or
        extractor failures.
    """
    extractors_result = {}

    # Only the fetch is guarded by the "couldn't reach" handler, so extractor
    # problems are never misreported as a connectivity failure.
    try:
        response = fetch_url(url)
    except Exception as e:  # noqa
        logger.warning(f"Couldn't reach {url}. {e}")
        return extractors_result

    for extractor in extractors:
        try:
            result_as_dict = asdict(extractor.extract_features(response))
            extractors_result.update(
                {
                    f"{extractor.FEATURE_TYPE}_{key}": transform_value(value)
                    for key, value in result_as_dict.items()
                    if not use_only_numerical or is_numerical_type(value)
                }
            )
        except Exception as e:  # noqa
            # Build the label from attributes every object has. The previous
            # ``extractor.features_name()`` call raised AttributeError inside
            # this handler, which aborted all remaining extractors.
            label = (
                getattr(extractor, "FEATURE_TYPE", None)
                or type(extractor).__name__
            )
            logger.warning(f"Error extracting features with {label}: {e}")
    return extractors_result