Initial commit
This commit is contained in:
@@ -0,0 +1,186 @@
|
||||
import argparse
|
||||
import queue
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
from pathlib import Path
|
||||
|
||||
from rich.console import Console
|
||||
from rich.progress import (
|
||||
BarColumn,
|
||||
Progress,
|
||||
TaskProgressColumn,
|
||||
TextColumn,
|
||||
TimeElapsedColumn,
|
||||
TimeRemainingColumn,
|
||||
)
|
||||
from rich.text import Text
|
||||
|
||||
console = Console(stderr=True)
|
||||
|
||||
|
||||
def probe_duration(input_file: str) -> float | None:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[
|
||||
"ffprobe", "-v", "error",
|
||||
"-show_entries", "format=duration",
|
||||
"-of", "default=noprint_wrappers=1:nokey=1",
|
||||
input_file,
|
||||
],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
return float(result.stdout.strip())
|
||||
except (ValueError, FileNotFoundError):
|
||||
return None
|
||||
|
||||
|
||||
def build_ffmpeg_cmd(args) -> list[str]:
|
||||
cmd = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-y"]
|
||||
|
||||
if args.ss:
|
||||
cmd += ["-ss", args.ss]
|
||||
if args.to:
|
||||
cmd += ["-to", args.to]
|
||||
|
||||
cmd += ["-i", args.input]
|
||||
|
||||
filters = ["hqdn3d=4:3:6:4", f"scale=-1:{args.height}", "unsharp"]
|
||||
if args.crop:
|
||||
filters.append("crop=ih/3*4:ih")
|
||||
|
||||
cmd += ["-vf", ",".join(filters)]
|
||||
|
||||
gpu = args.gpu
|
||||
if gpu == "nvidia":
|
||||
cmd += ["-c:v", "h264_nvenc", "-cq", str(args.crf), "-preset", "p4"]
|
||||
elif gpu == "amd":
|
||||
cmd += ["-c:v", "h264_amf", "-rc", "cqp", "-qp_i", str(args.crf), "-qp_p", str(args.crf)]
|
||||
elif gpu == "intel":
|
||||
cmd += ["-c:v", "h264_qsv", "-global_quality", str(args.crf), "-preset", args.preset]
|
||||
else:
|
||||
cmd += ["-crf", str(args.crf), "-preset", args.preset]
|
||||
|
||||
if args.threads > 0:
|
||||
cmd += ["-threads", str(args.threads)]
|
||||
|
||||
if args.no_audio:
|
||||
cmd += ["-an"]
|
||||
else:
|
||||
cmd += ["-c:a", "copy"]
|
||||
|
||||
cmd += ["-progress", "pipe:1", "-stats_period", "0.5"]
|
||||
cmd.append(args.output)
|
||||
return cmd
|
||||
|
||||
|
||||
def _pipe_reader(stream, q: queue.Queue):
|
||||
for line in iter(stream.readline, b""):
|
||||
q.put(line.decode(errors="replace").strip())
|
||||
q.put(None)
|
||||
|
||||
|
||||
def run(cmd: list[str], duration: float | None) -> int:
|
||||
label = Text(Path(cmd[-1]).name, style="bold cyan")
|
||||
total = duration if duration else 100.0
|
||||
|
||||
with Progress(
|
||||
TextColumn("{task.description}"),
|
||||
BarColumn(),
|
||||
TaskProgressColumn(),
|
||||
TimeElapsedColumn(),
|
||||
TimeRemainingColumn(),
|
||||
console=console,
|
||||
) as progress:
|
||||
task = progress.add_task(str(label), total=total)
|
||||
|
||||
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
|
||||
stdout_q: queue.Queue = queue.Queue()
|
||||
threading.Thread(target=_pipe_reader, args=(proc.stdout, stdout_q), daemon=True).start()
|
||||
threading.Thread(target=_pipe_reader, args=(proc.stderr, queue.Queue()), daemon=True).start()
|
||||
|
||||
current: dict[str, str] = {}
|
||||
try:
|
||||
while True:
|
||||
line = stdout_q.get()
|
||||
if line is None:
|
||||
break
|
||||
key, _, value = line.partition("=")
|
||||
current[key] = value
|
||||
|
||||
if key == "out_time" and duration:
|
||||
try:
|
||||
h, m, s = value.split(":")
|
||||
elapsed_s = int(h) * 3600 + int(m) * 60 + float(s)
|
||||
progress.update(task, completed=min(elapsed_s, duration))
|
||||
except ValueError:
|
||||
pass
|
||||
elif key == "speed" and not duration:
|
||||
progress.update(task, advance=0.5)
|
||||
elif key == "progress" and value == "end":
|
||||
progress.update(task, completed=total)
|
||||
except KeyboardInterrupt:
|
||||
proc.terminate()
|
||||
proc.wait()
|
||||
console.print("\n[yellow]cancelled[/yellow]")
|
||||
return 130
|
||||
|
||||
proc.wait()
|
||||
|
||||
if proc.returncode != 0:
|
||||
console.print(f"[red]ffmpeg failed (exit {proc.returncode})[/red]")
|
||||
else:
|
||||
fps = current.get("fps", "?")
|
||||
speed = current.get("speed", "?").strip()
|
||||
size_bytes = int(current.get("total_size", 0))
|
||||
for unit in ("B", "KiB", "MiB", "GiB", "TiB"):
|
||||
if size_bytes < 1024 or unit == "TiB":
|
||||
break
|
||||
size_bytes /= 1024
|
||||
size_str = f"{size_bytes:.1f} {unit}"
|
||||
console.print(f"[green]done[/green] {size_str} {fps} fps {speed}")
|
||||
|
||||
return proc.returncode
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="HDZero encode helper")
|
||||
parser.add_argument("input", help="Input file")
|
||||
parser.add_argument("output", help="Output file")
|
||||
parser.add_argument("-ss", help="Start time (e.g. 00:01:30 or 90)")
|
||||
parser.add_argument("-to", help="End time (e.g. 00:02:00 or 120)")
|
||||
parser.add_argument("--crop", action="store_true", help="Crop to 4:3 (crop=ih/3*4:ih)")
|
||||
parser.add_argument("--no-audio", action="store_true", help="Strip audio (-an)")
|
||||
parser.add_argument("--height", default=1080, type=int, help="Scale height (default: 1080)")
|
||||
parser.add_argument("--crf", default=23, type=int, help="CRF value (default: 23)")
|
||||
parser.add_argument("--preset", default="fast", help="Encoder preset (default: fast)")
|
||||
parser.add_argument("--gpu", nargs="?", const="nvidia", choices=["nvidia", "amd", "intel"],
|
||||
help="Use GPU encoder (default: nvidia). Choices: nvidia, amd, intel")
|
||||
parser.add_argument("--threads", type=int, default=0, help="Limit CPU threads (default: all)")
|
||||
parser.add_argument("-n", "--dry-run", action="store_true", help="Print command without running")
|
||||
|
||||
args = parser.parse_args()
|
||||
cmd = build_ffmpeg_cmd(args)
|
||||
|
||||
if args.dry_run:
|
||||
t = Text()
|
||||
for i, token in enumerate(cmd):
|
||||
if i > 0:
|
||||
t.append(" ")
|
||||
if i == 0:
|
||||
t.append(token, style="bold cyan")
|
||||
elif token.startswith("-"):
|
||||
t.append(token, style="bold yellow")
|
||||
else:
|
||||
t.append(token, style="white")
|
||||
Console().print(t)
|
||||
return
|
||||
|
||||
duration = probe_duration(args.input)
|
||||
sys.exit(run(cmd, duration))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user