# Listing metadata: 214 lines, 7.6 KiB, Python
# /// script
# requires-python = ">=3.11"
# dependencies = ["httpx[http2]", "python-dotenv"]
# ///
|
|
"""Transmit a message as Morse code on the camera speaker and status LED in sync."""
|
|
|
|
import argparse
import array
import json
import math
import os
import subprocess
import sys
import tempfile
import threading
import time

import httpx
from dotenv import load_dotenv
|
|
|
|
# Populate os.environ from a local .env file (python-dotenv) before the
# lookups below; each os.environ[...] raises KeyError if its variable is unset.
load_dotenv()

# Address of the console; both API roots below are served from this host.
HOST = os.environ["HOST"]
_PROTECT_ROOT = f"https://{HOST}/proxy/protect"
BASE = _PROTECT_ROOT + "/integration/v1"  # integration API, authenticated via HEADERS
PRIVATE_BASE = _PROTECT_ROOT + "/api"  # private API, used with a username/password login

# API key sent with every integration API request.
HEADERS = {"X-API-Key": os.environ["API_KEY"]}

# Credentials for the login that precedes private API calls.
USERNAME = os.environ["UNIFI_USERNAME"]
PASSWORD = os.environ["UNIFI_PASSWORD"]

# Substring matched against the camera names returned by the API.
CAMERA_NAME = "G6 Pro 360"

# Frequency of the generated Morse tone, in Hz.
TONE_FREQ = 700
|
|
|
|
# Morse code table, keyed by upper-case character; values use "." for a dot
# and "-" for a dash. Grouped by character class, then merged in that order.
_MORSE_LETTERS = {
    "A": ".-", "B": "-...", "C": "-.-.", "D": "-..",
    "E": ".", "F": "..-.", "G": "--.", "H": "....",
    "I": "..", "J": ".---", "K": "-.-", "L": ".-..",
    "M": "--", "N": "-.", "O": "---", "P": ".--.",
    "Q": "--.-", "R": ".-.", "S": "...", "T": "-",
    "U": "..-", "V": "...-", "W": ".--", "X": "-..-",
    "Y": "-.--", "Z": "--..",
}
_MORSE_DIGITS = {
    "0": "-----", "1": ".----", "2": "..---", "3": "...--", "4": "....-",
    "5": ".....", "6": "-....", "7": "--...", "8": "---..", "9": "----.",
}
_MORSE_PUNCTUATION = {
    ".": ".-.-.-",
    ",": "--..--",
    "?": "..--..",
    "/": "-..-.",
    "=": "-...-",
}
MORSE = _MORSE_LETTERS | _MORSE_DIGITS | _MORSE_PUNCTUATION
|
|
|
|
|
|
def set_speaker_volume(camera_id: str, volume: int):
    """Set the camera's speaker volume through the private Protect API.

    Logs in with USERNAME/PASSWORD on a short-lived HTTP/2 client, then
    PATCHes the camera's ``speakerSettings`` using the CSRF token returned by
    the login response, and prints the volume that was applied.

    Args:
        camera_id: Id of the camera to update.
        volume: Target volume, 0-100 inclusive.

    Raises:
        ValueError: If ``volume`` is outside 0-100.
        httpx.HTTPStatusError: If the login or the PATCH returns an error
            status.
    """
    if not 0 <= volume <= 100:
        raise ValueError(f"volume must be between 0 and 100, got {volume}")

    # Serialise with json.dumps so quotes or backslashes in the credentials
    # are escaped instead of corrupting the body. Compact separators and
    # ensure_ascii=False keep the bytes identical to the hand-written form
    # for values that need no escaping.
    login_body = json.dumps(
        {"username": USERNAME, "password": PASSWORD, "rememberMe": False},
        separators=(",", ":"),
        ensure_ascii=False,
    ).encode()

    # NOTE(review): verify=False disables TLS certificate checks, presumably
    # because the console presents a self-signed certificate; confirm.
    with httpx.Client(verify=False, http2=True) as client:
        res = client.post(
            f"https://{HOST}/api/auth/login",
            content=login_body,
            # Accept-Encoding must be suppressed — server returns 403 when present
            headers={"Content-Type": "application/json", "Accept-Encoding": ""},
        )
        res.raise_for_status()

        # Only forward the CSRF token when the login actually returned one;
        # a None header value cannot be sent. If the server requires the
        # token, raise_for_status() below reports the rejection.
        csrf = res.headers.get("x-csrf-token")
        client.patch(
            f"{PRIVATE_BASE}/cameras/{camera_id}",
            headers={"x-csrf-token": csrf} if csrf else None,
            json={"speakerSettings": {"volume": volume, "speakerVolume": volume}},
        ).raise_for_status()
    print(f"Volume : {volume}")
|
|
|
|
|
|
def text_to_sequence(text: str, unit: float) -> list[tuple[bool, float]]:
    """Convert text to a list of (is_on, duration_seconds) pulses.

    The text is upper-cased and split on whitespace; characters missing from
    MORSE are dropped. Timing follows the usual Morse ratios: a dot lasts one
    unit and a dash three, with gaps of one unit between symbols, three
    between letters and seven between words.

    Args:
        text: Message to encode.
        unit: Duration of one Morse unit in seconds.

    Returns:
        Alternating on/off pulses. The list never starts or ends with a gap
        and is empty when nothing in ``text`` is encodable.
    """
    # Reduce each word to its encodable letters and drop words that end up
    # empty, so an unencodable word never contributes a stray 7-unit gap
    # (which previously produced doubled, leading, trailing or gap-only output).
    words = [
        letters
        for word in text.upper().split()
        if (letters := [ch for ch in word if ch in MORSE])
    ]

    seq: list[tuple[bool, float]] = []
    for wi, letters in enumerate(words):
        if wi:
            seq.append((False, 7 * unit))  # inter-word gap
        for li, ch in enumerate(letters):
            if li:
                seq.append((False, 3 * unit))  # inter-letter gap
            for si, sym in enumerate(MORSE[ch]):
                if si:
                    seq.append((False, unit))  # inter-symbol gap
                seq.append((True, unit if sym == "." else 3 * unit))
    return seq
|
|
|
|
|
|
def generate_pcm(sequence: list[tuple[bool, float]], sample_rate: int, lead_in: float) -> bytes:
    """Generate signed 16-bit little-endian PCM for the morse sequence.

    Args:
        sequence: (is_on, duration_seconds) pulses; "on" pulses become a
            TONE_FREQ sine at 80% of full scale, "off" pulses become silence.
        sample_rate: Output sample rate in Hz.
        lead_in: Seconds of silence placed before the first pulse.

    Returns:
        Mono PCM samples as bytes, two bytes per sample, little-endian.
    """
    fade = int(0.005 * sample_rate)  # 5ms linear fade in/out to avoid clicks
    silence = array.array("h", [0])
    samples = array.array("h")
    samples.extend(silence * max(0, int(lead_in * sample_rate)))

    t = 0  # running sample index; keeps the sine phase continuous across pulses
    for is_on, duration in sequence:
        # Clamp so a negative duration cannot move the phase counter backwards.
        n = max(0, int(duration * sample_rate))
        if is_on:
            for i in range(n):
                # Taking the minimum of the fade-in ramp, the fade-out ramp
                # and full scale gives the same envelope as separate branches
                # for normal pulses, and still ramps back down when the pulse
                # is shorter than two fade lengths (where the fade-in branch
                # used to win and the pulse ended abruptly).
                env = min(1.0, i / fade, (n - i) / fade) if fade else 1.0
                val = int(32767 * 0.8 * env * math.sin(2 * math.pi * TONE_FREQ * (t + i) / sample_rate))
                samples.append(val)
        else:
            samples.extend(silence * n)
        t += n

    # array.tobytes() emits native byte order; swap on big-endian hosts so the
    # result is little-endian as documented.
    if sys.byteorder == "big":
        samples.byteswap()
    return samples.tobytes()
|
|
|
|
|
|
def _sleep_interruptible(duration: float, stop: threading.Event):
    """Block for up to ``duration`` seconds, returning early once ``stop`` is set.

    Args:
        duration: Maximum time to wait in seconds; zero or negative values
            return immediately.
        stop: Event that cuts the wait short when set, including when it is
            already set on entry.
    """
    # Event.wait() blocks until the event is set or the timeout elapses, so
    # it wakes as soon as stop is set instead of polling in 20 ms slices.
    if duration > 0:
        stop.wait(duration)
|
|
|
|
|
|
def led_worker(camera_id: str, sequence: list[tuple[bool, float]], lead_in: float, original_led: bool, stop: threading.Event):
    """Toggle the LED in a dedicated thread with its own HTTP client.

    Waits ``lead_in`` seconds, then replays ``sequence`` by PATCHing the
    camera's ``ledSettings.isEnabled`` for every pulse. The LED is always put
    back to ``original_led`` on exit, whether the sequence completed, was
    stopped, or a request failed.

    Args:
        camera_id: Id of the camera whose LED is driven.
        sequence: (is_on, duration_seconds) pulses to replay.
        lead_in: Seconds to wait before the first pulse.
        original_led: LED state to restore when the worker finishes.
        stop: Event that aborts the replay when set.

    Raises:
        httpx.HTTPStatusError: If a PATCH returns an error status.
    """
    # NOTE(review): verify=False disables TLS certificate checks, presumably
    # because the console presents a self-signed certificate; confirm.
    with httpx.Client(headers=HEADERS, verify=False) as client:

        def set_led(state: bool):
            client.patch(
                f"{BASE}/cameras/{camera_id}",
                json={"ledSettings": {"isEnabled": state}},
            ).raise_for_status()

        try:
            # Schedule every pulse against an absolute deadline. Timing each
            # pulse relative to its own start lets request latency and sleep
            # overshoot accumulate, so the LED falls further behind the audio
            # the longer the message runs. With a running deadline a slow
            # request only shortens the following wait.
            deadline = time.monotonic() + lead_in
            _sleep_interruptible(deadline - time.monotonic(), stop)
            for is_on, duration in sequence:
                if stop.is_set():
                    break
                set_led(is_on)
                deadline += duration
                _sleep_interruptible(deadline - time.monotonic(), stop)
        finally:
            set_led(original_led)
|
|
|
|
|
|
def main():
    """Parse the command line and transmit the message on speaker and LED.

    Steps: encode the message, look up the camera whose name contains
    CAMERA_NAME, optionally set the speaker volume, open a talkback session,
    render the tone to a temporary PCM file, stream it with ffmpeg while a
    worker thread blinks the LED, then clean up.

    Raises:
        SystemExit: On invalid arguments or when no matching camera exists.
        httpx.HTTPStatusError: If a Protect API request returns an error status.
    """
    parser = argparse.ArgumentParser(description="Morse code on camera speaker + LED")
    parser.add_argument("message", nargs="?", default="SOS", help="Message to transmit (default: SOS)")
    parser.add_argument("--unit", type=float, default=0.2, help="Unit duration in seconds (default: 0.2)")
    parser.add_argument("--lead-in", type=float, default=0.5, help="Silence before morse starts in seconds (default: 0.5)")
    parser.add_argument("--volume", type=int, default=None, metavar="0-100", help="Speaker volume to set before transmitting")
    args = parser.parse_args()

    # Reject values that would yield empty or negative pulses, or a volume
    # outside the advertised range, before touching the camera.
    if args.unit <= 0:
        parser.error("--unit must be greater than 0")
    if args.lead_in < 0:
        parser.error("--lead-in must not be negative")
    if args.volume is not None and not 0 <= args.volume <= 100:
        parser.error("--volume must be between 0 and 100")

    sequence = text_to_sequence(args.message, args.unit)
    # A sequence made only of gaps carries no signal either, so treat it the
    # same as an empty one.
    if not any(is_on for is_on, _ in sequence):
        print("No encodable characters in message.")
        return

    # Words without any encodable character are left out so the printed
    # Morse has no empty " / " segments.
    encoded_words = (
        " ".join(MORSE[ch] for ch in word if ch in MORSE)
        for word in args.message.upper().split()
    )
    morse_str = " / ".join(code for code in encoded_words if code)
    total = args.lead_in + sum(d for _, d in sequence)
    print(f"Message : {args.message}")
    print(f"Morse : {morse_str}")
    print(f"Duration: {total:.1f}s (unit={args.unit}s)")

    # NOTE(review): verify=False disables TLS certificate checks, presumably
    # because the console presents a self-signed certificate; confirm.
    with httpx.Client(headers=HEADERS, verify=False) as client:
        cameras = client.get(f"{BASE}/cameras").raise_for_status().json()
        camera = next((c for c in cameras if CAMERA_NAME in c["name"]), None)
        if camera is None:
            # Exit with a readable message instead of a bare StopIteration.
            raise SystemExit(f"No camera matching {CAMERA_NAME!r} found.")
        camera_id = camera["id"]
        original_led = camera["ledSettings"]["isEnabled"]
        print(f"Camera : {camera['name']} ({camera_id})")

        if args.volume is not None:
            set_speaker_volume(camera_id, args.volume)

        session = client.post(f"{BASE}/cameras/{camera_id}/talkback-session").raise_for_status().json()
        rate = session["samplingRate"]
        print(f"Talkback: {session['url']} ({session['codec']} {rate}Hz)")

    pcm = generate_pcm(sequence, rate, args.lead_in)

    # delete=False: ffmpeg reopens the file by name, so it must outlive this
    # handle; it is removed explicitly in the outer finally.
    tmp = tempfile.NamedTemporaryFile(suffix=".pcm", delete=False)
    try:
        with tmp:
            tmp.write(pcm)

        ffmpeg = subprocess.Popen(
            [
                "ffmpeg", "-loglevel", "error",
                "-re",  # read input at its native rate so playback is real-time
                "-f", "s16le", "-ar", str(rate), "-ac", "1", "-i", tmp.name,
                "-acodec", "libopus", "-b:a", "96k",
                "-f", "rtp", session["url"],
            ],
        )

        stop = threading.Event()
        blinker = threading.Thread(
            target=led_worker,
            args=(camera_id, sequence, args.lead_in, original_led, stop),
            daemon=False,
        )

        print("Transmitting... (Ctrl+C to abort)")
        try:
            blinker.start()
            if ffmpeg.wait() != 0:
                # No audio is playing, so stop the LED instead of blinking
                # through the whole message in silence.
                print(f"ffmpeg exited with status {ffmpeg.returncode}; stopping LED.")
                stop.set()
            blinker.join()
        except KeyboardInterrupt:
            print("\nAborted.")
        finally:
            # Runs on success, Ctrl+C and any other error: make sure the LED
            # worker is told to stop and ffmpeg is not left running.
            stop.set()
            if ffmpeg.poll() is None:
                ffmpeg.terminate()
                ffmpeg.wait()
            if blinker.is_alive():
                blinker.join(timeout=3)
    finally:
        os.unlink(tmp.name)
|
|
|
|
|
|
# Run the transmitter only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|