import argparse import queue import subprocess import sys import threading from pathlib import Path from rich.console import Console from rich.progress import ( BarColumn, Progress, TaskProgressColumn, TextColumn, TimeElapsedColumn, TimeRemainingColumn, ) from rich.text import Text console = Console(stderr=True) def probe_duration(input_file: str) -> float | None: try: result = subprocess.run( [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", input_file, ], capture_output=True, text=True, ) return float(result.stdout.strip()) except (ValueError, FileNotFoundError): return None def build_ffmpeg_cmd(args) -> list[str]: cmd = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-y"] if args.ss: cmd += ["-ss", args.ss] if args.to: cmd += ["-to", args.to] cmd += ["-i", args.input] filters = ["hqdn3d=4:3:6:4", f"scale=-1:{args.height}", "unsharp"] if args.crop: filters.append("crop=ih/3*4:ih") cmd += ["-vf", ",".join(filters)] gpu = args.gpu if gpu == "nvidia": cmd += ["-c:v", "h264_nvenc", "-cq", str(args.crf), "-preset", "p4"] elif gpu == "amd": cmd += ["-c:v", "h264_amf", "-rc", "cqp", "-qp_i", str(args.crf), "-qp_p", str(args.crf)] elif gpu == "intel": cmd += ["-c:v", "h264_qsv", "-global_quality", str(args.crf), "-preset", args.preset] else: cmd += ["-crf", str(args.crf), "-preset", args.preset] if args.threads > 0: cmd += ["-threads", str(args.threads)] if args.no_audio: cmd += ["-an"] else: cmd += ["-c:a", "copy"] cmd += ["-progress", "pipe:1", "-stats_period", "0.5"] cmd.append(args.output) return cmd def _pipe_reader(stream, q: queue.Queue): for line in iter(stream.readline, b""): q.put(line.decode(errors="replace").strip()) q.put(None) def run(cmd: list[str], duration: float | None) -> int: label = Text(Path(cmd[-1]).name, style="bold cyan") total = duration if duration else 100.0 with Progress( TextColumn("{task.description}"), BarColumn(), TaskProgressColumn(), TimeElapsedColumn(), TimeRemainingColumn(), console=console, ) as progress: task = progress.add_task(str(label), total=total) proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout_q: queue.Queue = queue.Queue() threading.Thread(target=_pipe_reader, args=(proc.stdout, stdout_q), daemon=True).start() threading.Thread(target=_pipe_reader, args=(proc.stderr, queue.Queue()), daemon=True).start() current: dict[str, str] = {} try: while True: line = stdout_q.get() if line is None: break key, _, value = line.partition("=") current[key] = value if key == "out_time" and duration: try: h, m, s = value.split(":") elapsed_s = int(h) * 3600 + int(m) * 60 + float(s) progress.update(task, completed=min(elapsed_s, duration)) except ValueError: pass elif key == "speed" and not duration: progress.update(task, advance=0.5) elif key == "progress" and value == "end": progress.update(task, completed=total) except KeyboardInterrupt: proc.terminate() proc.wait() console.print("\n[yellow]cancelled[/yellow]") return 130 proc.wait() if proc.returncode != 0: console.print(f"[red]ffmpeg failed (exit {proc.returncode})[/red]") else: fps = current.get("fps", "?") speed = current.get("speed", "?").strip() size_bytes = int(current.get("total_size", 0)) for unit in ("B", "KiB", "MiB", "GiB", "TiB"): if size_bytes < 1024 or unit == "TiB": break size_bytes /= 1024 size_str = f"{size_bytes:.1f} {unit}" console.print(f"[green]done[/green] {size_str} {fps} fps {speed}") return proc.returncode def main(): parser = argparse.ArgumentParser(description="HDZero encode helper") parser.add_argument("input", help="Input file") parser.add_argument("output", help="Output file") parser.add_argument("-ss", help="Start time (e.g. 00:01:30 or 90)") parser.add_argument("-to", help="End time (e.g. 00:02:00 or 120)") parser.add_argument("--crop", action="store_true", help="Crop to 4:3 (crop=ih/3*4:ih)") parser.add_argument("--no-audio", action="store_true", help="Strip audio (-an)") parser.add_argument("--height", default=1080, type=int, help="Scale height (default: 1080)") parser.add_argument("--crf", default=23, type=int, help="CRF value (default: 23)") parser.add_argument("--preset", default="fast", help="Encoder preset (default: fast)") parser.add_argument("--gpu", nargs="?", const="nvidia", choices=["nvidia", "amd", "intel"], help="Use GPU encoder (default: nvidia). Choices: nvidia, amd, intel") parser.add_argument("--threads", type=int, default=0, help="Limit CPU threads (default: all)") parser.add_argument("-n", "--dry-run", action="store_true", help="Print command without running") args = parser.parse_args() cmd = build_ffmpeg_cmd(args) if args.dry_run: t = Text() for i, token in enumerate(cmd): if i > 0: t.append(" ") if i == 0: t.append(token, style="bold cyan") elif token.startswith("-"): t.append(token, style="bold yellow") else: t.append(token, style="white") Console().print(t) return duration = probe_duration(args.input) sys.exit(run(cmd, duration)) if __name__ == "__main__": main()