187 lines
6.1 KiB
Python
187 lines
6.1 KiB
Python
import argparse
|
|
import queue
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
from pathlib import Path
|
|
|
|
from rich.console import Console
|
|
from rich.progress import (
|
|
BarColumn,
|
|
Progress,
|
|
TaskProgressColumn,
|
|
TextColumn,
|
|
TimeElapsedColumn,
|
|
TimeRemainingColumn,
|
|
)
|
|
from rich.text import Text
|
|
|
|
# Shared console used for progress and status messages; it is bound to stderr.
console = Console(stderr=True)
|
|
|
|
|
|
def probe_duration(input_file: str) -> float | None:
|
|
try:
|
|
result = subprocess.run(
|
|
[
|
|
"ffprobe", "-v", "error",
|
|
"-show_entries", "format=duration",
|
|
"-of", "default=noprint_wrappers=1:nokey=1",
|
|
input_file,
|
|
],
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
return float(result.stdout.strip())
|
|
except (ValueError, FileNotFoundError):
|
|
return None
|
|
|
|
|
|
def build_ffmpeg_cmd(args) -> list[str]:
    """Translate parsed CLI options into an ffmpeg argument vector.

    Args:
        args: Namespace-like object providing ``ss``, ``to``, ``input``,
            ``output``, ``height``, ``crop``, ``gpu``, ``crf``, ``preset``,
            ``threads`` and ``no_audio``.

    Returns:
        The complete command as a list of strings, starting with ``"ffmpeg"``
        and ending with the output path.
    """
    cmd = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-y"]

    # -ss/-to are emitted before -i, so ffmpeg treats them as input options.
    if args.ss:
        cmd += ["-ss", args.ss]
    if args.to:
        cmd += ["-to", args.to]

    cmd += ["-i", args.input]

    # scale=-2 keeps the aspect ratio but rounds the computed width to an even
    # number; with -1 an odd width could be produced, which H.264 encoders
    # using 4:2:0 chroma refuse.
    filters = ["hqdn3d=4:3:6:4", f"scale=-2:{args.height}", "unsharp"]
    if args.crop:
        filters.append("crop=ih/3*4:ih")

    cmd += ["-vf", ",".join(filters)]

    # The single --crf value is mapped onto each encoder's own quality option.
    gpu = args.gpu
    quality = str(args.crf)
    if gpu == "nvidia":
        cmd += ["-c:v", "h264_nvenc", "-cq", quality, "-preset", "p4"]
    elif gpu == "amd":
        cmd += ["-c:v", "h264_amf", "-rc", "cqp", "-qp_i", quality, "-qp_p", quality]
    elif gpu == "intel":
        cmd += ["-c:v", "h264_qsv", "-global_quality", quality, "-preset", args.preset]
    else:
        # No explicit -c:v here: the video codec is left to ffmpeg's default.
        cmd += ["-crf", quality, "-preset", args.preset]

    if args.threads > 0:
        cmd += ["-threads", str(args.threads)]

    if args.no_audio:
        cmd += ["-an"]
    else:
        cmd += ["-c:a", "copy"]

    # Machine-readable progress is written to stdout (pipe:1) every 0.5 s.
    cmd += ["-progress", "pipe:1", "-stats_period", "0.5"]
    cmd.append(args.output)
    return cmd
|
|
|
|
|
|
def _pipe_reader(stream, q: queue.Queue):
    """Forward every line of a binary *stream* to *q*, then enqueue ``None``.

    Each line is decoded (undecodable bytes are replaced) and stripped of
    surrounding whitespace before being queued. ``None`` marks end of stream.

    Args:
        stream: Binary file-like object whose ``readline`` returns ``b""``
            at end of stream.
        q: Queue receiving the decoded lines followed by the ``None`` sentinel.
    """
    try:
        for raw in iter(stream.readline, b""):
            q.put(raw.decode(errors="replace").strip())
    finally:
        # The sentinel must be delivered even if reading fails; otherwise a
        # consumer blocked on q.get() would wait forever.
        q.put(None)
|
|
|
|
|
|
def _format_size(size_bytes: float) -> str:
    """Render a byte count with binary units and one decimal, e.g. ``1.5 MiB``."""
    for unit in ("B", "KiB", "MiB", "GiB", "TiB"):
        if size_bytes < 1024 or unit == "TiB":
            break
        size_bytes /= 1024
    return f"{size_bytes:.1f} {unit}"


def _drain_lines(q: queue.Queue) -> list[str]:
    """Return the non-empty lines already queued, stopping at the ``None`` sentinel."""
    lines: list[str] = []
    while True:
        try:
            item = q.get_nowait()
        except queue.Empty:
            break
        if item is None:
            break
        if item:
            lines.append(item)
    return lines


def run(cmd: list[str], duration: float | None) -> int:
    """Run *cmd* as a child process and render its progress on ``console``.

    The child's stdout is read as ``key=value`` lines. When *duration* is
    truthy the bar tracks ``out_time`` against it; otherwise the bar uses a
    nominal total of 100 and advances by 0.5 on every ``speed`` line.

    Args:
        cmd: Full argument vector. The file name of the last element is used
            as the progress bar description.
        duration: Expected length in seconds, or ``None``/0 when unknown.

    Returns:
        The child's exit code, 130 if interrupted with Ctrl-C, or 127 if the
        executable named by ``cmd[0]`` could not be found.
    """
    # NOTE(review): only str(label) is passed to add_task below, so the
    # "bold cyan" style is presumably not applied to the description - confirm.
    label = Text(Path(cmd[-1]).name, style="bold cyan")
    total = duration if duration else 100.0

    try:
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except FileNotFoundError:
        # Report a missing executable as a message instead of a traceback.
        console.print(f"{cmd[0]}: command not found", style="red", markup=False)
        return 127

    stdout_q: queue.Queue = queue.Queue()
    stderr_q: queue.Queue = queue.Queue()
    threading.Thread(target=_pipe_reader, args=(proc.stdout, stdout_q), daemon=True).start()
    # stderr is drained concurrently so the child never blocks on a full pipe;
    # the lines are kept so they can be shown if the child fails.
    stderr_thread = threading.Thread(target=_pipe_reader, args=(proc.stderr, stderr_q), daemon=True)
    stderr_thread.start()

    # Latest value seen for every progress key.
    current: dict[str, str] = {}

    with Progress(
        TextColumn("{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        TimeElapsedColumn(),
        TimeRemainingColumn(),
        console=console,
    ) as progress:
        task = progress.add_task(str(label), total=total)

        try:
            while True:
                line = stdout_q.get()
                if line is None:
                    break
                key, _, value = line.partition("=")
                current[key] = value

                if key == "out_time" and duration:
                    try:
                        h, m, s = value.split(":")
                        elapsed_s = int(h) * 3600 + int(m) * 60 + float(s)
                        progress.update(task, completed=min(elapsed_s, duration))
                    except ValueError:
                        # Values that are not H:M:S are skipped.
                        pass
                elif key == "speed" and not duration:
                    progress.update(task, advance=0.5)
                elif key == "progress" and value == "end":
                    progress.update(task, completed=total)
        except KeyboardInterrupt:
            proc.terminate()
            proc.wait()
            console.print("\n[yellow]cancelled[/yellow]")
            return 130

    proc.wait()
    # Give the stderr reader a moment to reach end of stream before draining.
    stderr_thread.join(timeout=1.0)

    if proc.returncode != 0:
        console.print(f"[red]ffmpeg failed (exit {proc.returncode})[/red]")
        # Show what the child wrote to stderr; markup is disabled because the
        # text is arbitrary and may contain square brackets.
        for err_line in _drain_lines(stderr_q):
            console.print(err_line, markup=False, highlight=False)
    else:
        fps = current.get("fps", "?")
        speed = current.get("speed", "?").strip()
        try:
            size_str = _format_size(int(current.get("total_size", 0)))
        except ValueError:
            # total_size was reported but is not an integer (e.g. "N/A").
            size_str = "?"
        console.print(f"[green]done[/green] {size_str} {fps} fps {speed}")

    return proc.returncode
|
|
|
|
|
|
def _timespec_seconds(spec: str | None) -> float | None:
    """Convert ``[[HH:]MM:]SS[.fff]`` or plain seconds to seconds.

    Returns ``None`` for a missing, negative or otherwise unparseable value.
    """
    if not spec or spec.startswith("-"):
        return None
    seconds = 0.0
    try:
        for part in spec.split(":"):
            seconds = seconds * 60 + float(part)
    except ValueError:
        return None
    return seconds


def _effective_duration(duration: float | None, ss: str | None, to: str | None) -> float | None:
    """Shrink the probed *duration* to the span selected by *ss* and *to*.

    Falls back to *duration* unchanged whenever the span cannot be worked out
    or does not lie within ``(0, duration]``.
    """
    if duration is None:
        return None
    start = _timespec_seconds(ss) or 0.0
    end = _timespec_seconds(to)
    end = duration if end is None else min(end, duration)
    span = end - start
    if not (0 < span <= duration):
        return duration
    return span


def main():
    """Parse the command line, then print the ffmpeg command or run it.

    With ``-n/--dry-run`` the command is printed to stdout with per-token
    styling and the function returns. Otherwise the input duration is probed
    and the process exits with the code returned by ``run``.
    """
    parser = argparse.ArgumentParser(description="HDZero encode helper")
    parser.add_argument("input", help="Input file")
    parser.add_argument("output", help="Output file")
    parser.add_argument("-ss", help="Start time (e.g. 00:01:30 or 90)")
    parser.add_argument("-to", help="End time (e.g. 00:02:00 or 120)")
    parser.add_argument("--crop", action="store_true", help="Crop to 4:3 (crop=ih/3*4:ih)")
    parser.add_argument("--no-audio", action="store_true", help="Strip audio (-an)")
    parser.add_argument("--height", default=1080, type=int, help="Scale height (default: 1080)")
    parser.add_argument("--crf", default=23, type=int, help="CRF value (default: 23)")
    parser.add_argument("--preset", default="fast", help="Encoder preset (default: fast)")
    parser.add_argument("--gpu", nargs="?", const="nvidia", choices=["nvidia", "amd", "intel"],
                        help="Use GPU encoder (default: nvidia). Choices: nvidia, amd, intel")
    parser.add_argument("--threads", type=int, default=0, help="Limit CPU threads (default: all)")
    parser.add_argument("-n", "--dry-run", action="store_true", help="Print command without running")

    args = parser.parse_args()
    cmd = build_ffmpeg_cmd(args)

    if args.dry_run:
        # A fresh Console() is used here rather than the module-level one.
        t = Text()
        for i, token in enumerate(cmd):
            if i > 0:
                t.append(" ")
            if i == 0:
                t.append(token, style="bold cyan")
            elif token.startswith("-"):
                t.append(token, style="bold yellow")
            else:
                t.append(token, style="white")
        Console().print(t)
        return

    # The progress total must match what is actually encoded: when -ss/-to
    # trim the input, the full file length would leave the bar stuck partway.
    duration = _effective_duration(probe_duration(args.input), args.ss, args.to)
    sys.exit(run(cmd, duration))
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|