#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import requests
import tkinter as tk
from tkinter import filedialog, messagebox, ttk
from pathlib import Path
import tempfile
import sys
import io
import json
from urllib.parse import urlparse, urlunparse
from tr6260_ota_v1_builder import build_fotapkg_v1_diff
def http_read_range(base_url: str, start: int, length: int) -> bytes:
    """Fetch a byte range from the device's flash-read HTTP endpoint.

    Sends ``GET {base_url}/api/flash/{start}-{length}`` where both numbers
    are rendered as uppercase hexadecimal without a ``0x`` prefix.

    Args:
        base_url: Scheme and host of the device, without a trailing slash.
        start: First number placed in the URL path.
        length: Second number placed in the URL path.

    Returns:
        The raw response body.

    Raises:
        requests.HTTPError: If the device answers with an error status.
    """
    endpoint = "{}/api/flash/{:X}-{:X}".format(base_url, start, length)
    response = requests.get(endpoint, timeout=15)
    response.raise_for_status()
    return response.content
def read_partition_table(base_url: str) -> bytes:
    """Download the 0x1000-byte region at flash offset 0x6000.

    Args:
        base_url: Scheme and host of the device, without a trailing slash.

    Returns:
        The bytes returned by ``http_read_range`` for that region.
    """
    table_offset = 0x6000
    table_length = 0x1000
    return http_read_range(base_url, start=table_offset, length=table_length)
def read_base_firmware(base_url: str) -> bytes:
    """Download the firmware image currently stored at flash offset 0x7000.

    The first 16 bytes at that offset are read as two 8-character ASCII
    decimal size fields. The image length is their sum plus the 16-byte
    header, and that many bytes are then fetched from the same offset.

    Args:
        base_url: Scheme and host of the device, without a trailing slash.

    Returns:
        The bytes returned by the device for the computed range.

    Raises:
        RuntimeError: If the header is shorter than 16 bytes, is not ASCII
            decimal, or contains a negative size.
        requests.HTTPError: If the device answers with an error status.
    """
    fw_offset = 0x7000
    header_len = 16
    bad_header = "Invalid firmware header at 0x7000"

    header = http_read_range(base_url, fw_offset, header_len)
    # A truncated reply would otherwise be sliced into shorter fields and
    # silently parsed as a (wrong) size.
    if len(header) < header_len:
        raise RuntimeError(bad_header)
    try:
        size1 = int(header[0:8].decode("ascii"))
        size2 = int(header[8:16].decode("ascii"))
    except ValueError as err:
        # UnicodeDecodeError is a ValueError subclass, so this covers both
        # non-ASCII bytes and non-numeric text.
        raise RuntimeError(bad_header) from err
    if size1 < 0 or size2 < 0:
        raise RuntimeError(bad_header)

    total_size = size1 + size2 + header_len
    return http_read_range(base_url, fw_offset, total_size)
def push_ota(base_url: str, ota_bytes: bytes) -> str:
    """Upload an OTA image to the device.

    Sends ``POST {base_url}/api/ota`` with the image as an
    ``application/octet-stream`` body.

    Args:
        base_url: Scheme and host of the device, without a trailing slash.
        ota_bytes: The OTA package contents.

    Returns:
        The response body as text.

    Raises:
        requests.HTTPError: If the device answers with an error status.
    """
    response = requests.post(
        "{}/api/ota".format(base_url),
        data=ota_bytes,
        headers={"Content-Type": "application/octet-stream"},
        timeout=15,
    )
    response.raise_for_status()
    return response.text
def verify_firmware_header(fw_bytes: bytes) -> int:
    """Parse the 16-byte firmware header and return the declared image size.

    The header is read as two 8-character ASCII decimal size fields.

    Args:
        fw_bytes: Firmware image contents; only the first 16 bytes are read.

    Returns:
        The sum of both size fields plus the 16-byte header length.

    Raises:
        RuntimeError: If fewer than 16 bytes are supplied, or if either
            field is not an ASCII decimal number or is negative.
    """
    header_len = 16
    if len(fw_bytes) < header_len:
        raise RuntimeError("Firmware too small")
    try:
        size1 = int(fw_bytes[:8].decode("ascii"))
        size2 = int(fw_bytes[8:header_len].decode("ascii"))
    except ValueError as err:
        # UnicodeDecodeError is a ValueError subclass, so this covers both
        # non-ASCII bytes and non-numeric text.
        raise RuntimeError("Invalid firmware header") from err
    if size1 < 0 or size2 < 0:
        raise RuntimeError("Invalid firmware header")
    return size1 + size2 + header_len
def get_base_url(user_url: str) -> str:
    """Reduce a user-entered device address to ``scheme://host[:port]``.

    Path, parameters, query, and fragment are discarded. When the input has
    no ``scheme://`` prefix (for example ``192.168.1.5`` or
    ``localhost:8080/status``), ``http://`` is assumed so that the host is
    not mistaken for a path or a scheme.

    Args:
        user_url: Address as typed by the user; surrounding whitespace is
            ignored.

    Returns:
        The base URL without a trailing slash, or ``""`` for blank input.
    """
    cleaned = user_url.strip()
    if not cleaned:
        return ""

    scheme, sep, _ = cleaned.partition("://")
    has_scheme = (
        bool(sep)
        and scheme[:1].isalpha()
        and all(ch.isalnum() or ch in "+-." for ch in scheme)
    )
    if not has_scheme:
        # Without an explicit scheme urlparse() puts the host into the path
        # (or treats "host:port" as scheme:path), leaving netloc empty.
        cleaned = "http://" + cleaned.lstrip("/")

    parsed = urlparse(cleaned)
    return urlunparse((parsed.scheme, parsed.netloc, "", "", "", ""))
class TextRedirector(io.StringIO):
    """File-like object that appends everything written to it to a widget.

    The widget must provide ``insert(index, text)``, ``see(index)`` and
    ``update()``. Text is forwarded to the widget only; the inherited
    ``StringIO`` buffer is not written to.
    """

    def __init__(self, widget):
        """Remember the widget that will receive the written text."""
        super().__init__()
        self.widget = widget

    def write(self, s):
        """Append *s* to the end of the widget and scroll it into view.

        Returns:
            The number of characters written, as text streams are expected
            to report from ``write``.
        """
        self.widget.insert("end", s)
        self.widget.see("end")
        # Refresh the widget immediately so output shows up while work is
        # still in progress.
        self.widget.update()
        return len(s)

    def flush(self):
        """Do nothing; each write is pushed to the widget right away."""
        pass
class OTAGui(tk.Tk):
    """Main window that builds a differential OTA package and uploads it.

    The user supplies a device URL and a new firmware file. The partition
    table and current firmware are downloaded from the device, an OTA
    package is built with ``build_fotapkg_v1_diff`` and, unless "Dry run"
    is ticked, posted back to the device.
    """

    def __init__(self):
        """Create the window, its Tk variables, and its widgets."""
        super().__init__()
        self.title("TR6260 OTA Builder")
        self.geometry("800x550")
        self.base_url_var = tk.StringVar()
        self.new_fw_path = tk.StringVar()
        # Background worker started by start_build(); None until first use.
        self._worker = None
        self.create_widgets()

    def create_widgets(self):
        """Lay out the URL/firmware inputs, the action row, and the log."""
        frame = ttk.Frame(self, padding=10)
        frame.pack(fill="both", expand=True)

        ttk.Label(frame, text="Device URL:").grid(row=0, column=0, sticky="w")
        ttk.Entry(frame, textvariable=self.base_url_var, width=60).grid(
            row=0, column=1, sticky="we"
        )

        ttk.Label(frame, text="New firmware:").grid(
            row=1, column=0, sticky="w"
        )
        ttk.Entry(frame, textvariable=self.new_fw_path, width=60).grid(
            row=1, column=1, sticky="we"
        )
        ttk.Button(frame, text="Browse", command=self.select_new_fw).grid(
            row=1, column=2
        )

        self.dry_run_var = tk.BooleanVar(value=False)
        ttk.Checkbutton(
            frame, text="Dry run", variable=self.dry_run_var
        ).grid(row=2, column=0, sticky="w")
        ttk.Button(frame, text="Perform OTA", command=self.start_build).grid(
            row=2, column=1, pady=10
        )

        self.log_text = tk.Text(frame, height=22)
        self.log_text.grid(row=4, column=0, columnspan=3, sticky="nsew")
        # Let the log area and the entry column absorb any extra space.
        frame.rowconfigure(4, weight=1)
        frame.columnconfigure(1, weight=1)

    def select_new_fw(self):
        """Ask for a firmware file and store the chosen path, if any."""
        path = filedialog.askopenfilename()
        if path:
            self.new_fw_path.set(path)

    def start_build(self):
        """Run build_and_push() on a daemon thread.

        A click is ignored while a previous run is still active, so two
        workers never swap ``sys.stdout`` or upload at the same time.
        """
        if self._worker is not None and self._worker.is_alive():
            return
        self._worker = threading.Thread(
            target=self.build_and_push, daemon=True
        )
        self._worker.start()

    def build_and_push(self):
        """Validate the inputs, build the OTA package, and upload it.

        While the job runs, ``print`` output is redirected into the log
        widget. Invalid inputs are reported with message boxes; any other
        failure is printed to the log as ``ERROR: ...``.
        """
        # NOTE(review): this method runs on the worker thread started by
        # start_build() yet touches Tk variables and message boxes; confirm
        # that is acceptable for the Tk build in use.
        device_url = self.base_url_var.get().strip()
        if not device_url:
            messagebox.showerror("Error", "Device HTTP URL")
            return
        new_fw = Path(self.new_fw_path.get())
        # is_file() rather than exists(): an empty entry becomes Path("."),
        # which exists but is a directory.
        if not new_fw.is_file():
            messagebox.showerror("Error", "New firmware")
            return

        # Redirect only after validation, and restore in ``finally`` so no
        # exit path leaves stdout pointing at the widget.
        previous_stdout = sys.stdout
        sys.stdout = TextRedirector(self.log_text)
        try:
            try:
                total_size = verify_firmware_header(new_fw.read_bytes())
            except RuntimeError as e:
                messagebox.showerror("Firmware Error", str(e))
                return
            print(
                f"New firmware header verified, total size: {total_size} bytes"
            )

            dry_run = self.dry_run_var.get()
            if dry_run:
                print("\n--- DRY RUN MODE ENABLED ---")
                print("Firmware will not uploaded.\n")

            base_url = get_base_url(device_url)
            ota_data = self._build_ota_package(base_url, new_fw)

            if dry_run:
                print("\nDry run complete, skipping upload.")
                return

            self._upload_and_reboot(base_url, ota_data)
            print("\nDONE")
        except Exception as e:
            # Top-level boundary of the worker thread: report, don't crash.
            print("\nERROR:", e)
        finally:
            sys.stdout = previous_stdout

    def _build_ota_package(self, base_url, new_fw):
        """Download the device state and return the built OTA package bytes."""
        print("Downloading partition table...")
        partitions = read_partition_table(base_url)
        print("Downloading base firmware...")
        old_fw = read_base_firmware(base_url)
        print(f"Base firmware size: {len(old_fw)} bytes")

        # The context manager removes the directory and everything in it,
        # including on failure and when the builder leaves extra files.
        with tempfile.TemporaryDirectory() as tmp_dir:
            work_dir = Path(tmp_dir)
            infilelist_path = work_dir / "infilelist.bin"
            old_fw_path = work_dir / "old_fw.bin"
            out_path = work_dir / "ota.img"
            infilelist_path.write_bytes(partitions)
            old_fw_path.write_bytes(old_fw)

            print("Building OTA package...\n")
            build_fotapkg_v1_diff(
                infilelist_path,
                old_fw_path,
                new_fw,
                out_path,
                chunk_size=0x5000,
                patch_align=0x10,
                dict_size=0x2000,
                lc=0,
                lp=0,
                pb=0,
                nice_len=32,
                aligned_skip_threshold=0x700,
                mode="tight",
                self_check=True,
                compare_to=None,
                verbose=True,
            )
            ota_data = out_path.read_bytes()

        print(f"\nOTA built: {len(ota_data)} bytes")
        return ota_data

    def _upload_and_reboot(self, base_url, ota_data):
        """Post the OTA package and request a reboot if the reply has "size"."""
        print("Uploading to device...")
        print(
            "Device may crash at the end, "
            "so ignore timeout error if it appears."
        )
        print("It will reboot and apply update anyway.\n")
        response = push_ota(base_url, ota_data)
        print("Device response:")
        print(response)

        try:
            resp_json = json.loads(response)
        except json.JSONDecodeError:
            print("\nResponse not JSON.")
            return

        # Require a JSON object: ``in`` on a JSON string would be a
        # substring match, and on a number it would raise TypeError.
        if isinstance(resp_json, dict) and "size" in resp_json:
            print("\nOTA success detected, rebooting device...")
            try:
                r = requests.post(f"{base_url}/api/reboot", timeout=5)
                r.raise_for_status()
            except requests.RequestException as e:
                # Best effort: a failed reboot request is only logged.
                print("\nReboot failed:", e)
# Script entry point: create the main window and run the Tk event loop.
if __name__ == "__main__":
    # Keep `app` as a module-level name; code elsewhere in this file may
    # look up the running window through this global.
    app = OTAGui()
    app.mainloop()