# /// script # requires-python = ">=3.11" # dependencies = ["httpx[http2]", "python-dotenv"] # /// """Transmit a message as Morse code on the camera speaker and status LED in sync.""" import argparse import array import math import os import subprocess import tempfile import threading import time import httpx from dotenv import load_dotenv load_dotenv() HOST = os.environ["HOST"] BASE = f"https://{HOST}/proxy/protect/integration/v1" PRIVATE_BASE = f"https://{HOST}/proxy/protect/api" HEADERS = {"X-API-Key": os.environ["API_KEY"]} USERNAME = os.environ["UNIFI_USERNAME"] PASSWORD = os.environ["UNIFI_PASSWORD"] CAMERA_NAME = "G6 Pro 360" TONE_FREQ = 700 # Hz MORSE = { 'A': '.-', 'B': '-...', 'C': '-.-.', 'D': '-..', 'E': '.', 'F': '..-.', 'G': '--.', 'H': '....', 'I': '..', 'J': '.---', 'K': '-.-', 'L': '.-..', 'M': '--', 'N': '-.', 'O': '---', 'P': '.--.', 'Q': '--.-', 'R': '.-.', 'S': '...', 'T': '-', 'U': '..-', 'V': '...-', 'W': '.--', 'X': '-..-', 'Y': '-.--', 'Z': '--..', '0': '-----', '1': '.----', '2': '..---', '3': '...--', '4': '....-', '5': '.....', '6': '-....', '7': '--...', '8': '---..', '9': '----.', '.': '.-.-.-', ',': '--..--', '?': '..--..', '/': '-..-.', '=': '-...-', } def set_speaker_volume(camera_id: str, volume: int): # Accept-Encoding must be suppressed — server returns 403 when present with httpx.Client(verify=False, http2=True) as client: res = client.post( f"https://{HOST}/api/auth/login", content=f'{{"username":"{USERNAME}","password":"{PASSWORD}","rememberMe":false}}'.encode(), headers={"Content-Type": "application/json", "Accept-Encoding": ""}, ) res.raise_for_status() csrf = res.headers.get("x-csrf-token") client.patch( f"{PRIVATE_BASE}/cameras/{camera_id}", headers={"x-csrf-token": csrf}, json={"speakerSettings": {"volume": volume, "speakerVolume": volume}}, ).raise_for_status() print(f"Volume : {volume}") def text_to_sequence(text: str, unit: float) -> list[tuple[bool, float]]: """Convert text to a list of (is_on, duration_seconds) pulses.""" seq: list[tuple[bool, float]] = [] words = text.upper().split() for wi, word in enumerate(words): letters = [ch for ch in word if ch in MORSE] for li, ch in enumerate(letters): code = MORSE[ch] for si, sym in enumerate(code): seq.append((True, unit if sym == '.' else 3 * unit)) if si < len(code) - 1: seq.append((False, unit)) # inter-symbol gap if li < len(letters) - 1: seq.append((False, 3 * unit)) # inter-letter gap if wi < len(words) - 1: seq.append((False, 7 * unit)) # inter-word gap return seq def generate_pcm(sequence: list[tuple[bool, float]], sample_rate: int, lead_in: float) -> bytes: """Generate signed 16-bit little-endian PCM for the morse sequence.""" FADE = int(0.005 * sample_rate) # 5ms linear fade in/out to avoid clicks samples: array.array = array.array('h') samples.extend([0] * int(lead_in * sample_rate)) t = 0 for is_on, duration in sequence: n = int(duration * sample_rate) if is_on: for i in range(n): if i < FADE: env = i / FADE elif i >= n - FADE: env = (n - i) / FADE else: env = 1.0 val = int(32767 * 0.8 * env * math.sin(2 * math.pi * TONE_FREQ * (t + i) / sample_rate)) samples.append(val) else: samples.extend([0] * n) t += n return samples.tobytes() def _sleep_interruptible(duration: float, stop: threading.Event): deadline = time.monotonic() + duration while not stop.is_set(): left = deadline - time.monotonic() if left <= 0: break time.sleep(min(left, 0.02)) def led_worker(camera_id: str, sequence: list[tuple[bool, float]], lead_in: float, original_led: bool, stop: threading.Event): """Toggle the LED in a dedicated thread with its own HTTP client.""" with httpx.Client(headers=HEADERS, verify=False) as client: def set_led(state: bool): client.patch( f"{BASE}/cameras/{camera_id}", json={"ledSettings": {"isEnabled": state}}, ).raise_for_status() try: _sleep_interruptible(lead_in, stop) for is_on, duration in sequence: if stop.is_set(): break t0 = time.monotonic() set_led(is_on) remaining = duration - (time.monotonic() - t0) _sleep_interruptible(remaining, stop) finally: set_led(original_led) def main(): parser = argparse.ArgumentParser(description="Morse code on camera speaker + LED") parser.add_argument("message", nargs="?", default="SOS", help="Message to transmit (default: SOS)") parser.add_argument("--unit", type=float, default=0.2, help="Unit duration in seconds (default: 0.2)") parser.add_argument("--lead-in", type=float, default=0.5, help="Silence before morse starts in seconds (default: 0.5)") parser.add_argument("--volume", type=int, default=None, metavar="0-100", help="Speaker volume to set before transmitting") args = parser.parse_args() sequence = text_to_sequence(args.message, args.unit) if not sequence: print("No encodable characters in message.") return morse_str = " / ".join( " ".join(MORSE[ch] for ch in word.upper() if ch in MORSE) for word in args.message.split() ) total = args.lead_in + sum(d for _, d in sequence) print(f"Message : {args.message}") print(f"Morse : {morse_str}") print(f"Duration: {total:.1f}s (unit={args.unit}s)") with httpx.Client(headers=HEADERS, verify=False) as client: cameras = client.get(f"{BASE}/cameras").raise_for_status().json() camera = next(c for c in cameras if CAMERA_NAME in c["name"]) camera_id = camera["id"] original_led = camera["ledSettings"]["isEnabled"] print(f"Camera : {camera['name']} ({camera_id})") if args.volume is not None: set_speaker_volume(camera_id, args.volume) session = client.post(f"{BASE}/cameras/{camera_id}/talkback-session").raise_for_status().json() rate = session["samplingRate"] print(f"Talkback: {session['url']} ({session['codec']} {rate}Hz)") pcm = generate_pcm(sequence, rate, args.lead_in) tmp = tempfile.NamedTemporaryFile(suffix=".pcm", delete=False) try: tmp.write(pcm) tmp.close() ffmpeg = subprocess.Popen( [ "ffmpeg", "-loglevel", "error", "-re", "-f", "s16le", "-ar", str(rate), "-ac", "1", "-i", tmp.name, "-acodec", "libopus", "-b:a", "64k", "-f", "rtp", session["url"], ], ) stop = threading.Event() blinker = threading.Thread( target=led_worker, args=(camera_id, sequence, args.lead_in, original_led, stop), daemon=False, ) print("Transmitting... (Ctrl+C to abort)") blinker.start() try: ffmpeg.wait() blinker.join() except KeyboardInterrupt: print("\nAborted.") stop.set() ffmpeg.terminate() ffmpeg.wait() blinker.join(timeout=3) finally: os.unlink(tmp.name) if __name__ == "__main__": main()