# scope: hikka_min 1.2.10
__version__ = (2, 0, 0)

# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
# █▀█ █ █ █ █▀█ █▀▄ █
# © Copyright 2022
# https://t.me/hikariatama
#
# 🔒 Licensed under the GNU AGPLv3
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html

# meta pic: https://i.imgur.com/MTkqvXX.jpeg
# meta banner: https://mods.hikariatama.ru/badges/shikimori.jpg
# scope: inline
# scope: hikka_only
# meta developer: @hikarimods

import logging
import time
from urllib.parse import quote_plus

import requests
from telethon.tl.types import Message

from .. import loader, utils
from ..inline.types import InlineCall, InlineQuery

logger = logging.getLogger(__name__)


def _is_success(response) -> bool:
    """Return True when *response* exists and carries a 2xx status code."""
    return response is not None and 200 <= response.status_code < 300


@loader.tds
class ShikimoriMod(loader.Module):
    """Shikimori API Wrapper"""

    # NOTE(review): the link markup in "my_animes" and "interact" (both
    # languages) was reconstructed so that both arguments passed to
    # ``str.format`` by the callers are actually rendered - confirm the exact
    # markup against the upstream module.
    strings = {
        "name": "Shikimori",
        "authorize": "🔓 Authorize",
        "code": "✍️ Code",
        "code_input": "✍️ Redirect url after auth",
        "auth": (
            '🔓 Shikimori authorization:\n\n1. Click "🔓 Authorize"\n2. Click'
            ' "Allow"\n3. Copy redirect url, and enter it in "✍️ Code"'
        ),
        "my_animes": (
            '🐙 <a href="https://shikimori.one/{}">My</a> humble anime list:\n\n{}'
        ),
        "no_args": "🚫 No arguments specified",
        "added": "❤️ Anime {} added to planned",
        "auth_successful": "👍 Authorized! Check module help for new commands",
        "planned": "🕐 Planned",
        "watching": "🎬 Watching",
        "rewatching": "🔄 Re-watching",
        "completed": "✅ Completed",
        "on_hold": "🗓 Holded",
        "dropped": "🚫 Dropped",
        "interact": '📼 Interacting with <a href="https://shikimori.one{}">{}</a>',
        "state_changed": "Anime state changed to {}",
        "delete": "🗑 Delete",
        "no_status": "🔘 Change status",
        "error": "🚫 Error",
        "success": "✅ Success",
    }

    strings_ru = {
        "authorize": "🔓 Авторизоваться",
        "code": "✍️ Код",
        "code_input": "✍️ Ссылка, на которую тебя перебросило после авторизации",
        "auth": (
            '🔓 Авторизация на Shikimori:\n\n1. Нажми "🔓 Авторизоваться"\n2.'
            ' Нажми "Разрешить"\n3. Скопируй ссылку, на которую тебя перекинет, и введи'
            ' ее в "✍️ Код"'
        ),
        "my_animes": (
            '🐙 <a href="https://shikimori.one/{}">Мой</a> скромный список'
            " аниме:\n\n{}"
        ),
        "no_args": "🚫 Аргументы не указаны",
        "added": "❤️ Аниме {} добавлено в отложенные",
        "auth_successful": (
            "👍 Авторизован! Смотри помощь модуля, там новые команды"
        ),
        "planned": "🕐 Запланировано",
        "watching": "🎬 Смотрю",
        "rewatching": "🔄 Пересматриваю",
        "completed": "✅ Просмотрено",
        "on_hold": "🗓 Отложено",
        "dropped": "🚫 Заброшено",
        "interact": '📼 Взаимодействие с <a href="https://shikimori.one{}">{}</a>',
        "state_changed": "Состояние аниме изменено на {}",
        "delete": "🗑 Удалить",
        "no_status": "🔘 Изменить статус",
        "error": "🚫 Ошибка",
        "success": "✅ Успешно",
    }

    async def client_ready(self, client, db):
        """Initialise the per-session caches (current user and user rates)."""
        self._shiki_me = None  # will be set later
        self._rates_cache = {}

    # ------------------------------------------------------------------ #
    # Low-level HTTP helpers
    # ------------------------------------------------------------------ #

    async def _api_request(self, method, url: str, no_retry: bool = False, **kwargs):
        """Send an authorized request, refreshing the token once on HTTP 401.

        :param method: ``requests`` callable to use (``requests.get``, ...)
        :param url: Full URL of the endpoint
        :param no_retry: When True, do not try to refresh the token on 401
        :param kwargs: Extra keyword arguments forwarded to ``method``
        :return: The response object, or ``None`` if the request is still
                 unauthorized after the allowed attempts
        """
        attempts = 1 if no_retry else 2
        for attempt in range(attempts):
            response = await utils.run_sync(
                method,
                url,
                headers={
                    "User-Agent": "Hikka",
                    # Re-read on every attempt: the token changes after refresh
                    "Authorization": f"Bearer {self.get('token')}",
                },
                **kwargs,
            )

            if response.status_code != 401:
                return response

            if attempt + 1 < attempts:
                try:
                    await self._refresh_token()
                except KeyError:
                    # The OAuth answer carried no tokens (e.g. not authorized yet)
                    logger.error("Token refresh response did not contain tokens")
                    break

        logger.error("Can't refresh token")
        return None

    async def _fetch_tokens(self, grant_type: str, **grant_fields):
        """Request OAuth tokens and store them in the module database.

        :param grant_type: OAuth grant type sent to the token endpoint
        :param grant_fields: Grant-specific form fields
        :raises KeyError: If the answer lacks ``access_token``/``refresh_token``
        """
        response = await utils.run_sync(
            requests.post,
            "https://shikimori.one/oauth/token",
            headers={"User-Agent": "Hikka"},
            data={
                "grant_type": grant_type,
                "client_id": "-wQ_BBnF3GOvhRi6Z6pS60sYzY5ge7Y92aBtCEYSbgc",
                "client_secret": "mRESsAiJxzuOPMCkrgRvbdnMfLycZAAt_YDgD4hMDyA",
                **grant_fields,
            },
        )
        result = response.json()
        self.set("token", result["access_token"])
        self.set("refresh_token", result["refresh_token"])

    async def _request_token(self, code: str):
        """Exchange the authorization *code* for access and refresh tokens."""
        await self._fetch_tokens(
            "authorization_code",
            code=code,
            redirect_uri="https://mods.hikariatama.ru",
        )

    async def _refresh_token(self):
        """Obtain a fresh access token using the stored refresh token."""
        await self._fetch_tokens(
            "refresh_token",
            refresh_token=self.get("refresh_token"),
        )

    # ------------------------------------------------------------------ #
    # API wrappers
    # ------------------------------------------------------------------ #

    async def _get_me(self, no_retry: bool = False) -> dict:
        """Return the ``whoami`` answer, or ``{}`` if the request is unauthorized."""
        response = await self._api_request(
            requests.get,
            "https://shikimori.one/api/users/whoami",
            no_retry=no_retry,
        )
        return {} if response is None else response.json()

    async def _get_rates(self, no_retry: bool = False) -> list:
        """Return the user's anime rates, cached for five minutes.

        :param no_retry: When True, do not try to refresh the token on 401
        :return: List of rate dicts; empty list if they can't be fetched
        """
        if self._rates_cache and self._rates_cache["timeout"] > time.time():
            return self._rates_cache["rates"]

        if not self._shiki_me:
            self._shiki_me = await self._get_me()

        if not self._shiki_me or "id" not in self._shiki_me:
            logger.error("Can't get current Shikimori user")
            return []

        response = await self._api_request(
            requests.get,
            f"https://shikimori.one/api/users/{self._shiki_me['id']}/anime_rates?limit=5000",
            no_retry=no_retry,
        )
        if response is None:
            return []

        rates = response.json()
        if not isinstance(rates, list):
            logger.error(response.text)
            return []

        self._rates_cache = {"timeout": time.time() + 5 * 60, "rates": rates}
        return rates

    async def _search(
        self,
        query: str,
        limit: int = 10,
        no_retry: bool = False,
    ) -> list:
        """Search animes and annotate them with the user's own rate data.

        For every found anime that is in the user's list, ``status`` is
        replaced with the user's status and ``episodes_seen`` is added; for the
        others the ``status`` key coming from the search answer is removed.

        :param query: Search query
        :param limit: Maximum amount of results
        :param no_retry: When True, do not try to refresh the token on 401
        :return: List of anime dicts; empty list on failure
        """
        response = await self._api_request(
            requests.get,
            f"https://shikimori.one/api/animes?search={quote_plus(query)}&limit={limit}",
            no_retry=no_retry,
        )
        if response is None:
            return []

        animes = response.json()
        if not isinstance(animes, list):
            logger.error(response.text)
            return []

        rates = {
            rate["anime"]["id"]: rate
            for rate in await self._get_rates()
            if "anime" in rate and "id" in rate["anime"]
        }

        for anime in animes:
            rate = rates.get(anime["id"])
            if rate is None:
                anime.pop("status", None)
            else:
                anime["status"] = rate["status"]
                anime["episodes_seen"] = rate["episodes"]

        return animes

    async def _change_anime_state_api(
        self,
        uid: int,
        state: str,
        kind: str = "Anime",
        no_retry: bool = False,
    ) -> bool:
        """Create a user rate with the given state for a target.

        :param uid: Target (anime) id
        :param state: New status, e.g. ``"planned"``
        :param kind: Target type sent to the API
        :param no_retry: When True, do not try to refresh the token on 401
        :return: True if the API answered with a 2xx status
        """
        if not self._shiki_me:
            self._shiki_me = await self._get_me()

        if not self._shiki_me or "id" not in self._shiki_me:
            logger.error("Can't get current Shikimori user")
            return False

        response = await self._api_request(
            requests.post,
            "https://shikimori.one/api/v2/user_rates",
            no_retry=no_retry,
            data={
                "user_rate[status]": state,
                "user_rate[target_id]": uid,
                "user_rate[target_type]": kind,
                "user_rate[user_id]": self._shiki_me["id"],
                "user_rate[score]": 0,
            },
        )
        if response is None:
            return False

        if not _is_success(response):
            logger.error(response.text)
            return False

        return True

    async def _delete_anime_rate_api(
        self,
        uid: int,
        no_retry: bool = False,
    ) -> bool:
        """Delete the user rate that belongs to the anime with id *uid*.

        :param uid: Anime id
        :param no_retry: When True, do not try to refresh the token on 401
        :return: True if the rate was found and the API answered with 2xx
        """
        # Drop the cache so that the rate id is looked up in fresh data
        self._rates_cache = {}
        rate_id = next(
            (
                rate["id"]
                for rate in await self._get_rates()
                if rate["anime"]["id"] == uid
            ),
            None,
        )

        if not rate_id:
            logger.error("Can't find rate by anime id")
            return False

        response = await self._api_request(
            requests.delete,
            f"https://shikimori.one/api/v2/user_rates/{rate_id}",
            no_retry=no_retry,
        )
        if response is None:
            return False

        if not _is_success(response):
            logger.error(response.text)
            logger.error(response.status_code)
            return False

        return True

    # ------------------------------------------------------------------ #
    # Rendering
    # ------------------------------------------------------------------ #

    def _get_anime_message(self, anime: dict) -> str:
        """Build the text describing *anime* (title, url, type, progress, date)."""
        return (
            f'🐱 {utils.escape_html(anime["russian"])}\n'
            f'🌍 URL: https://shikimori.one{anime["url"]}\n'
            f'🧮 Type: {anime["kind"]}\n'
            f'📺 Episodes: {anime.get("episodes_seen", 0)}/{anime["episodes"]}\n'
            f'📅 Released: {anime["released_on"]}'
        )

    def _get_anime_markup(self, anime: dict) -> list:
        """Build the inline keyboard for *anime*.

        The first row is always the status button. The episode counter row is
        only shown for animes that are in the list and not completed.
        """
        status = anime.get("status", "no_status")
        markup = [
            [
                {
                    "text": self.strings(status),
                    "callback": self._anime_interact,
                    "args": (anime,),
                }
            ],
        ]

        if status not in {"completed", "no_status"}:
            markup.append(
                [
                    {
                        "text": "➖ Episode",
                        "callback": self._change_episodes_quantity,
                        "args": (anime, -1),
                    },
                    {
                        "text": "➕ Episode",
                        "callback": self._change_episodes_quantity,
                        "args": (anime, 1),
                    },
                ]
            )

        return markup

    # ------------------------------------------------------------------ #
    # Commands and inline handlers
    # (their docstrings are user-facing help text and are kept as-is)
    # ------------------------------------------------------------------ #

    async def anime_inline_handler(self, query: InlineQuery):
        """ - Search Shikimori"""
        if not query.args:
            return await query.e400()

        return [
            {
                "title": anime["name"],
                "thumb": f'https://shikimori.one/{anime["image"]["preview"]}',
                "message": self._get_anime_message(anime),
                "reply_markup": self._get_anime_markup(anime),
            }
            for anime in await self._search(query.args)
        ]

    async def shikicmd(self, message: Message):
        """ - Search anime and return best match as form"""
        args = utils.get_args_raw(message)
        if not args:
            await utils.answer(message, self.strings("no_args"))
            return

        found = await self._search(args, limit=1)
        if not found:
            # Nothing matched or the request failed - don't crash on [0]
            await utils.answer(message, self.strings("error"))
            return

        anime = found[0]
        await self.inline.form(
            message=message,
            text=self._get_anime_message(anime),
            reply_markup=self._get_anime_markup(anime),
            photo=f'https://shikimori.one/{anime["image"]["original"]}',
        )

    async def shikiauthcmd(self, message: Message):
        """Authorize on Shikimori.one"""
        await self.inline.form(
            message=message,
            text=self.strings("auth"),
            reply_markup=[
                {
                    "text": self.strings("authorize"),
                    "url": r"https://shikimori.one/oauth/authorize?client_id=-wQ_BBnF3GOvhRi6Z6pS60sYzY5ge7Y92aBtCEYSbgc&redirect_uri=https%3A%2F%2Fmods.hikariatama.ru&response_type=code&scope=user_rates",
                },
                {
                    "text": self.strings("code"),
                    "input": self.strings("code_input"),
                    "handler": self._proceed_auth,
                },
            ],
        )

    async def myshikicmd(self, message: Message):
        """Show watched animes from Shikimori.one"""
        rates = await self._get_rates()  # do this early so the self._shiki_me is set

        if not self._shiki_me or "nickname" not in self._shiki_me:
            await utils.answer(message, self.strings("error"))
            return

        completed = "\n".join(
            "▫️"
            f" {utils.escape_html(rate['anime'].get('russian', rate['anime']['name']))}"
            for rate in rates
            if rate["status"] == "completed"
        )
        await utils.answer(
            message,
            self.strings("my_animes").format(self._shiki_me["nickname"], completed),
        )

    async def aniaddcmd(self, message: Message):
        """ - Add best search match to the list of planned animes"""
        args = utils.get_args_raw(message)
        if not args:
            await utils.answer(message, self.strings("no_args"))
            return

        found = await self._search(args, 1)
        if not found:
            await utils.answer(message, self.strings("error"))
            return

        anime = found[0]
        if not await self._change_anime_state_api(anime["id"], "planned"):
            await utils.answer(message, self.strings("error"))
            return

        # The list has changed on the server - force a refetch next time
        self._rates_cache = {}

        await utils.answer(
            message,
            self.strings("added").format(utils.escape_html(anime["russian"])),
        )

    # ------------------------------------------------------------------ #
    # Inline callbacks
    # ------------------------------------------------------------------ #

    async def _change_episodes_quantity(
        self,
        call: InlineCall,
        anime: dict,
        diff: int,
        no_retry: bool = False,
    ):
        """Callback: change the amount of watched episodes of *anime* by *diff*.

        :param call: Inline call that triggered the callback
        :param anime: Anime dict shown in the form; updated in place
        :param diff: Amount of episodes to add (may be negative)
        :param no_retry: When True, do not try to refresh the token on 401
        :return: False if the rate can't be found, None otherwise
        """
        # Always work with fresh data so that the counter isn't based on a
        # stale cached value
        self._rates_cache = {}
        rate = next(
            (
                item
                for item in await self._get_rates()
                if item["anime"]["id"] == anime["id"]
            ),
            None,
        )

        if rate is None:
            logger.error("Can't find rate by anime id")
            await call.answer(self.strings("error"))
            return False

        # Compute the new value exactly once. Sending the already incremented
        # cached value "+ diff" would change the counter by 2 * diff.
        episodes = max(0, rate.get("episodes", 0) + diff)

        response = await self._api_request(
            requests.put,
            f"https://shikimori.one/api/v2/user_rates/{rate['id']}",
            no_retry=no_retry,
            data={
                "user_rate[chapters]": rate.get("chapters", 0),
                "user_rate[episodes]": episodes,
                "user_rate[rewatches]": rate.get("rewatches", 0),
                "user_rate[volumes]": rate.get("volumes", 0),
                "user_rate[score]": rate.get("score", 0),
                "user_rate[status]": rate.get("status", None),
                "user_rate[text]": rate.get("text", None),
            },
        )

        if not _is_success(response):
            if response is not None:
                logger.error(response.text)

            await call.answer(self.strings("error"))
            return

        # Mirror the change locally only after the server accepted it
        rate["episodes"] = episodes
        anime["episodes_seen"] = episodes

        await call.answer(self.strings("success"))
        await call.edit(self._get_anime_message(anime), self._get_anime_markup(anime))

    async def _anime_interact(
        self,
        call: InlineCall,
        anime: dict,
    ):
        """Callback: show the status picker (plus a delete button) for *anime*."""
        # A tuple (not a set) keeps the button order stable between runs
        statuses = (
            "planned",
            "watching",
            "rewatching",
            "completed",
            "on_hold",
            "dropped",
        )
        status_buttons = [
            {
                "text": self.strings(status),
                "callback": self._change_anime_state,
                "args": (anime, status),
            }
            for status in statuses
        ]
        delete_row = [
            {
                "text": self.strings("delete"),
                "callback": self._delete_anime_rate,
                "args": (anime,),
            }
        ]

        await call.edit(
            self.strings("interact").format(
                anime["url"],
                utils.escape_html(anime["russian"]),
            ),
            reply_markup=utils.chunks(status_buttons, 2) + [delete_row],
        )

    async def _change_anime_state(self, call: InlineCall, anime: dict, state: str):
        """Callback: set the status of *anime* to *state* and redraw the form."""
        if not await self._change_anime_state_api(anime["id"], state):
            await call.answer(self.strings("error"))
            return

        # We can do this locally to prevent API Flood
        cached = False
        for rate in self._rates_cache.get("rates", []):
            if rate["anime"]["id"] == anime["id"]:
                rate["status"] = state
                cached = True

        if not cached:
            # The anime wasn't in the cached list, so the cache can't be
            # patched in place - drop it to force a refetch next time
            self._rates_cache = {}

        anime["status"] = state

        await call.answer(self.strings("state_changed").format(self.strings(state)))
        await call.edit(self._get_anime_message(anime), self._get_anime_markup(anime))

    async def _delete_anime_rate(self, call: InlineCall, anime: dict):
        """Callback: remove *anime* from the user's list and redraw the form."""
        if not await self._delete_anime_rate_api(anime["id"]):
            await call.answer(self.strings("error"))
            return

        # We can do this locally to prevent API Flood
        rates = self._rates_cache.get("rates")
        if rates is not None:
            # Rebuild instead of deleting while iterating (that skips items)
            rates[:] = [rate for rate in rates if rate["anime"]["id"] != anime["id"]]

        anime.pop("status", None)
        anime.pop("episodes_seen", None)

        await call.answer(self.strings("state_changed").format(self.strings("delete")))
        await call.edit(self._get_anime_message(anime), self._get_anime_markup(anime))

    async def _proceed_auth(self, call: InlineCall, query: str):
        """Input handler: extract the OAuth code from the redirect url and authorize.

        :param call: Inline call of the authorization form
        :param query: Text entered by the user (expected to contain ``?code=``)
        """
        try:
            code = query.split("?code=")[1]
        except IndexError:
            # Not a redirect url - keep the form untouched so the user can retry
            logger.warning("No authorization code found in the entered url")
            return

        try:
            await self._request_token(code)
        except KeyError:
            logger.error("Token response did not contain tokens")
            await call.edit(self.strings("error"))
            return

        await call.edit(self.strings("auth_successful"))