"""whois module for hikka userbot
Copyright (C) 2025 Ruslan Isaev
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see https://www.gnu.org/licenses/."""
__version__ = (2, 0, 0)
# meta developer: @RUIS_VlP
# при поддержке @hikka_mods
import json
import aiohttp
from .. import loader, utils
import asyncio
import re
from typing import List
async def clean_domain(value: str) -> List[str]:
# Убираем протокол, порт, путь
value = re.sub(r'^(https?://)?', '', value)
value = value.split('/')[0]
value = value.split(':')[0]
return value
async def ipcheck(value: str) -> str:
# Проверка IPv4
parts = value.split('.')
if len(parts) == 4 and all(part.isdigit() and 0 <= int(part) <= 255 for part in parts):
return "ip"
# Проверка IPv6
ipv6_pattern = re.compile(r'^([0-9a-fA-F]{0,4}:){2,7}[0-9a-fA-F]{0,4}$')
if ipv6_pattern.match(value):
return "ip"
return "domain"
async def get_whois(identifier, API_KEY: str) -> dict:
url = "https://api.jsonwhoisapi.com/v1/whois"
headers = {
"Authorization": API_KEY
}
params = {
"identifier": identifier
}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers, params=params) as resp:
resp.raise_for_status()
response = await resp.json()
return response
async def fetch_dns_record(session, domain, record_type):
url = "https://dns.google/resolve"
headers = {"accept": "application/dns-json"}
params = {"name": domain, "type": record_type}
async with session.get(url, headers=headers, params=params) as resp:
text = await resp.text()
try:
data = json.loads(text)
except json.JSONDecodeError:
return []
if not isinstance(data, dict):
return []
answers = data.get("Answer")
if not answers:
return []
return [
ans["data"]
for ans in answers
if ans.get("type") == (1 if record_type == "A" else 28)
]
async def get_ips(domain):
async with aiohttp.ClientSession() as session:
ipv4_task = fetch_dns_record(session, domain, "A")
ipv6_task = fetch_dns_record(session, domain, "AAAA")
ipv4, ipv6 = await asyncio.gather(ipv4_task, ipv6_task)
return [ipv4, ipv6]
async def json2text(data: dict, ips, check) -> str:
def get(value):
return str(value) if value not in (None, '', [], {}) else 'Неизвестно'
status = data.get("status", [])
status_str = ', '.join(status) if isinstance(status, list) else get(status)
nameservers = data.get("nameservers", []) or ['Неизвестнo']
registered = 'Да' if data.get('registered') else 'Нет'
if registered == "Нет":
return f"🌎Домен: {(get(data.get('name'))).encode('ascii').decode('idna')}\n\n🖥 Домен свободен"
admin = (data.get("contacts", {}).get("admin") or [{}])[0]
registrar = data.get("registrar", {})
lines = [
f"🌎Домен: {(get(data.get('name'))).encode('ascii').decode('idna')}",]
if len(ips) > 0:
lines += ["🖥 IP адреса:"]
lines += [f" • {ip}" for ip in ips[0]]
lines += [f" • {ip}" for ip in ips[1]]
else:
pass
lines += [
"",
f"🗓 Дата регистрации: {get(data.get('created'))}",
f"♻️ Изменено: {get(data.get('changed'))}",
f"⏳Истекает: {get(data.get('expires'))}",
f"✔️ Зарегистрирован: {registered}",
f"📊 Статус: {status_str}",
"",]
if check == "domain":
lines += [
f"🖥 DNS-серверы:",
]
lines += [f" • {ns}" for ns in nameservers]
lines += [
"",
"👤 Админ-контакт:",
f" • Имя: {get(admin.get('name'))}",
f" • Email: {get(admin.get('email'))}",
f" • Организация: {get(admin.get('organization'))}",
f" • Страна: {get(admin.get('country'))}",
"",]
if check == "domain":
lines += [
"💳 Регистратор:",
f" • ID: {get(registrar.get('id'))}",
f" • Название: {get(registrar.get('name'))}",
f" • Email: {get(registrar.get('email'))}",
f" • Сайт: {get(registrar.get('url'))}",
f" • Телефон: {get(registrar.get('phone'))}",
]
return '\n'.join(line for line in lines if 'Неизвестно' not in line)
@loader.tds
class WhoisMod(loader.Module):
"""Модуль для получения информации о домене или ip адресе"""
strings = {"name": "Whois"}
def __init__(self):
self.config = loader.ModuleConfig(
loader.ConfigValue(
"api_key",
"None",
lambda: "API ключ с сайта https://jsonwhoisapi.com/",
validator=loader.validators.String(),
),
)
@loader.command()
async def whois(self, message):
"""<домен> - получить информацию о домене или IP"""
api_key = self.config["api_key"]
if api_key == "None":
await utils.answer(message, '❌ Не указан API ключ! Получите его на jsonwhoisapi.com и вставьте в config (.config Whois)')
return
domain = ((utils.get_args_raw(message)).split()[0]).encode('idna').decode('ascii')
if not domain:
await utils.answer(message, "❌ Вы не указали домен!")
return
try:
check = await ipcheck(domain)
clean = await clean_domain(domain)
if check == "ip":
info = await get_whois(clean, api_key)
text = await json2text(info, [], "ip")
await utils.answer(message, text)
return
whois = get_whois(clean, api_key)
ips = get_ips(clean)
info, ips = await asyncio.gather(whois, ips)
text = await json2text(info, ips, "domain")
await utils.answer(message, text)
except Exception as e:
await utils.answer(message, f"❌ Ошибка!\n\n{e}")