Files
2026-04-05 17:19:06 +02:00

187 lines
6.1 KiB
Python

import argparse
import queue
import subprocess
import sys
import threading
from pathlib import Path
from rich.console import Console
from rich.progress import (
BarColumn,
Progress,
TaskProgressColumn,
TextColumn,
TimeElapsedColumn,
TimeRemainingColumn,
)
from rich.text import Text
console = Console(stderr=True)
def probe_duration(input_file: str) -> float | None:
try:
result = subprocess.run(
[
"ffprobe", "-v", "error",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
input_file,
],
capture_output=True,
text=True,
)
return float(result.stdout.strip())
except (ValueError, FileNotFoundError):
return None
def build_ffmpeg_cmd(args) -> list[str]:
cmd = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-y"]
if args.ss:
cmd += ["-ss", args.ss]
if args.to:
cmd += ["-to", args.to]
cmd += ["-i", args.input]
filters = ["hqdn3d=4:3:6:4", f"scale=-1:{args.height}", "unsharp"]
if args.crop:
filters.append("crop=ih/3*4:ih")
cmd += ["-vf", ",".join(filters)]
gpu = args.gpu
if gpu == "nvidia":
cmd += ["-c:v", "h264_nvenc", "-cq", str(args.crf), "-preset", "p4"]
elif gpu == "amd":
cmd += ["-c:v", "h264_amf", "-rc", "cqp", "-qp_i", str(args.crf), "-qp_p", str(args.crf)]
elif gpu == "intel":
cmd += ["-c:v", "h264_qsv", "-global_quality", str(args.crf), "-preset", args.preset]
else:
cmd += ["-crf", str(args.crf), "-preset", args.preset]
if args.threads > 0:
cmd += ["-threads", str(args.threads)]
if args.no_audio:
cmd += ["-an"]
else:
cmd += ["-c:a", "copy"]
cmd += ["-progress", "pipe:1", "-stats_period", "0.5"]
cmd.append(args.output)
return cmd
def _pipe_reader(stream, q: queue.Queue):
for line in iter(stream.readline, b""):
q.put(line.decode(errors="replace").strip())
q.put(None)
def run(cmd: list[str], duration: float | None) -> int:
label = Text(Path(cmd[-1]).name, style="bold cyan")
total = duration if duration else 100.0
with Progress(
TextColumn("{task.description}"),
BarColumn(),
TaskProgressColumn(),
TimeElapsedColumn(),
TimeRemainingColumn(),
console=console,
) as progress:
task = progress.add_task(str(label), total=total)
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout_q: queue.Queue = queue.Queue()
threading.Thread(target=_pipe_reader, args=(proc.stdout, stdout_q), daemon=True).start()
threading.Thread(target=_pipe_reader, args=(proc.stderr, queue.Queue()), daemon=True).start()
current: dict[str, str] = {}
try:
while True:
line = stdout_q.get()
if line is None:
break
key, _, value = line.partition("=")
current[key] = value
if key == "out_time" and duration:
try:
h, m, s = value.split(":")
elapsed_s = int(h) * 3600 + int(m) * 60 + float(s)
progress.update(task, completed=min(elapsed_s, duration))
except ValueError:
pass
elif key == "speed" and not duration:
progress.update(task, advance=0.5)
elif key == "progress" and value == "end":
progress.update(task, completed=total)
except KeyboardInterrupt:
proc.terminate()
proc.wait()
console.print("\n[yellow]cancelled[/yellow]")
return 130
proc.wait()
if proc.returncode != 0:
console.print(f"[red]ffmpeg failed (exit {proc.returncode})[/red]")
else:
fps = current.get("fps", "?")
speed = current.get("speed", "?").strip()
size_bytes = int(current.get("total_size", 0))
for unit in ("B", "KiB", "MiB", "GiB", "TiB"):
if size_bytes < 1024 or unit == "TiB":
break
size_bytes /= 1024
size_str = f"{size_bytes:.1f} {unit}"
console.print(f"[green]done[/green] {size_str} {fps} fps {speed}")
return proc.returncode
def main():
parser = argparse.ArgumentParser(description="HDZero encode helper")
parser.add_argument("input", help="Input file")
parser.add_argument("output", help="Output file")
parser.add_argument("-ss", help="Start time (e.g. 00:01:30 or 90)")
parser.add_argument("-to", help="End time (e.g. 00:02:00 or 120)")
parser.add_argument("--crop", action="store_true", help="Crop to 4:3 (crop=ih/3*4:ih)")
parser.add_argument("--no-audio", action="store_true", help="Strip audio (-an)")
parser.add_argument("--height", default=1080, type=int, help="Scale height (default: 1080)")
parser.add_argument("--crf", default=23, type=int, help="CRF value (default: 23)")
parser.add_argument("--preset", default="fast", help="Encoder preset (default: fast)")
parser.add_argument("--gpu", nargs="?", const="nvidia", choices=["nvidia", "amd", "intel"],
help="Use GPU encoder (default: nvidia). Choices: nvidia, amd, intel")
parser.add_argument("--threads", type=int, default=0, help="Limit CPU threads (default: all)")
parser.add_argument("-n", "--dry-run", action="store_true", help="Print command without running")
args = parser.parse_args()
cmd = build_ffmpeg_cmd(args)
if args.dry_run:
t = Text()
for i, token in enumerate(cmd):
if i > 0:
t.append(" ")
if i == 0:
t.append(token, style="bold cyan")
elif token.startswith("-"):
t.append(token, style="bold yellow")
else:
t.append(token, style="white")
Console().print(t)
return
duration = probe_duration(args.input)
sys.exit(run(cmd, duration))
if __name__ == "__main__":
main()