# На модуль распространяется лицензия "GNU General Public License v3.0" # https://github.com/all-licenses/GNU-General-Public-License-v3.0 # scope: hikka_only # meta developer: @pymodule # requires: python-whois dnspython requests import socket import whois import requests import dns.resolver import asyncio import ssl from .. import loader, utils class DomainScannerMod(loader.Module): """Scan a domain / Сканирование домена""" strings = { "name": "DomainScanner", "no_domain": "Specify a domain to scan.", "scanning": "🔍 Scanning {}...", "ip": "🖥 IP: {}", "ip_fail": "⚠️ Failed to get IP.", "whois": "📜 WHOIS:\n{}", "whois_fail": "⚠️ Failed to get WHOIS.", "dns": "🛡 DNS A records:", "dns_fail": "⚠️ Failed to get DNS records.", "mx": "📧 MX records:", "mx_fail": "⚠️ Failed to get MX records.", "txt": "📄 TXT records:", "txt_fail": "⚠️ Failed to get TXT records.", "ssl": "🔒 SSL Certificate:\n - Issued by: {}\n - Expires: {}", "ssl_fail": "⚠️ Failed to get SSL certificate.", "subs": "🌐 Subdomains:", "subs_fail": "⚠️ No subdomains found.", "http": "📶 HTTP Status: {}", "http_fail": "⚠️ Failed to get HTTP status.", "ports": "🚪 Open ports: {}", "ports_fail": "⚠️ No open ports found.", } strings_ru = { "no_domain": "Укажите домен для сканирования.", "scanning": "🔍 Сканирую {}...", "ip": "🖥 IP: {}", "ip_fail": "⚠️ Не удалось получить IP.", "whois": "📜 WHOIS:\n{}", "whois_fail": "⚠️ Не удалось получить WHOIS.", "dns": "🛡 DNS A-записи:", "dns_fail": "⚠️ Не удалось получить DNS-записи.", "mx": "📧 MX-записи:", "mx_fail": "⚠️ Не удалось получить MX-записи.", "txt": "📄 TXT-записи:", "txt_fail": "⚠️ Не удалось получить TXT-записи.", "ssl": "🔒 SSL-сертификат:\n - Выдан: {}\n - Истекает: {}", "ssl_fail": "⚠️ Не удалось получить SSL-сертификат.", "subs": "🌐 Поддомены:", "subs_fail": "⚠️ Поддомены не найдены.", "http": "📶 Статус HTTP: {}", "http_fail": "⚠️ Не удалось получить HTTP-статус.", "ports": "🚪 Открытые порты: {}", "ports_fail": "⚠️ Открытые порты не найдены.", } async def client_ready(self, client, db): self.client = client @loader.command( doc="Scan domain. Usage: .domscan ", ru_doc="Сканировать домен. Использование: .domscan <домен>" ) async def domscancmd(self, message): """Scan domain / Сканировать домен. Usage: .domscan """ domain = utils.get_args_raw(message).strip() if not domain: return await utils.answer(message, self.strings("no_domain")) await utils.answer(message, self.strings("scanning").format(domain)) result = [] async def get_ip(): try: return socket.gethostbyname(domain) except Exception: return None async def get_whois(): try: return await asyncio.to_thread(whois.whois, domain) except Exception: return None async def get_dns_record(rtype): try: return dns.resolver.resolve(domain, rtype) except Exception: return None async def get_ssl_info(): try: ctx = ssl.create_default_context() with ctx.wrap_socket(socket.create_connection((domain, 443), timeout=5), server_hostname=domain) as s: return s.getpeercert() except Exception: return None async def check_subdomains(subs): found = [] for sub in subs: subdomain = f"{sub}.{domain}" try: ip = socket.gethostbyname(subdomain) found.append(f" - {subdomain} → {ip}") except Exception: continue return found async def check_http(): try: r = requests.get(f"http://{domain}", timeout=5) return r.status_code except Exception: return None async def check_ports(): ports = [] for port in [21, 22, 25, 53, 80, 110, 143, 443, 587, 993, 995]: try: with socket.create_connection((domain, port), timeout=1): ports.append(str(port)) except Exception: continue return ports ip, whois_info, dns_a, dns_mx, dns_txt, ssl_cert, subdomains, http_status, open_ports = await asyncio.gather( get_ip(), get_whois(), get_dns_record("A"), get_dns_record("MX"), get_dns_record("TXT"), get_ssl_info(), check_subdomains(["www", "mail", "ftp", "api", "dev", "blog", "admin", "portal", "shop"]), check_http(), check_ports() ) result.append(self.strings("ip").format(ip) if ip else self.strings("ip_fail")) if whois_info: summary = str(whois_info) result.append(self.strings("whois").format(summary)) else: result.append(self.strings("whois_fail")) if dns_a: result.append(self.strings("dns")) result.extend([f" - {r.to_text()}" for r in dns_a]) else: result.append(self.strings("dns_fail")) if dns_mx: result.append(self.strings("mx")) result.extend([f" - {r.to_text()}" for r in dns_mx]) else: result.append(self.strings("mx_fail")) if dns_txt: result.append(self.strings("txt")) result.extend([f" - {r.to_text()}" for r in dns_txt]) else: result.append(self.strings("txt_fail")) if ssl_cert: issuer = " ".join(x[0][1] for x in ssl_cert.get("issuer", [])) or "Unknown" expires = ssl_cert.get("notAfter", "Unknown") result.append(self.strings("ssl").format(issuer, expires)) else: result.append(self.strings("ssl_fail")) if subdomains: result.append(self.strings("subs")) result.extend(subdomains) else: result.append(self.strings("subs_fail")) result.append(self.strings("http").format(http_status) if http_status else self.strings("http_fail")) if open_ports: result.append(self.strings("ports").format(", ".join(open_ports))) else: result.append(self.strings("ports_fail")) await utils.answer(message, "\n".join(result))