import asyncio
import aiohttp
import argparse
import subprocess
import os
import tempfile
from pathlib import Path
import parse
parser = argparse.ArgumentParser(description="Update Diffs Script")
parser.add_argument(
"--token",
type=str,
required=True,
help="Token of Telegram bot",
)
parser.add_argument(
"--api_url",
type=str,
default="https://api.telegram.org",
help="API URL of Telegram API",
)
parser.add_argument(
"--chat_id",
type=str,
required=True,
help="Chat ID to send updates to",
)
parser.add_argument(
"--base_commit",
type=str,
default="HEAD~1",
help="Base commit to compare against",
)
arguments = parser.parse_args()
async def send_message(session, text):
"""Send a text message to the channel"""
url = f"{arguments.api_url}/bot{arguments.token}/sendMessage"
data = {
'chat_id': arguments.chat_id,
'text': text,
'parse_mode': 'Markdown',
}
async with session.post(url, data=data) as response:
return await response.json()
async def send_document(session, file_path, caption=None):
"""Send a document to the channel"""
url = f"{arguments.api_url}/bot{arguments.token}/sendDocument"
with open(file_path, 'rb') as f:
data = aiohttp.FormData()
data.add_field('chat_id', arguments.chat_id)
data.add_field('document', f, filename=os.path.basename(file_path))
data.add_field('parse_mode', 'HTML')
if caption:
data.add_field('caption', caption)
data.add_field('parse_mode', 'Markdown')
async with session.post(url, data=data) as response:
return await response.json()
def get_changed_files(base_commit):
"""Get list of changed files between commits"""
try:
result = subprocess.check_output(
['git', 'diff', '--name-only', base_commit, 'HEAD'],
cwd=os.getcwd()
).decode().strip().split('\n')
return [f for f in result if f]
except subprocess.CalledProcessError:
return []
def get_deleted_files(base_commit):
"""Get list of deleted files between commits"""
try:
result = subprocess.check_output(
['git', 'diff', '--diff-filter=D', '--name-only', base_commit, 'HEAD'],
cwd=os.getcwd()
).decode().strip().split('\n')
return [f for f in result if f]
except subprocess.CalledProcessError:
return []
def get_diff_files(base_commit, diff_filter):
"""Get list of files for a specific git diff filter"""
try:
result = subprocess.check_output(
['git', 'diff', f'--diff-filter={diff_filter}', '--name-only', base_commit, 'HEAD'],
cwd=os.getcwd()
).decode().strip().splitlines()
return [f for f in result if f]
except subprocess.CalledProcessError:
return []
def get_added_files(base_commit):
return get_diff_files(base_commit, 'A')
def get_modified_files(base_commit):
return get_diff_files(base_commit, 'M')
def get_file_diff(file_path, base_commit):
"""Get diff for a specific file"""
try:
diff = subprocess.check_output(
['git', 'diff', base_commit, 'HEAD', '--', file_path],
cwd=os.getcwd()
).decode()
return diff
except subprocess.CalledProcessError:
return ""
def get_module_developer(file_path):
"""Read module metadata and return the developer handle"""
try:
module_info = parse.get_module_info(file_path)
except Exception:
return None
if not module_info:
return None
developer = module_info.get('meta', {}).get('developer')
if developer:
return developer.strip()
return None
def is_module_file(file_path):
"""Check if file is a Python module in a modules directory"""
# Check if it's a .py file and in a modules-like directory
dir_depth = len(Path(file_path).parts)
return file_path.endswith('.py') and (
dir_depth := len(Path(file_path).parts) >= 2
)
def extract_module_name(file_path):
"""Extract module name from file path"""
return Path(file_path).stem
async def main():
added_files = get_added_files(arguments.base_commit)
modified_files = get_modified_files(arguments.base_commit)
deleted_files = get_deleted_files(arguments.base_commit)
all_files = added_files + modified_files + deleted_files
if not all_files:
print("No changes detected")
return
# Filter for module files only
new_module_files = [f for f in added_files if is_module_file(f)]
modified_module_files = [f for f in modified_files if is_module_file(f)]
deleted_module_files = [f for f in deleted_files if is_module_file(f)]
if not new_module_files and not modified_module_files and not deleted_module_files:
print("No module changes detected")
return
async with aiohttp.ClientSession() as session:
# Handle deleted files first
for file_path in deleted_module_files:
try:
module_name = extract_module_name(file_path)
message = f"🪼 Module {module_name} has been deleted"
result = await send_message(session, message)
print(f"Sent deletion notice for {module_name}: {result}")
except Exception as e:
print(f"Error processing deleted {file_path}: {e}")
# Handle newly added modules
for file_path in new_module_files:
try:
module_name = extract_module_name(file_path)
developer = get_module_developer(file_path)
github_url = f"https://raw.githubusercontent.com/MuRuLOSE/limoka/refs/heads/main/{file_path}"
try:
new_hash = subprocess.check_output(
['git', 'rev-list', '-n', '1', 'HEAD', '--', file_path],
cwd=os.getcwd()
).decode().strip()
except Exception:
new_hash = 'HEAD'
try:
old_hash = subprocess.check_output(
['git', 'rev-list', '-n', '1', arguments.base_commit, '--', file_path],
cwd=os.getcwd()
).decode().strip()
except Exception:
old_hash = arguments.base_commit
diff_url = f"https://github.com/MuRuLOSE/limoka/compare/{old_hash}...{new_hash}.diff"
title = f"🪼 New module {module_name} approved"
if developer:
title += f"\n{developer}"
message = (
f"{title}\n\n"
f"File URL | "
f"Diff URL"
)
diff = get_file_diff(file_path, arguments.base_commit)
if not diff:
print(f"Skipping {file_path} - no diff content")
continue
diff_filename = f"{module_name}.diff"
with tempfile.NamedTemporaryFile(
mode='w',
suffix='',
prefix='',
delete=False,
encoding='utf-8',
dir=tempfile.gettempdir()
) as tmp_file:
tmp_file.write(diff)
tmp_file_path = tmp_file.name
try:
final_path = os.path.join(tempfile.gettempdir(), diff_filename)
os.rename(tmp_file_path, final_path)
doc_result = await send_document(session, final_path, caption=message)
print(f"Sent new module diff for {module_name}: {doc_result}")
except Exception as e:
print(f"Error sending {module_name}: {e}")
finally:
if os.path.exists(tmp_file_path):
try:
os.remove(tmp_file_path)
except:
pass
final_path = os.path.join(tempfile.gettempdir(), diff_filename)
if os.path.exists(final_path):
try:
os.remove(final_path)
except:
pass
except Exception as e:
print(f"Error processing new file {file_path}: {e}")
# Handle modified files
for file_path in modified_module_files:
try:
module_name = extract_module_name(file_path)
developer = get_module_developer(file_path)
github_url = f"https://raw.githubusercontent.com/MuRuLOSE/limoka/refs/heads/main/{file_path}"
try:
new_hash = subprocess.check_output(
['git', 'rev-list', '-n', '1', 'HEAD', '--', file_path],
cwd=os.getcwd()
).decode().strip()
except Exception:
new_hash = 'HEAD'
try:
old_hash = subprocess.check_output(
['git', 'rev-list', '-n', '1', arguments.base_commit, '--', file_path],
cwd=os.getcwd()
).decode().strip()
except Exception:
old_hash = arguments.base_commit
diff_url = f"https://github.com/MuRuLOSE/limoka/compare/{old_hash}...{new_hash}.diff"
title = f"🪼 Module {module_name} changes approved"
if developer:
title += f"\nby {developer}"
message = (
f"{title}\n\n"
f"File URL | "
f"Diff URL"
)
diff = get_file_diff(file_path, arguments.base_commit)
if not diff:
print(f"Skipping {file_path} - no diff content")
continue
diff_filename = f"{module_name}.diff"
with tempfile.NamedTemporaryFile(
mode='w',
suffix='',
prefix='',
delete=False,
encoding='utf-8',
dir=tempfile.gettempdir()
) as tmp_file:
tmp_file.write(diff)
tmp_file_path = tmp_file.name
try:
final_path = os.path.join(tempfile.gettempdir(), diff_filename)
os.rename(tmp_file_path, final_path)
doc_result = await send_document(session, final_path, caption=message)
print(f"Sent diff for {module_name}: {doc_result}")
except Exception as e:
print(f"Error sending {module_name}: {e}")
finally:
if os.path.exists(tmp_file_path):
try:
os.remove(tmp_file_path)
except:
pass
final_path = os.path.join(tempfile.gettempdir(), diff_filename)
if os.path.exists(final_path):
try:
os.remove(final_path)
except:
pass
except Exception as e:
print(f"Error processing modified {file_path}: {e}")
if __name__ == "__main__":
asyncio.run(main())