From 06fe74975bab053d1489c6c8f063d241b5e819df Mon Sep 17 00:00:00 2001 From: mnerv <24420859+mnerv@users.noreply.github.com> Date: Sun, 1 Mar 2026 06:16:33 +0100 Subject: [PATCH] add LED blink and Morse code scripts --- README.md | 32 +++++++- blink_led.py | 54 +++++++++++++ morse.py | 213 +++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 296 insertions(+), 3 deletions(-) create mode 100644 blink_led.py create mode 100644 morse.py diff --git a/README.md b/README.md index 2d3d3b0..0efd6b1 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,33 @@ wrapping the private UniFi Protect API. ## Scripts ```sh -uv run play_speaker.py # play hello.wav through the camera speaker -uv run record_mic.py # record from the camera mic (Ctrl+C to stop) -uv run dump_camera.py # dump camera data from integration + private API +uv run dump_camera.py # dump camera data from integration + private API +uv run play_speaker.py # play hello.wav through the camera speaker +uv run record_mic.py # record from the camera mic (Ctrl+C to stop) +uv run blink_led.py # blink the status LED +uv run morse.py "HELLO WORLD" # transmit Morse code on speaker + LED in sync ``` + +### `blink_led.py` + +Toggles the status LED on and off, then restores its original state on exit. + +```sh +uv run blink_led.py # 10 blinks at 0.5s interval (default) +uv run blink_led.py --count 20 --interval 0.3 +``` + +### `morse.py` + +Encodes a message as Morse code, plays a 700 Hz tone through the talkback speaker, +and blinks the status LED in sync. + +```sh +uv run morse.py "SOS" # default: SOS at 0.2s unit +uv run morse.py "BACK HOME" --unit 0.25 # slower speed +uv run morse.py "SOS" --volume 80 # set speaker volume first (0-100) +uv run morse.py "SOS" --lead-in 0.8 # longer silence before morse starts +``` + +Standard Morse timing: dot = 1 unit, dash = 3 units, symbol gap = 1 unit, +letter gap = 3 units, word gap = 7 units. diff --git a/blink_led.py b/blink_led.py new file mode 100644 index 0000000..cb0d3e9 --- /dev/null +++ b/blink_led.py @@ -0,0 +1,54 @@ +import argparse +import os +import time + +import httpx +from dotenv import load_dotenv + +load_dotenv() + +HOST = os.environ["HOST"] +BASE = f"https://{HOST}/proxy/protect/integration/v1" +HEADERS = {"X-API-Key": os.environ["API_KEY"]} +CAMERA_NAME = "G6 Pro 360" + + +def set_led(client: httpx.Client, camera_id: str, enabled: bool): + client.patch( + f"{BASE}/cameras/{camera_id}", + json={"ledSettings": {"isEnabled": enabled}}, + ).raise_for_status() + + +def main(): + parser = argparse.ArgumentParser(description="Blink the G6 Pro 360 status LED") + parser.add_argument("--count", type=int, default=10, help="Number of blink cycles (default: 10)") + parser.add_argument("--interval", type=float, default=0.5, help="Seconds per on/off half-cycle (default: 0.5)") + args = parser.parse_args() + + with httpx.Client(headers=HEADERS, verify=False) as client: + cameras = client.get(f"{BASE}/cameras").raise_for_status().json() + camera = next(c for c in cameras if CAMERA_NAME in c["name"]) + camera_id = camera["id"] + original_led = camera["ledSettings"]["isEnabled"] + print(f"Camera: {camera['name']} ({camera_id})") + print(f"LED was: {'on' if original_led else 'off'}") + print(f"Blinking {args.count} times, {args.interval}s interval — Ctrl+C to stop early") + + try: + for i in range(args.count): + set_led(client, camera_id, True) + print(f" [{i+1}/{args.count}] ON", flush=True) + time.sleep(args.interval) + set_led(client, camera_id, False) + print(f" [{i+1}/{args.count}] OFF", flush=True) + time.sleep(args.interval) + except KeyboardInterrupt: + print("\nInterrupted.") + finally: + set_led(client, camera_id, original_led) + print(f"Restored LED to: {'on' if original_led else 'off'}") + + +if __name__ == "__main__": + main() diff --git a/morse.py b/morse.py new file mode 100644 index 0000000..5a3e956 --- /dev/null +++ b/morse.py @@ -0,0 +1,213 @@ +# /// script +# requires-python = ">=3.11" +# dependencies = ["httpx[http2]", "python-dotenv"] +# /// +"""Transmit a message as Morse code on the camera speaker and status LED in sync.""" + +import argparse +import array +import math +import os +import subprocess +import tempfile +import threading +import time + +import httpx +from dotenv import load_dotenv + +load_dotenv() + +HOST = os.environ["HOST"] +BASE = f"https://{HOST}/proxy/protect/integration/v1" +PRIVATE_BASE = f"https://{HOST}/proxy/protect/api" +HEADERS = {"X-API-Key": os.environ["API_KEY"]} +USERNAME = os.environ["UNIFI_USERNAME"] +PASSWORD = os.environ["UNIFI_PASSWORD"] +CAMERA_NAME = "G6 Pro 360" + +TONE_FREQ = 700 # Hz + +MORSE = { + 'A': '.-', 'B': '-...', 'C': '-.-.', 'D': '-..', 'E': '.', + 'F': '..-.', 'G': '--.', 'H': '....', 'I': '..', 'J': '.---', + 'K': '-.-', 'L': '.-..', 'M': '--', 'N': '-.', 'O': '---', + 'P': '.--.', 'Q': '--.-', 'R': '.-.', 'S': '...', 'T': '-', + 'U': '..-', 'V': '...-', 'W': '.--', 'X': '-..-', 'Y': '-.--', + 'Z': '--..', + '0': '-----', '1': '.----', '2': '..---', '3': '...--', '4': '....-', + '5': '.....', '6': '-....', '7': '--...', '8': '---..', '9': '----.', + '.': '.-.-.-', ',': '--..--', '?': '..--..', '/': '-..-.', '=': '-...-', +} + + +def set_speaker_volume(camera_id: str, volume: int): + # Accept-Encoding must be suppressed — server returns 403 when present + with httpx.Client(verify=False, http2=True) as client: + res = client.post( + f"https://{HOST}/api/auth/login", + content=f'{{"username":"{USERNAME}","password":"{PASSWORD}","rememberMe":false}}'.encode(), + headers={"Content-Type": "application/json", "Accept-Encoding": ""}, + ) + res.raise_for_status() + csrf = res.headers.get("x-csrf-token") + client.patch( + f"{PRIVATE_BASE}/cameras/{camera_id}", + headers={"x-csrf-token": csrf}, + json={"speakerSettings": {"volume": volume, "speakerVolume": volume}}, + ).raise_for_status() + print(f"Volume : {volume}") + + +def text_to_sequence(text: str, unit: float) -> list[tuple[bool, float]]: + """Convert text to a list of (is_on, duration_seconds) pulses.""" + seq: list[tuple[bool, float]] = [] + words = text.upper().split() + for wi, word in enumerate(words): + letters = [ch for ch in word if ch in MORSE] + for li, ch in enumerate(letters): + code = MORSE[ch] + for si, sym in enumerate(code): + seq.append((True, unit if sym == '.' else 3 * unit)) + if si < len(code) - 1: + seq.append((False, unit)) # inter-symbol gap + if li < len(letters) - 1: + seq.append((False, 3 * unit)) # inter-letter gap + if wi < len(words) - 1: + seq.append((False, 7 * unit)) # inter-word gap + return seq + + +def generate_pcm(sequence: list[tuple[bool, float]], sample_rate: int, lead_in: float) -> bytes: + """Generate signed 16-bit little-endian PCM for the morse sequence.""" + FADE = int(0.005 * sample_rate) # 5ms linear fade in/out to avoid clicks + samples: array.array = array.array('h') + samples.extend([0] * int(lead_in * sample_rate)) + t = 0 + for is_on, duration in sequence: + n = int(duration * sample_rate) + if is_on: + for i in range(n): + if i < FADE: + env = i / FADE + elif i >= n - FADE: + env = (n - i) / FADE + else: + env = 1.0 + val = int(32767 * 0.8 * env * math.sin(2 * math.pi * TONE_FREQ * (t + i) / sample_rate)) + samples.append(val) + else: + samples.extend([0] * n) + t += n + return samples.tobytes() + + +def _sleep_interruptible(duration: float, stop: threading.Event): + deadline = time.monotonic() + duration + while not stop.is_set(): + left = deadline - time.monotonic() + if left <= 0: + break + time.sleep(min(left, 0.02)) + + +def led_worker(camera_id: str, sequence: list[tuple[bool, float]], lead_in: float, original_led: bool, stop: threading.Event): + """Toggle the LED in a dedicated thread with its own HTTP client.""" + with httpx.Client(headers=HEADERS, verify=False) as client: + def set_led(state: bool): + client.patch( + f"{BASE}/cameras/{camera_id}", + json={"ledSettings": {"isEnabled": state}}, + ).raise_for_status() + + try: + _sleep_interruptible(lead_in, stop) + for is_on, duration in sequence: + if stop.is_set(): + break + t0 = time.monotonic() + set_led(is_on) + remaining = duration - (time.monotonic() - t0) + _sleep_interruptible(remaining, stop) + finally: + set_led(original_led) + + +def main(): + parser = argparse.ArgumentParser(description="Morse code on camera speaker + LED") + parser.add_argument("message", nargs="?", default="SOS", help="Message to transmit (default: SOS)") + parser.add_argument("--unit", type=float, default=0.2, help="Unit duration in seconds (default: 0.2)") + parser.add_argument("--lead-in", type=float, default=0.5, help="Silence before morse starts in seconds (default: 0.5)") + parser.add_argument("--volume", type=int, default=None, metavar="0-100", help="Speaker volume to set before transmitting") + args = parser.parse_args() + + sequence = text_to_sequence(args.message, args.unit) + if not sequence: + print("No encodable characters in message.") + return + + morse_str = " / ".join( + " ".join(MORSE[ch] for ch in word.upper() if ch in MORSE) + for word in args.message.split() + ) + total = args.lead_in + sum(d for _, d in sequence) + print(f"Message : {args.message}") + print(f"Morse : {morse_str}") + print(f"Duration: {total:.1f}s (unit={args.unit}s)") + + with httpx.Client(headers=HEADERS, verify=False) as client: + cameras = client.get(f"{BASE}/cameras").raise_for_status().json() + camera = next(c for c in cameras if CAMERA_NAME in c["name"]) + camera_id = camera["id"] + original_led = camera["ledSettings"]["isEnabled"] + print(f"Camera : {camera['name']} ({camera_id})") + + if args.volume is not None: + set_speaker_volume(camera_id, args.volume) + + session = client.post(f"{BASE}/cameras/{camera_id}/talkback-session").raise_for_status().json() + rate = session["samplingRate"] + print(f"Talkback: {session['url']} ({session['codec']} {rate}Hz)") + + pcm = generate_pcm(sequence, rate, args.lead_in) + + tmp = tempfile.NamedTemporaryFile(suffix=".pcm", delete=False) + try: + tmp.write(pcm) + tmp.close() + + ffmpeg = subprocess.Popen( + [ + "ffmpeg", "-loglevel", "error", + "-re", + "-f", "s16le", "-ar", str(rate), "-ac", "1", "-i", tmp.name, + "-acodec", "libopus", "-b:a", "64k", + "-f", "rtp", session["url"], + ], + ) + + stop = threading.Event() + blinker = threading.Thread( + target=led_worker, + args=(camera_id, sequence, args.lead_in, original_led, stop), + daemon=False, + ) + + print("Transmitting... (Ctrl+C to abort)") + blinker.start() + + try: + ffmpeg.wait() + blinker.join() + except KeyboardInterrupt: + print("\nAborted.") + stop.set() + ffmpeg.terminate() + ffmpeg.wait() + blinker.join(timeout=3) + finally: + os.unlink(tmp.name) + + +if __name__ == "__main__": + main()