"""whois module for hikka userbot Copyright (C) 2025 Ruslan Isaev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see https://www.gnu.org/licenses/.""" __version__ = (3, 0, 1) # meta developer: @RUIS_VlP # при поддержке @hikka_mods import json import aiohttp from .. import loader, utils import asyncio import re from typing import List async def clean_domain(value: str) -> str: # Убираем протокол, порт, путь value = re.sub(r'^(https?://)?', '', value) value = value.split('/')[0] value = value.split(':')[0] return value async def ipcheck(value: str) -> str: # Проверка IPv4 parts = value.split('.') if len(parts) == 4 and all(part.isdigit() and 0 <= int(part) <= 255 for part in parts): return "ip" # Проверка IPv6 ipv6_pattern = re.compile(r'^([0-9a-fA-F]{0,4}:){2,7}[0-9a-fA-F]{0,4}$') if ipv6_pattern.match(value): return "ip" return "domain" async def get_whois(identifier, API_KEY: str = None) -> dict: """Получение данных через RDAP для доменов и ipwho.is для IP""" check = await ipcheck(identifier) if check == "ip": url = f"http://ipwho.is/{identifier}" async with aiohttp.ClientSession() as session: async with session.get(url) as resp: resp.raise_for_status() response = await resp.json() return response else: # Основной RDAP сервис url_primary = f"https://rdap.org/domain/{identifier}" # Резервный RDAP сервис url_backup = f"https://rdap.active.domains/domain/{identifier}" # Заголовки для обхода блокировок headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'application/rdap+json, application/json' } async with aiohttp.ClientSession() as session: try: async with session.get(url_primary, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp: if resp.status in (404, 403, 429, 500, 502, 503): # Если ошибка на основном, пробуем резервный async with session.get(url_backup, headers=headers) as resp_backup: resp_backup.raise_for_status() response = await resp_backup.json() return response resp.raise_for_status() response = await resp.json() return response except aiohttp.ClientResponseError as e: # Любая HTTP ошибка - пробуем резервный async with session.get(url_backup, headers=headers) as resp_backup: resp_backup.raise_for_status() response = await resp_backup.json() return response except (aiohttp.ClientError, asyncio.TimeoutError): # При ошибке соединения пробуем резервный async with session.get(url_backup, headers=headers) as resp_backup: resp_backup.raise_for_status() response = await resp_backup.json() return response async def fetch_dns_record(session, domain, record_type): url = "https://dns.google/resolve" headers = {"accept": "application/dns-json"} params = {"name": domain, "type": record_type} async with session.get(url, headers=headers, params=params) as resp: text = await resp.text() try: data = json.loads(text) except json.JSONDecodeError: return [] if not isinstance(data, dict): return [] answers = data.get("Answer") if not answers: return [] return [ ans["data"] for ans in answers if ans.get("type") == (1 if record_type == "A" else 28) ] async def get_ips(domain): async with aiohttp.ClientSession() as session: ipv4_task = fetch_dns_record(session, domain, "A") ipv6_task = fetch_dns_record(session, domain, "AAAA") ipv4, ipv6 = await asyncio.gather(ipv4_task, ipv6_task) return [ipv4, ipv6] async def json2text_ip(data: dict) -> str: """Форматирование данных IP из ipwho.is""" def get(value): return str(value) if value not in (None, '', [], {}) else 'Неизвестно' if not data.get("success", False): return f"❌ IP адрес не найден или недоступен" ip = get(data.get("ip")) ip_type = get(data.get("type")) continent = get(data.get("continent")) country = get(data.get("country")) country_code = get(data.get("country_code")) region = get(data.get("region")) city = get(data.get("city")) postal = get(data.get("postal")) flag = data.get("flag", {}) flag_emoji = flag.get("emoji", "🌍") connection = data.get("connection", {}) asn = get(connection.get("asn")) org = get(connection.get("org")) isp = get(connection.get("isp")) domain = get(connection.get("domain")) timezone_data = data.get("timezone", {}) timezone_id = get(timezone_data.get("id")) timezone_utc = get(timezone_data.get("utc")) lines = [ f"🌎IP адрес: {ip}", f"🖥 Тип: {ip_type}", "", f"{flag_emoji} Местоположение:", f" • Континент: {continent}", f" • Страна: {country} ({country_code})", f" • Регион: {region}", f" • Город: {city}", f" • Индекс: {postal}", "", f"🔗 Подключение:", f" • ASN: {asn}", f" • Организация: {org}", f" • Провайдер: {isp}", f" • Домен: {domain}", "", f"🕐 Часовой пояс:", f" • Зона: {timezone_id}", f" • UTC: {timezone_utc}", ] return '\n'.join(line for line in lines if 'Неизвестно' not in line) async def json2text(data: dict, ips, check) -> str: def get(value): return str(value) if value not in (None, '', [], {}) else 'Неизвестно' # Обработка RDAP формата ldhName = data.get("ldhName", data.get("handle", "")) # Проверка статуса регистрации status = data.get("status", []) if isinstance(status, list): status_str = ', '.join(status) if status else 'Неизвестно' else: status_str = get(status) # Проверка на свободный домен if not ldhName or "object not found" in str(data).lower(): domain_name = data.get("ldhName", data.get("handle", "Неизвестно")) return f"🌎Домен: {domain_name}\n\n🖥 Домен свободен" # Извлечение событий (даты) events = data.get("events", []) created = next((e["eventDate"] for e in events if e.get("eventAction") == "registration"), None) changed = next((e["eventDate"] for e in events if e.get("eventAction") == "last changed"), None) expires = next((e["eventDate"] for e in events if e.get("eventAction") == "expiration"), None) # Извлечение nameservers nameservers_data = data.get("nameservers", []) nameservers = [ns.get("ldhName", "Неизвестно") for ns in nameservers_data] if nameservers_data else ['Неизвестно'] # Извлечение контактов entities = data.get("entities", []) admin = {} registrar = {} for entity in entities: roles = entity.get("roles", []) if "administrative" in roles or "admin" in roles: vcard = entity.get("vcardArray", [[]]) if len(vcard) > 1: for field in vcard[1]: if field[0] == "fn": admin["name"] = field[3] elif field[0] == "email": admin["email"] = field[3] elif field[0] == "org": admin["organization"] = field[3] elif field[0] == "adr": if len(field[3]) > 6: admin["country"] = field[3][6] if "registrar" in roles: vcard = entity.get("vcardArray", [[]]) registrar["name"] = entity.get("handle", "") if len(vcard) > 1: for field in vcard[1]: if field[0] == "fn": registrar["name"] = field[3] elif field[0] == "email": registrar["email"] = field[3] elif field[0] == "tel": registrar["phone"] = field[3] registered = 'Да' if ldhName else 'Нет' lines = [ f"🌎Домен: {ldhName}", ] if len(ips) > 0 and (ips[0] or ips[1]): lines += ["🖥 IP адреса:"] lines += [f" • {ip}" for ip in ips[0]] lines += [f" • {ip}" for ip in ips[1]] lines += [ "", f"🗓 Дата регистрации: {get(created)}", f"♻️ Изменено: {get(changed)}", f"Истекает: {get(expires)}", f"✔️ Зарегистрирован: {registered}", f"📊 Статус: {status_str}", "", ] if check == "domain" and nameservers[0] != 'Неизвестно': lines += ["🖥 DNS-серверы:"] lines += [f" • {ns}" for ns in nameservers] if admin: lines += [ "", "👤 Админ-контакт:", f" • Имя: {get(admin.get('name'))}", f" • Email: {get(admin.get('email'))}", f" • Организация: {get(admin.get('organization'))}", f" • Страна: {get(admin.get('country'))}", "", ] if check == "domain" and registrar: lines += [ "💳 Регистратор:", f" • Название: {get(registrar.get('name'))}", f" • Email: {get(registrar.get('email'))}", f" • Телефон: {get(registrar.get('phone'))}", ] return '\n'.join(line for line in lines if 'Неизвестно' not in line) @loader.tds class WhoisMod(loader.Module): """Модуль для получения информации о домене или ip адресе""" strings = {"name": "Whois"} @loader.command() async def whois(self, message): """<домен> - получить информацию о домене или IP""" domain = ((utils.get_args_raw(message)).split()[0]).encode('idna').decode('ascii') if not domain: await utils.answer(message, "❌ Вы не указали домен!") return await utils.answer(message, "📡 Отправляю запрос...") try: check = await ipcheck(domain) clean = await clean_domain(domain) if check == "ip": info = await get_whois(clean) text = await json2text_ip(info) await utils.answer(message, text) return whois = get_whois(clean) ips = get_ips(clean) info, ips = await asyncio.gather(whois, ips) text = await json2text(info, ips, "domain") await utils.answer(message, text) except Exception as e: if "404" in str(e): await utils.answer(message, f"🌎Домен: {clean}\n\n🖥 Домен свободен") return else: await utils.answer(message, f"❌ Ошибка!\n\n{e}")