#! @python@/bin/python -B import argparse import base64 import json import os import shutil import signal import socket import subprocess import sys import time import tomllib import yaml CONF_FILE = '/etc/qclk/config.yaml' WG_KEY_FILE = '/etc/qclk/wg.key' IWD_PATH = '/var/lib/iwd' IWD_CONF = '''[Settings] Hidden={hidden} [Security] Passphrase={psk} ''' MANAGEMENT_NET = '''[Match] Name=management [Network] Address={ip}/32 ''' def log(m): print(m, file=sys.stderr) def iwd_ssid_basename(ssid: str) -> str: if ssid.isalnum() and not any(c in ssid for c in '-_ '): return ssid return f"={ssid.encode('utf-8').hex()}" class Configurer: tmpdir = '/run/qclk' def __init__(self, conf_file: str, wg_key_file: str): self.conf_file = conf_file self.load_config() with open(wg_key_file, 'rb') as f: output = subprocess.check_output(['@wireguardTools@/bin/wg', 'pubkey'], input=f.read()) pubkey = base64.b64decode(output) self.id = pubkey[:4].hex() self.load_dir = os.path.join(self.tmpdir, 'load') self.sd_sock = None os.makedirs(self.tmpdir, exist_ok=True) os.makedirs(self.load_dir, exist_ok=True) def load_config(self): with open(self.conf_file) as f: self.config = yaml.safe_load(f) def write_config(self): log(f'Updaing config') tmp = os.path.join(self.tmpdir, 'new-config.yaml') with open(tmp, 'w') as f: yaml.dump(self.config, f) shutil.move(tmp, self.conf_file) def _setup_hostname(self): hostname = f'qclk-{self.id}' log(f"Setting hostname to '{hostname}'") socket.sethostname(hostname) def _setup_wifi(self): os.makedirs(IWD_CONF, exist_ok=True) if 'wifi' in self.config: conf = self.config['wifi'] tmp = os.path.join(self.tmpdir, 'wifi.psk') with open(tmp, 'w') as f: print( IWD_CONF.format( hidden=str(conf['hidden']).lower(), psk=conf['password']), file=f) fname = f"{iwd_ssid_basename(conf['ssid'])}.psk" log(f"Writing IWD config file '{fname}' for network '{conf['ssid']}'") shutil.move(tmp, os.path.join(IWD_PATH, fname)) else: fname = None for f in os.listdir(IWD_PATH): if (fname is not None and f != fname) and f.endswith('.psk'): log(f"Cleaning up old IWD config '{f}'") os.remove(os.path.join(IWD_PATH, f)) subprocess.call(['@iwd@/bin/iwctl', 'station', 'wifi', 'scan']) def _setup_mgmt(self): conf = self.config['management'] tmp = os.path.join(self.tmpdir, 'management.network') with open(tmp, 'w') as f: print(MANAGEMENT_NET.format(ip=conf['ip']), file=f) log(f"Configuring management IP {conf['ip']}") os.makedirs('/run/systemd/network', exist_ok=True) shutil.move(tmp, '/run/systemd/network/20-management.network') subprocess.check_call(['@systemd@/bin/networkctl', 'reload']) def reconcile(self): if self.sd_sock is not None: self.sd_sock.sendall(b'STATUS=Reconciling configuration...') self._setup_hostname() self._setup_wifi() self._setup_mgmt() if self.sd_sock is not None: self.sd_sock.sendall(b'STATUS=OK\nREADY=1') def _ext_load(self, identifier: str, device: str): mountpoint = os.path.join(self.tmpdir, 'mnt', identifier) os.makedirs(mountpoint, exist_ok=True) try: log(f'Mounting {device} -> {mountpoint}') subprocess.check_call(['@utilLinux@/bin/mount', '-t', 'vfat', '-o', 'ro', device, mountpoint]) try: path = os.path.join(mountpoint, 'qclk.toml') if not os.path.exists(path): log(f'No config file found on {device}') return with open(path, 'rb') as f: ext_conf = tomllib.load(f) finally: subprocess.check_call(['@utilLinux@/bin/umount', mountpoint]) finally: os.rmdir(mountpoint) if 'wifi' in ext_conf: log(f'Loading WiFi settings from {device}') self.config['wifi'] = { 'ssid': ext_conf['wifi']['ssid'], 'password': ext_conf['wifi'].get('password', ''), 'hidden': ext_conf['wifi'].get('hidden', False), } self._setup_wifi() self.write_config() def serve(self, args: argparse.Namespace): os.makedirs(os.path.join(self.tmpdir, 'load'), exist_ok=True) addr = os.getenv('NOTIFY_SOCKET') if addr is not None: self.sd_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) if addr[0] == '@': addr = '\0' + addr[1:] self.sd_sock.connect(addr) running = True def sighandler(sig, frame): nonlocal running if sig in (signal.SIGINT, signal.SIGTERM): running = False return if sig == signal.SIGHUP: if self.sd_sock is not None: t = time.clock_gettime_ns(time.CLOCK_MONOTONIC) // 1000 self.sd_sock.sendall(f'RELOADING=1\nMONOTONIC_USEC={t}'.encode('ascii')) self.load_config() self.reconcile() elif sig == signal.SIGUSR1: for identifier in os.listdir(self.load_dir): fname = os.path.join(self.load_dir, identifier) with open(fname) as f: device = f.read().strip() os.remove(fname) try: self._ext_load(identifier, device) except Exception as ex: log(f'Failed to load config from {device}: {ex}') for s in [signal.SIGINT, signal.SIGTERM, signal.SIGHUP, signal.SIGUSR1]: signal.signal(s, sighandler) self.reconcile() while running: signal.pause() if self.sd_sock is not None: self.sd_sock.close() def ext_load(self, args: argparse.Namespace): with open(os.path.join(self.load_dir, args.identifier), 'w') as f: print(args.device, file=f) output = subprocess.check_output( ['@systemd@/bin/systemctl', 'show', '--property', 'MainPID', '--value', 'qclk-configurer.service'], encoding='ascii') pid = int(output.strip()) os.kill(pid, signal.SIGUSR1) def main(): parser = argparse.ArgumentParser(description='qclkOS configurer') cmds = parser.add_subparsers(title='commands', help='reconcile') rec_parser = cmds.add_parser('serve') rec_parser.set_defaults(func=Configurer.serve) eload_parser = cmds.add_parser('ext-load') eload_parser.set_defaults(func=Configurer.ext_load) eload_parser.add_argument('identifier', help='Unique device name') eload_parser.add_argument('device', help='Block device path') args = parser.parse_args() c = Configurer(CONF_FILE, WG_KEY_FILE) if not hasattr(args, 'func'): c.serve(args) else: args.func(c, args) if __name__ == '__main__': main()