"""whois module for hikka userbot
Copyright (C) 2025 Ruslan Isaev
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see https://www.gnu.org/licenses/."""
__version__ = (3, 0, 1)
# meta developer: @RUIS_VlP
# при поддержке @hikka_mods
import json
import aiohttp
from .. import loader, utils
import asyncio
import re
from typing import List
async def clean_domain(value: str) -> str:
# Убираем протокол, порт, путь
value = re.sub(r'^(https?://)?', '', value)
value = value.split('/')[0]
value = value.split(':')[0]
return value
async def ipcheck(value: str) -> str:
# Проверка IPv4
parts = value.split('.')
if len(parts) == 4 and all(part.isdigit() and 0 <= int(part) <= 255 for part in parts):
return "ip"
# Проверка IPv6
ipv6_pattern = re.compile(r'^([0-9a-fA-F]{0,4}:){2,7}[0-9a-fA-F]{0,4}$')
if ipv6_pattern.match(value):
return "ip"
return "domain"
async def get_whois(identifier, API_KEY: str = None) -> dict:
"""Получение данных через RDAP для доменов и ipwho.is для IP"""
check = await ipcheck(identifier)
if check == "ip":
url = f"http://ipwho.is/{identifier}"
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
resp.raise_for_status()
response = await resp.json()
return response
else:
# Основной RDAP сервис
url_primary = f"https://rdap.org/domain/{identifier}"
# Резервный RDAP сервис
url_backup = f"https://rdap.active.domains/domain/{identifier}"
# Заголовки для обхода блокировок
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'application/rdap+json, application/json'
}
async with aiohttp.ClientSession() as session:
try:
async with session.get(url_primary, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp:
if resp.status in (404, 403, 429, 500, 502, 503):
# Если ошибка на основном, пробуем резервный
async with session.get(url_backup, headers=headers) as resp_backup:
resp_backup.raise_for_status()
response = await resp_backup.json()
return response
resp.raise_for_status()
response = await resp.json()
return response
except aiohttp.ClientResponseError as e:
# Любая HTTP ошибка - пробуем резервный
async with session.get(url_backup, headers=headers) as resp_backup:
resp_backup.raise_for_status()
response = await resp_backup.json()
return response
except (aiohttp.ClientError, asyncio.TimeoutError):
# При ошибке соединения пробуем резервный
async with session.get(url_backup, headers=headers) as resp_backup:
resp_backup.raise_for_status()
response = await resp_backup.json()
return response
async def fetch_dns_record(session, domain, record_type):
url = "https://dns.google/resolve"
headers = {"accept": "application/dns-json"}
params = {"name": domain, "type": record_type}
async with session.get(url, headers=headers, params=params) as resp:
text = await resp.text()
try:
data = json.loads(text)
except json.JSONDecodeError:
return []
if not isinstance(data, dict):
return []
answers = data.get("Answer")
if not answers:
return []
return [
ans["data"]
for ans in answers
if ans.get("type") == (1 if record_type == "A" else 28)
]
async def get_ips(domain):
async with aiohttp.ClientSession() as session:
ipv4_task = fetch_dns_record(session, domain, "A")
ipv6_task = fetch_dns_record(session, domain, "AAAA")
ipv4, ipv6 = await asyncio.gather(ipv4_task, ipv6_task)
return [ipv4, ipv6]
async def json2text_ip(data: dict) -> str:
"""Форматирование данных IP из ipwho.is"""
def get(value):
return str(value) if value not in (None, '', [], {}) else 'Неизвестно'
if not data.get("success", False):
return f"❌ IP адрес не найден или недоступен"
ip = get(data.get("ip"))
ip_type = get(data.get("type"))
continent = get(data.get("continent"))
country = get(data.get("country"))
country_code = get(data.get("country_code"))
region = get(data.get("region"))
city = get(data.get("city"))
postal = get(data.get("postal"))
flag = data.get("flag", {})
flag_emoji = flag.get("emoji", "🌍")
connection = data.get("connection", {})
asn = get(connection.get("asn"))
org = get(connection.get("org"))
isp = get(connection.get("isp"))
domain = get(connection.get("domain"))
timezone_data = data.get("timezone", {})
timezone_id = get(timezone_data.get("id"))
timezone_utc = get(timezone_data.get("utc"))
lines = [
f"🌎IP адрес: {ip}",
f"🖥 Тип: {ip_type}",
"",
f"{flag_emoji} Местоположение:",
f" • Континент: {continent}",
f" • Страна: {country} ({country_code})",
f" • Регион: {region}",
f" • Город: {city}",
f" • Индекс: {postal}",
"",
f"🔗 Подключение:",
f" • ASN: {asn}",
f" • Организация: {org}",
f" • Провайдер: {isp}",
f" • Домен: {domain}",
"",
f"🕐 Часовой пояс:",
f" • Зона: {timezone_id}",
f" • UTC: {timezone_utc}",
]
return '\n'.join(line for line in lines if 'Неизвестно' not in line)
async def json2text(data: dict, ips, check) -> str:
def get(value):
return str(value) if value not in (None, '', [], {}) else 'Неизвестно'
# Обработка RDAP формата
ldhName = data.get("ldhName", data.get("handle", ""))
# Проверка статуса регистрации
status = data.get("status", [])
if isinstance(status, list):
status_str = ', '.join(status) if status else 'Неизвестно'
else:
status_str = get(status)
# Проверка на свободный домен
if not ldhName or "object not found" in str(data).lower():
domain_name = data.get("ldhName", data.get("handle", "Неизвестно"))
return f"🌎Домен: {domain_name}\n\n🖥 Домен свободен"
# Извлечение событий (даты)
events = data.get("events", [])
created = next((e["eventDate"] for e in events if e.get("eventAction") == "registration"), None)
changed = next((e["eventDate"] for e in events if e.get("eventAction") == "last changed"), None)
expires = next((e["eventDate"] for e in events if e.get("eventAction") == "expiration"), None)
# Извлечение nameservers
nameservers_data = data.get("nameservers", [])
nameservers = [ns.get("ldhName", "Неизвестно") for ns in nameservers_data] if nameservers_data else ['Неизвестно']
# Извлечение контактов
entities = data.get("entities", [])
admin = {}
registrar = {}
for entity in entities:
roles = entity.get("roles", [])
if "administrative" in roles or "admin" in roles:
vcard = entity.get("vcardArray", [[]])
if len(vcard) > 1:
for field in vcard[1]:
if field[0] == "fn":
admin["name"] = field[3]
elif field[0] == "email":
admin["email"] = field[3]
elif field[0] == "org":
admin["organization"] = field[3]
elif field[0] == "adr":
if len(field[3]) > 6:
admin["country"] = field[3][6]
if "registrar" in roles:
vcard = entity.get("vcardArray", [[]])
registrar["name"] = entity.get("handle", "")
if len(vcard) > 1:
for field in vcard[1]:
if field[0] == "fn":
registrar["name"] = field[3]
elif field[0] == "email":
registrar["email"] = field[3]
elif field[0] == "tel":
registrar["phone"] = field[3]
registered = 'Да' if ldhName else 'Нет'
lines = [
f"🌎Домен: {ldhName}",
]
if len(ips) > 0 and (ips[0] or ips[1]):
lines += ["🖥 IP адреса:"]
lines += [f" • {ip}" for ip in ips[0]]
lines += [f" • {ip}" for ip in ips[1]]
lines += [
"",
f"🗓 Дата регистрации: {get(created)}",
f"♻️ Изменено: {get(changed)}",
f"⏳Истекает: {get(expires)}",
f"✔️ Зарегистрирован: {registered}",
f"📊 Статус: {status_str}",
"",
]
if check == "domain" and nameservers[0] != 'Неизвестно':
lines += ["🖥 DNS-серверы:"]
lines += [f" • {ns}" for ns in nameservers]
if admin:
lines += [
"",
"👤 Админ-контакт:",
f" • Имя: {get(admin.get('name'))}",
f" • Email: {get(admin.get('email'))}",
f" • Организация: {get(admin.get('organization'))}",
f" • Страна: {get(admin.get('country'))}",
"",
]
if check == "domain" and registrar:
lines += [
"💳 Регистратор:",
f" • Название: {get(registrar.get('name'))}",
f" • Email: {get(registrar.get('email'))}",
f" • Телефон: {get(registrar.get('phone'))}",
]
return '\n'.join(line for line in lines if 'Неизвестно' not in line)
@loader.tds
class WhoisMod(loader.Module):
"""Модуль для получения информации о домене или ip адресе"""
strings = {"name": "Whois"}
@loader.command()
async def whois(self, message):
"""<домен> - получить информацию о домене или IP"""
domain = ((utils.get_args_raw(message)).split()[0]).encode('idna').decode('ascii')
if not domain:
await utils.answer(message, "❌ Вы не указали домен!")
return
await utils.answer(message, "📡 Отправляю запрос...")
try:
check = await ipcheck(domain)
clean = await clean_domain(domain)
if check == "ip":
info = await get_whois(clean)
text = await json2text_ip(info)
await utils.answer(message, text)
return
whois = get_whois(clean)
ips = get_ips(clean)
info, ips = await asyncio.gather(whois, ips)
text = await json2text(info, ips, "domain")
await utils.answer(message, text)
except Exception as e:
if "404" in str(e):
await utils.answer(message, f"🌎Домен: {clean}\n\n🖥 Домен свободен")
return
else:
await utils.answer(message, f"❌ Ошибка!\n\n{e}")