Source code for web2vec.extractors.ssl_certification_features

import logging
import socket
import ssl
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from functools import cache
from typing import Any, Dict, Optional, Tuple

import idna
import requests

from web2vec.config import config

logger = logging.getLogger(__name__)

FREE_CA_KEYWORDS = {
    "let's encrypt",
    "zerossl",
    "zero ssl",
    "buypass",
    "ssl.com free",
    "certbot",
}


def _flatten_name_entries(name_value: Any) -> Dict[str, str]:
    """Convert subject/issuer structures from ssl certs into a flat dict."""
    flat: Dict[str, str] = {}
    if isinstance(name_value, dict):
        for key, value in name_value.items():
            if isinstance(value, str):
                flat[key] = value
        return flat
    if isinstance(name_value, (tuple, list)):
        for rdn in name_value:
            if isinstance(rdn, (tuple, list)):
                for attr in rdn:
                    if (
                        isinstance(attr, (tuple, list))
                        and len(attr) >= 2
                        and isinstance(attr[0], str)
                        and isinstance(attr[1], str)
                    ):
                        flat[attr[0]] = attr[1]
    return flat


def _identify_known_ca(flat_name: Dict[str, str]) -> Optional[str]:
    """Return None; helper retained only for backwards compatibility."""
    return None


def _first_value(flat_name: Dict[str, str], keys: Tuple[str, ...]) -> Optional[str]:
    for key in keys:
        if key in flat_name:
            return flat_name[key]
        # handle case-insensitive matches
        for existing_key, value in flat_name.items():
            if existing_key.lower() == key.lower():
                return value
    return None


[docs] @dataclass class CertificateFeatures: subject: Dict[str, Any] issuer: Dict[str, Any] not_before: datetime not_after: datetime is_valid: bool validity_message: str is_trusted: bool trust_message: str issuer_common_name: Optional[str] = field(init=False, default=None) issuer_organization_name: Optional[str] = field(init=False, default=None) issuer_is_lets_encrypt: Optional[bool] = field(init=False, default=None) issuer_is_free_ca: Optional[bool] = field(init=False, default=None) validity_duration_days: Optional[int] = field(init=False, default=None) days_until_expiration: Optional[int] = field(init=False, default=None) expires_within_7_days: Optional[bool] = field(init=False, default=None) expires_within_30_days: Optional[bool] = field(init=False, default=None) valid_in_7_days: Optional[bool] = field(init=False, default=None) valid_in_30_days: Optional[bool] = field(init=False, default=None) is_expired: Optional[bool] = field(init=False, default=None) def __post_init__(self) -> None: self._compute_temporal_features() self._compute_issuer_features() def _has_certificate_window(self) -> bool: return self.not_before != datetime.min and self.not_after != datetime.min def _compute_temporal_features(self) -> None: if not self._has_certificate_window(): self.validity_duration_days = None self.days_until_expiration = None self.expires_within_7_days = None self.expires_within_30_days = None self.valid_in_7_days = None self.valid_in_30_days = None self.is_expired = None return self.validity_duration_days = (self.not_after - self.not_before).days now = datetime.utcnow() remaining = self.not_after - now self.days_until_expiration = remaining.days self.is_expired = remaining.total_seconds() < 0 self.expires_within_7_days = 0 <= remaining.days <= 7 self.expires_within_30_days = 0 <= remaining.days <= 30 self.valid_in_7_days = self._is_valid_on_date(now + timedelta(days=7)) self.valid_in_30_days = self._is_valid_on_date(now + timedelta(days=30)) def _is_valid_on_date(self, target_date: datetime) -> bool: if not self._has_certificate_window(): return False return self.not_before <= target_date <= self.not_after def _compute_issuer_features(self) -> None: issuer_flat = _flatten_name_entries(self.issuer) if not issuer_flat: self.issuer_common_name = None self.issuer_organization_name = None self.issuer_is_lets_encrypt = None self.issuer_is_free_ca = None return self.issuer_common_name = _first_value(issuer_flat, ("commonName", "CN")) self.issuer_organization_name = _first_value( issuer_flat, ("organizationName", "O") ) combined = " ".join(value.lower() for value in issuer_flat.values()) is_lets_encrypt = "let's encrypt" in combined or "lets encrypt" in combined self.issuer_is_lets_encrypt = is_lets_encrypt self.issuer_is_free_ca = any( keyword in combined for keyword in FREE_CA_KEYWORDS )
[docs] def get_tls_certificate(hostname: str, port: int = 443) -> Dict[str, Any]: """Retrieve the TLS certificate for a given hostname and port.""" try: context = ssl.create_default_context() hostname_idna = idna.encode(hostname).decode("ascii") with socket.create_connection((hostname_idna, port)) as sock: with context.wrap_socket(sock, server_hostname=hostname_idna) as ssock: cert = ssock.getpeercert() return cert except Exception as e: # noqa logger.debug(f"Error retrieving certificate for {hostname}: {e}") return {}
[docs] def is_certificate_valid(cert: Dict[str, Any]) -> Tuple[bool, str]: """Check if the certificate is currently valid based on its validity dates.""" if not cert: return False, "No certificate found" current_date = datetime.utcnow() not_before = datetime.strptime(cert["notBefore"], "%b %d %H:%M:%S %Y %Z") not_after = datetime.strptime(cert["notAfter"], "%b %d %H:%M:%S %Y %Z") if not_before <= current_date <= not_after: return True, "Certificate is valid" else: return ( False, f"Certificate is not valid: notBefore={not_before}, notAfter={not_after}", )
[docs] def is_certificate_trusted(cert: Dict[str, Any]) -> Tuple[bool, str]: """Check if the certificate is trusted by the system's CA store.""" context = ssl.create_default_context() try: context.verify_mode = ssl.CERT_REQUIRED context.check_hostname = False ca_certs = context.get_ca_certs(binary_form=True) # noqa store = context.cert_store_stats() # noqa return True, "Certificate is signed by a trusted CA" except ssl.SSLError as e: return False, f"Certificate is not trusted: {e}"
[docs] def check_ssl(url: str) -> bool: """Check if the SSL certificate of the URL is valid.""" try: requests.get(url, verify=True, timeout=config.api_timeout) return True except Exception: # noqa return False
[docs] def get_certificate_features(hostname: str) -> CertificateFeatures: """Retrieve and analyze the TLS certificate for a given hostname.""" cert = get_tls_certificate(hostname) if cert: is_valid, validity_message = is_certificate_valid(cert) is_trusted, trust_message = is_certificate_trusted(cert) not_before = datetime.strptime(cert["notBefore"], "%b %d %H:%M:%S %Y %Z") not_after = datetime.strptime(cert["notAfter"], "%b %d %H:%M:%S %Y %Z") return CertificateFeatures( subject=cert.get("subject", {}), issuer=cert.get("issuer", {}), not_before=not_before, not_after=not_after, is_valid=is_valid, validity_message=validity_message, is_trusted=is_trusted, trust_message=trust_message, ) else: return CertificateFeatures( subject={}, issuer={}, not_before=datetime.min, not_after=datetime.min, is_valid=False, validity_message="No certificate found", is_trusted=False, trust_message="No certificate found", )
[docs] @cache def get_certificate_features_cached(hostname: str) -> CertificateFeatures: """Get the certificate features for the given hostname.""" return get_certificate_features(hostname)
if __name__ == "__main__": hostname = "www.example.com" cert_info = get_certificate_features(hostname) print(f"Certificate for {hostname}") print(f"Subject: {cert_info.subject}") print(f"Issuer: {cert_info.issuer}") print(f"Validity: {cert_info.validity_message}") print(f"Trust: {cert_info.trust_message}") print(f"Valid from {cert_info.not_before} to {cert_info.not_after}")