Source code for web2vec.crawlers.extractors

import logging
from dataclasses import asdict
from typing import List

from requests import Response as ReqResponse
from scrapy.http import Response

from web2vec.extractors.dns_features import (
    DNSFeatures,
    get_dns_features_cached,
)
from web2vec.extractors.external_api.google_index_features import (
    GoogleIndexFeatures,
    get_google_index_features,
)
from web2vec.extractors.external_api.open_pagerank_features import (
    OpenPageRankFeatures,
    get_open_page_rank_features_cached,
)
from web2vec.extractors.external_api.open_phish_features import (
    OpenPhishFeatures,
    get_open_phish_features_cached,
)
from web2vec.extractors.external_api.phish_tank_features import (
    PhishTankFeatures,
    get_phishtank_features_cached,
)
from web2vec.extractors.external_api.similar_web_features import (
    SimilarWebFeatures,
    get_similar_web_features_cached,
)
from web2vec.extractors.external_api.url_haus_features import (
    URLHausFeatures,
    get_url_haus_features_cached,
)
from web2vec.extractors.html_body_features import (
    HtmlBodyFeatures,
    get_html_body_features,
)
from web2vec.extractors.http_response_features import (
    HttpResponseFeatures,
    get_http_response_features,
)
from web2vec.extractors.ssl_certification_features import (
    CertificateFeatures,
    get_certificate_features_cached,
)
from web2vec.extractors.url_geo_features import (
    URLGeoFeatures,
    get_url_geo_features_cached,
)
from web2vec.extractors.url_lexical_features import (
    URLLexicalFeatures,
    get_url_lexical_features_cached,
)
from web2vec.extractors.whois_features import (
    WhoisFeatures,
    get_whois_features_cached,
)
from web2vec.utils import (
    fetch_url,
    get_domain_from_url,
    is_numerical_type,
    transform_value,
)

logger = logging.getLogger(__name__)


class Extractor:
    """Base class for feature extractors that work on a fetched response.

    Subclasses set ``FEATURE_CLASS`` and ``FEATURE_TYPE`` and override
    ``extract_features``.
    """

    # Class of the object returned by ``extract_features``; unset on the base.
    FEATURE_CLASS = None
    # Short label; ``process_extractors`` prefixes every feature key with it.
    FEATURE_TYPE = None

    def extract_features(self, response: Response | ReqResponse) -> object:
        """Return the features computed for ``response``.

        The annotation is ``object`` rather than ``FEATURE_CLASS`` because the
        latter is evaluated in the class body, where it is still ``None``.

        Raises:
            NotImplementedError: always, on the base class.
        """
        raise NotImplementedError

    def features_name(self) -> str:
        """Return the name of ``FEATURE_CLASS``.

        Falls back to the extractor's own class name when ``FEATURE_CLASS`` is
        unset, so that callers using this for log messages do not hit an
        ``AttributeError`` on ``None``.
        """
        feature_class = self.FEATURE_CLASS
        if feature_class is None:
            return type(self).__name__
        return feature_class.__name__
class DNSExtractor(Extractor):
    """Extractor that returns ``DNSFeatures`` for the response URL's domain."""

    FEATURE_CLASS = DNSFeatures
    FEATURE_TYPE = "DNS"

    def extract_features(self, response: Response | ReqResponse) -> DNSFeatures:
        """Look up DNS features for the domain taken from ``response.url``."""
        return get_dns_features_cached(get_domain_from_url(response.url))
class HtmlBodyExtractor(Extractor):
    """Extractor that returns ``HtmlBodyFeatures`` for the response body."""

    FEATURE_CLASS = HtmlBodyFeatures
    FEATURE_TYPE = "HTML"

    def extract_features(self, response: Response | ReqResponse) -> HtmlBodyFeatures:
        """Compute HTML body features from ``response.text`` and ``response.url``."""
        page_text = response.text
        page_url = response.url
        return get_html_body_features(body=page_text, url=page_url)
class HttpResponseExtractor(Extractor):
    """Extractor that returns ``HttpResponseFeatures`` for the response."""

    FEATURE_CLASS = HttpResponseFeatures
    FEATURE_TYPE = "HTTP"

    def extract_features(
        self, response: Response | ReqResponse
    ) -> HttpResponseFeatures:
        """Compute HTTP response features after normalising the status field.

        The response's ``status_code`` attribute is set from ``status`` when
        that attribute exists, otherwise it keeps its existing value. Note that
        this mutates the ``response`` object passed in.
        """
        # ``getattr(response, "status", response.status_code)`` would evaluate
        # the default eagerly and raise AttributeError on responses that only
        # expose ``status``; read ``status_code`` only as a real fallback.
        try:
            status = response.status
        except AttributeError:
            status = response.status_code
        response.status_code = status
        return get_http_response_features(response=response, url=response.url)
class CertificateExtractor(Extractor):
    """Extractor that returns ``CertificateFeatures`` for the response's host."""

    FEATURE_CLASS = CertificateFeatures
    FEATURE_TYPE = "SSL"

    def extract_features(self, response: Response | ReqResponse) -> CertificateFeatures:
        """Fetch certificate features for the domain taken from ``response.url``."""
        host = get_domain_from_url(response.url)
        return get_certificate_features_cached(hostname=host)
class UrlGeoExtractor(Extractor):
    """Extractor that returns ``URLGeoFeatures`` for the response URL."""

    FEATURE_CLASS = URLGeoFeatures
    FEATURE_TYPE = "GEO"

    def extract_features(self, response: Response | ReqResponse) -> URLGeoFeatures:
        """Fetch geo features for ``response.url``."""
        target_url = response.url
        return get_url_geo_features_cached(url=target_url)
class UrlLexicalExtractor(Extractor):
    """Extractor that returns ``URLLexicalFeatures`` for the response URL."""

    FEATURE_CLASS = URLLexicalFeatures
    FEATURE_TYPE = "LEXICAL"

    def extract_features(self, response: Response | ReqResponse) -> URLLexicalFeatures:
        """Compute lexical features for ``response.url``."""
        target_url = response.url
        return get_url_lexical_features_cached(url=target_url)
class WhoisExtractor(Extractor):
    """Extractor that returns ``WhoisFeatures`` for the response URL's domain."""

    FEATURE_CLASS = WhoisFeatures
    FEATURE_TYPE = "WHOIS"

    def extract_features(self, response: Response | ReqResponse) -> WhoisFeatures:
        """Fetch WHOIS features for the domain taken from ``response.url``."""
        site_domain = get_domain_from_url(response.url)
        return get_whois_features_cached(domain=site_domain)
class GoogleIndexExtractor(Extractor):
    """Extractor that returns ``GoogleIndexFeatures`` for the response URL."""

    FEATURE_CLASS = GoogleIndexFeatures
    FEATURE_TYPE = "GOOGLE_INDEX"

    def extract_features(self, response: Response | ReqResponse) -> GoogleIndexFeatures:
        """Fetch Google index features for ``response.url``."""
        target_url = response.url
        return get_google_index_features(url=target_url)
class OpenPageRankExtractor(Extractor):
    """Extractor that returns ``OpenPageRankFeatures`` for the URL's domain."""

    FEATURE_CLASS = OpenPageRankFeatures
    FEATURE_TYPE = "OPEN_PAGE_RANK"

    def extract_features(
        self, response: Response | ReqResponse
    ) -> OpenPageRankFeatures:
        """Fetch Open PageRank features for the domain of ``response.url``."""
        site_domain = get_domain_from_url(response.url)
        return get_open_page_rank_features_cached(domain=site_domain)
class OpenPhishExtractor(Extractor):
    """Extractor that returns ``OpenPhishFeatures`` for the response URL."""

    FEATURE_CLASS = OpenPhishFeatures
    FEATURE_TYPE = "OPEN_PHISH"

    def extract_features(self, response: Response | ReqResponse) -> OpenPhishFeatures:
        """Fetch OpenPhish features for ``response.url``."""
        target_url = response.url
        return get_open_phish_features_cached(url=target_url)
class PhishTankExtractor(Extractor):
    """Extractor that returns ``PhishTankFeatures`` for the URL's domain."""

    FEATURE_CLASS = PhishTankFeatures
    FEATURE_TYPE = "PHISH_TANK"

    def extract_features(self, response: Response | ReqResponse) -> PhishTankFeatures:
        """Fetch PhishTank features for the domain taken from ``response.url``."""
        site_domain = get_domain_from_url(response.url)
        return get_phishtank_features_cached(domain=site_domain)
class SimilarWebExtractor(Extractor):
    """Extractor that returns ``SimilarWebFeatures`` for the URL's domain."""

    FEATURE_CLASS = SimilarWebFeatures
    FEATURE_TYPE = "SIMILAR_WEB"

    def extract_features(self, response: Response | ReqResponse) -> SimilarWebFeatures:
        """Fetch SimilarWeb features for the domain taken from ``response.url``."""
        site_domain = get_domain_from_url(response.url)
        return get_similar_web_features_cached(domain=site_domain)
class UrlHausExtractor(Extractor):
    """Extractor that returns ``URLHausFeatures`` for the URL's domain."""

    FEATURE_CLASS = URLHausFeatures
    FEATURE_TYPE = "URL_HAUS"

    def extract_features(self, response: Response | ReqResponse) -> URLHausFeatures:
        """Fetch URLhaus features for the domain taken from ``response.url``."""
        site_domain = get_domain_from_url(response.url)
        return get_url_haus_features_cached(domain=site_domain)
# One instance of every extractor class defined in this module, in
# declaration order.
ALL_EXTRACTORS = [
    extractor_class()
    for extractor_class in (
        DNSExtractor,
        HtmlBodyExtractor,
        HttpResponseExtractor,
        CertificateExtractor,
        UrlGeoExtractor,
        UrlLexicalExtractor,
        WhoisExtractor,
        GoogleIndexExtractor,
        OpenPageRankExtractor,
        OpenPhishExtractor,
        PhishTankExtractor,
        SimilarWebExtractor,
        UrlHausExtractor,
    )
]
def process_extractors(
    url: str, extractors: List[Extractor], use_only_numerical: bool = False
) -> dict:
    """Fetch ``url`` once and run every extractor on the resulting response.

    Args:
        url: Address passed to ``fetch_url``.
        extractors: Extractors applied, in order, to the fetched response.
        use_only_numerical: When true, keep only the fields for which
            ``is_numerical_type`` returns a truthy value.

    Returns:
        A flat dict keyed ``"<FEATURE_TYPE>_<field>"`` whose values have been
        passed through ``transform_value``. An extractor that raises is logged
        and contributes nothing; if an exception escapes the loop or the fetch,
        a warning is logged and whatever was collected so far is returned.
    """
    features = {}
    try:
        fetched = fetch_url(url)
        for extractor in extractors:
            try:
                fields = asdict(extractor.extract_features(fetched))
                # Collect into a scratch dict first so a failure part-way
                # through leaves no partial entries for this extractor.
                prefixed = {}
                for key, raw in fields.items():
                    if use_only_numerical and not is_numerical_type(raw):
                        continue
                    prefixed[f"{extractor.FEATURE_TYPE}_{key}"] = transform_value(raw)
                features.update(prefixed)
            except Exception as e:  # noqa
                logger.warning(
                    f"Error extracting features with {extractor.features_name()}: {e}"
                )
    except Exception as e:  # noqa
        logger.warning(f"Couldn't reach {url}. {e}")
    return features