mirror of
https://github.com/MuRuLOSE/limoka.git
synced 2026-06-16 14:34:17 +02:00
fix: banners now will be visible no matter what
This commit is contained in:
92
Limoka.py
92
Limoka.py
@@ -37,7 +37,7 @@ from .. import utils, loader
|
|||||||
from ..types import BotInlineCall, InlineCall
|
from ..types import BotInlineCall, InlineCall
|
||||||
|
|
||||||
logger = logging.getLogger("Limoka")
|
logger = logging.getLogger("Limoka")
|
||||||
__version__ = (1, 5, 4)
|
__version__ = (1, 5, 5)
|
||||||
|
|
||||||
|
|
||||||
def _parse_version_from_source(source: str):
|
def _parse_version_from_source(source: str):
|
||||||
@@ -846,29 +846,83 @@ class Limoka(loader.Module):
|
|||||||
logger.error(f"Skipping unsafe rmtree for {folder}")
|
logger.error(f"Skipping unsafe rmtree for {folder}")
|
||||||
|
|
||||||
async def _validate_url(self, url: str) -> Optional[str]:
|
async def _validate_url(self, url: str) -> Optional[str]:
|
||||||
if not url or url in self._invalid_banners:
|
logger.debug(f"_validate_url called with: {url}")
|
||||||
|
if not url:
|
||||||
|
logger.warning("_validate_url: URL is empty, returning None")
|
||||||
return None
|
return None
|
||||||
|
if url in self._invalid_banners:
|
||||||
|
logger.debug(f"_validate_url: URL already in invalid_banners: {url}, returning None")
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Headers to mimic a browser request
|
||||||
|
headers = {
|
||||||
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
|
||||||
|
}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
logger.debug(f"_validate_url: Starting validation for {url}")
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
async with session.head(
|
ct = None
|
||||||
url, timeout=5, allow_redirects=True
|
response_status = None
|
||||||
) as response:
|
|
||||||
if response.status != 200:
|
# Try HEAD first (more efficient)
|
||||||
self._invalid_banners.add(url)
|
try:
|
||||||
return None
|
logger.debug(f"_validate_url: Attempting HEAD request for {url}")
|
||||||
ct = response.headers.get("Content-Type", "").lower()
|
async with session.head(
|
||||||
mime = None
|
url, timeout=5, allow_redirects=True, headers=headers
|
||||||
if ct.startswith("image/"):
|
) as response:
|
||||||
return url
|
response_status = response.status
|
||||||
if not ct: # Some servers don't respond to HEAD requests with Content-Type, so instead we will try guess mime from content
|
logger.debug(f"_validate_url: HEAD request returned status {response.status} for {url}")
|
||||||
async with session.get(url, timeout=5) as get_response:
|
if response.status == 200:
|
||||||
data = await get_response.read(2048)
|
ct = response.headers.get("Content-Type", "").lower()
|
||||||
mime = filetype.guess_mime(data, mime=True)
|
logger.debug(f"_validate_url: Content-Type from HEAD: '{ct}' for {url}")
|
||||||
if mime and mime.startswith("image/"):
|
except (aiohttp.ClientError, asyncio.TimeoutError) as head_error:
|
||||||
return url
|
logger.debug(f"_validate_url: HEAD failed ({type(head_error).__name__}), will try GET for {url}")
|
||||||
|
|
||||||
|
# If HEAD didn't work or returned non-200, try GET
|
||||||
|
if ct is None:
|
||||||
|
max_retries = 2
|
||||||
|
for attempt in range(max_retries):
|
||||||
|
try:
|
||||||
|
async with session.get(
|
||||||
|
url, timeout=10, headers=headers, allow_redirects=True
|
||||||
|
) as response:
|
||||||
|
if response.status != 200:
|
||||||
|
self._invalid_banners.add(url)
|
||||||
|
return None
|
||||||
|
ct = response.headers.get("Content-Type", "").lower()
|
||||||
|
|
||||||
|
# Try to get MIME if Content-Type is missing
|
||||||
|
if not ct:
|
||||||
|
try:
|
||||||
|
data = await response.content.read(2048)
|
||||||
|
mime = filetype.guess_mime(data, mime=True)
|
||||||
|
if mime and mime.startswith("image/"):
|
||||||
|
return url
|
||||||
|
else:
|
||||||
|
self._invalid_banners.add(url)
|
||||||
|
return None
|
||||||
|
except Exception as mime_error:
|
||||||
|
logger.error(f"_validate_url: Error reading content for MIME detection: {mime_error}")
|
||||||
|
break # Success, exit retry loop
|
||||||
|
except (aiohttp.ClientError, asyncio.TimeoutError) as get_error:
|
||||||
|
if attempt < max_retries - 1:
|
||||||
|
await asyncio.sleep(1) # Wait before retry
|
||||||
|
else:
|
||||||
|
self._invalid_banners.add(url)
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Check Content-Type from successful request
|
||||||
|
if ct and ct.startswith("image/"):
|
||||||
|
return url
|
||||||
|
elif ct:
|
||||||
self._invalid_banners.add(url)
|
self._invalid_banners.add(url)
|
||||||
return None
|
return None
|
||||||
except Exception:
|
else:
|
||||||
|
self._invalid_banners.add(url)
|
||||||
|
return None
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
if url:
|
if url:
|
||||||
self._invalid_banners.add(url)
|
self._invalid_banners.add(url)
|
||||||
return None
|
return None
|
||||||
|
|||||||
Reference in New Issue
Block a user