# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
# █▀█ █ █ █ █▀█ █▀▄ █
# © Copyright 2022
# https://t.me/hikariatama
#
# 🔒 Licensed under the GNU AGPLv3
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
# meta pic: https://static.dan.tatar/forex_wss.png
# meta banner: https://mods.hikariatama.ru/badges/forex_wss.jpg
# meta developer: @hikarimods
# requires: websockets
# scope: inline
# scope: hikka_only
# scope: hikka_min 1.2.10
import asyncio
import datetime
import json
import time
from urllib.parse import quote_plus
import requests
import websockets
from aiogram.utils.exceptions import MessageNotModified
from telethon.tl.types import Message
from .. import loader, utils
from ..inline.types import InlineCall
@loader.tds
class RealTimeValutesMod(loader.Module):
"""Track valutes in real time. Updates more than once a second"""
strings = {
"name": "RealTimeValutes",
"loading": "😌 Loading the most actual info from Forex...",
"wss_error": "🚫 Socket connection error",
"exchanges": (
"😌 Exchange rates by Forex\n\n💵 1 USD = {:.2f} RUB\n💶 1 EUR ="
" {:.2f} RUB\n\nThis info is relevant to {:%m/%d/%Y"
" %H:%M:%S}"
),
}
strings_ru = {
"loading": "😌 Загружаю информацию с Forex...",
"wss_error": "🚫 Ошибка подеключения к сокету",
"exchanges": (
"😌 Курсы валют Forex\n\n💵 1 USD = {:.2f} RUB\n💶 1 EUR = {:.2f}"
" RUB\n\nИнформация актуальна на {:%m/%d/%Y %H:%M:%S}"
),
"_cmd_doc_val": "Показать курсы валют",
"_cls_doc": (
"Отслеживает курсы валют в режиме реального времени. Обновляется несколько"
" раз в секунду"
),
}
async def _connect(self):
r = await utils.run_sync(
requests.get,
(
f"https://rates-live.efxnow.com/signalr/negotiate?clientProtocol=2.1&connectionData=%5B%7B%22name%22%3A%22ratesstreamer%22%7D%5D&_={time.time() * 1000:.0f}"
),
)
token = quote_plus(r.json()["ConnectionToken"])
base = f"wss://rates-live.efxnow.com/signalr/connect?transport=webSockets&clientProtocol=2.1&connectionToken={token}&connectionData=%5B%7B%22name%22%3A%22ratesstreamer%22%7D%5D&tid=8"
async with websockets.connect(base) as wss:
await wss.send(
'{"H":"ratesstreamer","M":"SubscribeToPricesUpdates","A":[["401203106","401203109"]],"I":8}'
) # USD/RUB | EUR/RUB
self._restart_at = time.time() + 5 * 60
while time.time() < self._restart_at:
rates = json.loads(await wss.recv())
if "M" not in rates or not rates["M"]:
continue
for row in rates["M"]:
if "A" not in row:
continue
rate = row["A"]
valute = rate[0].split("|")[1].split("/")[0]
rate = float(rate[0].split("|")[3])
self._rates[valute] = rate
self._upd_time = time.time()
return await self._connect()
async def client_ready(self, client, db):
self._rates = {}
self._upd_time = 0
self._ratelimit = 0
self._reload_markup = self.inline.generate_markup(
{"text": "🔄 Update", "data": "update_exchanges"}
)
self._task = asyncio.ensure_future(self._connect())
async def valcmd(self, message: Message):
"""Show exchange rates"""
try:
m = self.strings("exchanges").format(
self._rates["USD"],
self._rates["EUR"],
getattr(datetime, "datetime", datetime).fromtimestamp(self._upd_time),
)
except (KeyError, IndexError):
await utils.answer(message, self.strings("wss_error"))
return
try:
await self.inline.form(
m,
message=message,
reply_markup={"text": "🔄 Update", "data": "update_exchanges"},
disable_security=True,
silent=True,
)
except Exception:
await utils.answer(message, m)
@loader.inline_everyone
async def reload_callback_handler(self, call: InlineCall):
"""Processes 'reload' button clicks"""
if call.data != "update_exchanges":
return
if self._ratelimit and time.time() < self._ratelimit:
await call.answer("Do not spam this button")
return
self._ratelimit = time.time() + 1
try:
await self.inline.bot.edit_message_text(
inline_message_id=call.inline_message_id,
text=self.strings("exchanges").format(
self._rates["USD"],
self._rates["EUR"],
getattr(datetime, "datetime", datetime).fromtimestamp(
self._upd_time
),
),
reply_markup=self._reload_markup,
parse_mode="HTML",
)
await call.answer("😌 Exchange rates update complete!", show_alert=True)
except (IndexError, KeyError):
await call.answer("Socket connection error", show_alert=True)
return
except MessageNotModified:
await call.answer(
"Exchange rates have not changes since last update", show_alert=True
)
return
async def on_unload(self):
self._task.cancel()