Files
hw-g6pro360/morse.py

214 lines
7.6 KiB
Python

# /// script
# requires-python = ">=3.11"
# dependencies = ["httpx[http2]", "python-dotenv"]
# ///
"""Transmit a message as Morse code on the camera speaker and status LED in sync."""
import argparse
import array
import math
import os
import subprocess
import tempfile
import threading
import time
import httpx
from dotenv import load_dotenv
# Load settings from a local .env file into the environment (python-dotenv)
# before the os.environ lookups below.
load_dotenv()
# Required settings: each os.environ[...] lookup raises KeyError when the
# variable is missing, so an unconfigured environment fails at import time.
HOST = os.environ["HOST"]
# Base URL for the integration API (camera list, LED settings, talkback session).
BASE = f"https://{HOST}/proxy/protect/integration/v1"
# Base URL used by set_speaker_volume() for the camera PATCH after logging in.
PRIVATE_BASE = f"https://{HOST}/proxy/protect/api"
# API-key header attached to the httpx clients in led_worker() and main().
HEADERS = {"X-API-Key": os.environ["API_KEY"]}
# Account credentials posted to the login endpoint by set_speaker_volume().
USERNAME = os.environ["UNIFI_USERNAME"]
PASSWORD = os.environ["UNIFI_PASSWORD"]
# Substring matched against camera names in main() to pick the target camera.
CAMERA_NAME = "G6 Pro 360"
# Frequency of the sine tone produced by generate_pcm().
TONE_FREQ = 700 # Hz
# Character -> Morse pattern ('.' = dot, '-' = dash). Characters that are not
# keys of this table are skipped by text_to_sequence().
MORSE = {
# Letters
'A': '.-', 'B': '-...', 'C': '-.-.', 'D': '-..', 'E': '.',
'F': '..-.', 'G': '--.', 'H': '....', 'I': '..', 'J': '.---',
'K': '-.-', 'L': '.-..', 'M': '--', 'N': '-.', 'O': '---',
'P': '.--.', 'Q': '--.-', 'R': '.-.', 'S': '...', 'T': '-',
'U': '..-', 'V': '...-', 'W': '.--', 'X': '-..-', 'Y': '-.--',
'Z': '--..',
# Digits
'0': '-----', '1': '.----', '2': '..---', '3': '...--', '4': '....-',
'5': '.....', '6': '-....', '7': '--...', '8': '---..', '9': '----.',
# Punctuation
'.': '.-.-.-', ',': '--..--', '?': '..--..', '/': '-..-.', '=': '-...-',
}
def set_speaker_volume(camera_id: str, volume: int):
    """Log in with the account credentials and set the camera's speaker volume.

    Posts USERNAME/PASSWORD to ``https://{HOST}/api/auth/login``, then PATCHes
    the camera under PRIVATE_BASE with the CSRF token taken from the login
    response headers, and finally prints the applied volume.

    Args:
        camera_id: Identifier of the camera to update.
        volume: Value written to both ``volume`` and ``speakerVolume``.

    Raises:
        httpx.HTTPStatusError: If the login or the PATCH returns an error status.
    """
    import json  # local import: only this function needs it

    # Serialise with json.dumps so quotes, backslashes or control characters in
    # the credentials are escaped instead of corrupting the request body.
    # Compact separators and ensure_ascii=False keep the bytes identical to the
    # previous hand-built string for credentials that need no escaping.
    login_body = json.dumps(
        {"username": USERNAME, "password": PASSWORD, "rememberMe": False},
        separators=(",", ":"),
        ensure_ascii=False,
    ).encode()

    # Accept-Encoding must be suppressed — server returns 403 when present
    with httpx.Client(verify=False, http2=True) as client:
        res = client.post(
            f"https://{HOST}/api/auth/login",
            content=login_body,
            headers={"Content-Type": "application/json", "Accept-Encoding": ""},
        )
        res.raise_for_status()
        csrf = res.headers.get("x-csrf-token")
        # The same client is reused for the PATCH, so anything it retained
        # from the login response travels with this request.
        client.patch(
            f"{PRIVATE_BASE}/cameras/{camera_id}",
            headers={"x-csrf-token": csrf},
            json={"speakerSettings": {"volume": volume, "speakerVolume": volume}},
        ).raise_for_status()
    print(f"Volume : {volume}")
def text_to_sequence(text: str, unit: float) -> list[tuple[bool, float]]:
    """Convert text to a list of (is_on, duration_seconds) pulses.

    The text is upper-cased and split on whitespace. Characters that are not
    keys of MORSE are dropped, and words left with no encodable character are
    dropped entirely so they cannot contribute stray gaps.

    Timing, in multiples of ``unit``: dot = 1 on, dash = 3 on, gap between
    symbols of one letter = 1 off, gap between letters = 3 off, gap between
    words = 7 off.

    Args:
        text: Message to encode.
        unit: Duration of one Morse unit in seconds.

    Returns:
        The pulse list; empty when the text has no encodable character. The
        list never starts or ends with an off pulse.
    """
    # Filter per word first: a word made only of unknown characters must not
    # produce an inter-word gap (which previously doubled gaps, or left a
    # silence-only sequence for fully unencodable multi-word input).
    words = [
        letters
        for word in text.upper().split()
        if (letters := [ch for ch in word if ch in MORSE])
    ]

    seq: list[tuple[bool, float]] = []
    for wi, letters in enumerate(words):
        if wi:
            seq.append((False, 7 * unit))  # inter-word gap
        for li, ch in enumerate(letters):
            if li:
                seq.append((False, 3 * unit))  # inter-letter gap
            for si, sym in enumerate(MORSE[ch]):
                if si:
                    seq.append((False, unit))  # inter-symbol gap
                seq.append((True, unit if sym == '.' else 3 * unit))
    return seq
def generate_pcm(sequence: list[tuple[bool, float]], sample_rate: int, lead_in: float) -> bytes:
    """Generate signed 16-bit little-endian PCM for the morse sequence.

    The output starts with ``lead_in`` seconds of silence. Each on-pulse is a
    TONE_FREQ sine at 80% of full scale with a linear fade at both ends; each
    off-pulse is silence.

    Args:
        sequence: (is_on, duration_seconds) pulses.
        sample_rate: Output sampling rate in Hz.
        lead_in: Seconds of silence placed before the first pulse.

    Returns:
        Mono PCM samples, two bytes each, always little-endian.
    """
    import sys  # local import: only needed for the byte-order check below

    fade = int(0.005 * sample_rate)  # 5ms linear fade in/out to avoid clicks
    amplitude = 32767 * 0.8
    omega = 2 * math.pi * TONE_FREQ
    samples: array.array = array.array('h')
    samples.extend([0] * int(lead_in * sample_rate))
    # Running sample index across pulses: the sine phase is computed from the
    # position in the whole sequence, not from the start of each pulse.
    t = 0
    for is_on, duration in sequence:
        n = int(duration * sample_rate)
        if is_on:
            for i in range(n):
                if fade:
                    # Distance to the nearer pulse edge drives the envelope, so
                    # pulses shorter than two fade lengths ramp up and down
                    # symmetrically instead of jumping between the two ramps.
                    env = min(1.0, i / fade, (n - i) / fade)
                else:
                    env = 1.0
                samples.append(int(amplitude * env * math.sin(omega * (t + i) / sample_rate)))
        else:
            samples.extend([0] * n)
        t += n
    # array.tobytes() uses the machine's native byte order; swap on big-endian
    # hosts so the result is little-endian as documented.
    if sys.byteorder == "big":
        samples.byteswap()
    return samples.tobytes()
def _sleep_interruptible(duration: float, stop: threading.Event):
    """Block for ``duration`` seconds, returning early once ``stop`` is set.

    The wait is cut into slices of at most 20 ms and ``stop`` is polled between
    slices. A zero or negative duration returns without sleeping.
    """
    wake_at = time.monotonic() + duration
    while True:
        if stop.is_set():
            return
        remaining = wake_at - time.monotonic()
        if remaining <= 0:
            return
        # Never sleep past the deadline, and never longer than one poll slice.
        time.sleep(0.02 if remaining > 0.02 else remaining)
def led_worker(camera_id: str, sequence: list[tuple[bool, float]], lead_in: float, original_led: bool, stop: threading.Event):
    """Toggle the LED in a dedicated thread with its own HTTP client.

    Waits ``lead_in`` seconds, then sets the LED to each pulse's state in turn.
    The LED is always set back to ``original_led`` on exit, whether the
    sequence finished, ``stop`` was set, or a request failed.

    Args:
        camera_id: Identifier of the camera whose LED is driven.
        sequence: (is_on, duration_seconds) pulses to display.
        lead_in: Seconds to wait before the first pulse.
        original_led: LED state to restore when the worker exits.
        stop: Set by the caller to abort the sequence early.

    Raises:
        httpx.HTTPStatusError: If an LED request returns an error status.
    """
    with httpx.Client(headers=HEADERS, verify=False) as client:

        def set_led(state: bool):
            client.patch(
                f"{BASE}/cameras/{camera_id}",
                json={"ledSettings": {"isEnabled": state}},
            ).raise_for_status()

        try:
            # Every pulse ends at a point on one absolute timeline measured
            # from the worker's start. If a request takes longer than its
            # pulse, the following waits shrink to catch up, so the delay does
            # not accumulate over the rest of the message.
            deadline = time.monotonic() + lead_in
            _sleep_interruptible(deadline - time.monotonic(), stop)
            for is_on, duration in sequence:
                if stop.is_set():
                    break
                set_led(is_on)
                deadline += duration
                _sleep_interruptible(deadline - time.monotonic(), stop)
        finally:
            set_led(original_led)
def main():
    """Parse the command line and transmit the message on the speaker and LED.

    Steps: build the pulse sequence, find the camera whose name contains
    CAMERA_NAME, optionally set the speaker volume, open a talkback session,
    render the pulses to a temporary PCM file, stream that file with ffmpeg as
    Opus over RTP to the session URL, and blink the LED from a worker thread
    while ffmpeg runs. Ctrl+C aborts both the audio and the LED.

    Raises:
        SystemExit: On invalid arguments or when no camera matches CAMERA_NAME.
        httpx.HTTPStatusError: If a camera API request returns an error status.
    """
    parser = argparse.ArgumentParser(description="Morse code on camera speaker + LED")
    parser.add_argument("message", nargs="?", default="SOS", help="Message to transmit (default: SOS)")
    parser.add_argument("--unit", type=float, default=0.2, help="Unit duration in seconds (default: 0.2)")
    parser.add_argument("--lead-in", type=float, default=0.5, help="Silence before morse starts in seconds (default: 0.5)")
    parser.add_argument("--volume", type=int, default=None, metavar="0-100", help="Speaker volume to set before transmitting")
    args = parser.parse_args()

    # Reject values that would yield empty or negative-length pulses
    # ("not >" also catches NaN) or a volume outside the advertised range.
    if not args.unit > 0:
        parser.error("--unit must be greater than 0")
    if not args.lead_in >= 0:
        parser.error("--lead-in must not be negative")
    if args.volume is not None and not 0 <= args.volume <= 100:
        parser.error("--volume must be between 0 and 100")

    sequence = text_to_sequence(args.message, args.unit)
    # A sequence without any on-pulse would only transmit silence.
    if not any(is_on for is_on, _ in sequence):
        print("No encodable characters in message.")
        return

    # Words without encodable characters are left out so the printed pattern
    # has no empty segments between the " / " separators.
    word_codes = (
        " ".join(MORSE[ch] for ch in word.upper() if ch in MORSE)
        for word in args.message.split()
    )
    morse_str = " / ".join(code for code in word_codes if code)
    total = args.lead_in + sum(d for _, d in sequence)
    print(f"Message : {args.message}")
    print(f"Morse : {morse_str}")
    print(f"Duration: {total:.1f}s (unit={args.unit}s)")

    with httpx.Client(headers=HEADERS, verify=False) as client:
        cameras = client.get(f"{BASE}/cameras").raise_for_status().json()
        camera = next((c for c in cameras if CAMERA_NAME in (c.get("name") or "")), None)
        if camera is None:
            # Exit with a readable message instead of a bare StopIteration.
            raise SystemExit(f"No camera with {CAMERA_NAME!r} in its name was found.")
        camera_id = camera["id"]
        original_led = camera["ledSettings"]["isEnabled"]
        print(f"Camera : {camera['name']} ({camera_id})")
        if args.volume is not None:
            set_speaker_volume(camera_id, args.volume)
        session = client.post(f"{BASE}/cameras/{camera_id}/talkback-session").raise_for_status().json()

    rate = session["samplingRate"]
    print(f"Talkback: {session['url']} ({session['codec']} {rate}Hz)")
    pcm = generate_pcm(sequence, rate, args.lead_in)

    tmp = tempfile.NamedTemporaryFile(suffix=".pcm", delete=False)
    try:
        with tmp:  # closes the handle even if the write fails
            tmp.write(pcm)
        ffmpeg = subprocess.Popen(
            [
                "ffmpeg", "-loglevel", "error",
                "-re",  # read the input at its native rate, i.e. in real time
                "-f", "s16le", "-ar", str(rate), "-ac", "1", "-i", tmp.name,
                "-acodec", "libopus", "-b:a", "96k",
                "-f", "rtp", session["url"],
            ],
        )
        stop = threading.Event()
        blinker = threading.Thread(
            target=led_worker,
            args=(camera_id, sequence, args.lead_in, original_led, stop),
            daemon=False,
        )
        print("Transmitting... (Ctrl+C to abort)")
        try:
            blinker.start()
            status = ffmpeg.wait()
            if status != 0:
                # No audio is playing any more, so end the LED sequence too.
                print(f"ffmpeg exited with status {status}; stopping LED.")
                stop.set()
            blinker.join()
        except KeyboardInterrupt:
            print("\nAborted.")
        finally:
            # Runs on success, Ctrl+C and any other error, so neither the
            # ffmpeg process nor the LED worker is left running.
            stop.set()
            if ffmpeg.poll() is None:
                ffmpeg.terminate()
                ffmpeg.wait()
            if blinker.is_alive():
                blinker.join(timeout=3)
    finally:
        os.unlink(tmp.name)
if __name__ == "__main__":
main()