#! @python3@/bin/python3 -B # Based on `nixos/modules/system/boot/loader/systemd-boot/systemd-boot-builder.py` import argparse import datetime import glob import os import os.path import shutil import subprocess import sys import json from typing import NamedTuple, Dict, List from dataclasses import dataclass BOOT_MOUNT_POINT = '/boot' STORE_DIR = 'nix' # These values will be replaced with actual values during the package build BOOTSPEC_TOOLS = '@bootspecTools@' NIX = '@nix@' DISTRO_NAME = '@distroName@' SYSTEM_NAME = '@systemName@' CONFIGURATION_LIMIT = int('@configurationLimit@') CHECK_MOUNTPOINTS = "@checkMountpoints@" @dataclass class BootSpec: init: str initrd: str kernel: str kernelParams: List[str] label: str system: str toplevel: str specialisations: Dict[str, 'BootSpec'] sortKey: str initrdSecrets: str | None = None class SystemIdentifier(NamedTuple): profile: str | None generation: int specialisation: str | None def copy_if_not_exists(source: str, dest: str) -> None: if not os.path.exists(dest): shutil.copyfile(source, dest) def generation_dir(profile: str | None, generation: int) -> str: if profile: return f'/nix/var/nix/profiles/system-profiles/{profile}-{generation}-link' else: return f'/nix/var/nix/profiles/system-{generation}-link' def system_dir(i: SystemIdentifier) -> str: d = generation_dir(i.profile, i.generation) if i.specialisation: return os.path.join(d, 'specialisation', i.specialisation) else: return d def entry_key(i: SystemIdentifier) -> str: pieces = [ 'nixos', i.profile or None, 'generation', str(i.generation), f'specialisation-{i.specialisation}' if i.specialisation else None, ] return '-'.join(p for p in pieces if p) def bootspec_from_json(bootspec_json: Dict) -> BootSpec: specialisations = bootspec_json['org.nixos.specialisation.v1'] specialisations = {k: bootspec_from_json(v) for k, v in specialisations.items()} systemdBootExtension = bootspec_json.get('org.nixos.systemd-boot', {}) sortKey = systemdBootExtension.get('sortKey', 'nixos') return BootSpec( **bootspec_json['org.nixos.bootspec.v1'], specialisations=specialisations, sortKey=sortKey ) bootspecs = {} def get_bootspec(profile: str | None, generation: int) -> BootSpec: k = (profile, generation) if k in bootspecs: return bootspecs[k] system_directory = system_dir(SystemIdentifier(profile, generation, None)) boot_json_path = os.path.realpath(f'{system_directory}/boot.json') if os.path.isfile(boot_json_path): boot_json_f = open(boot_json_path, 'r') bootspec_json = json.load(boot_json_f) else: boot_json_str = subprocess.check_output([ f'{BOOTSPEC_TOOLS}/bin/synthesize', '--version', '1', system_directory, '/dev/stdout', ], universal_newlines=True) bootspec_json = json.loads(boot_json_str) bs = bootspec_from_json(bootspec_json) bootspecs[k] = bs return bs def copy_from_file(file: str, dry_run: bool = False) -> str: store_file_path = os.path.realpath(file) suffix = os.path.basename(store_file_path) store_dir = os.path.basename(os.path.dirname(store_file_path)) dst_path = f'/{STORE_DIR}/{store_dir}-{suffix}' if not dry_run: copy_if_not_exists(store_file_path, f'{BOOT_MOUNT_POINT}{dst_path}') return dst_path MENU_ITEM = 'item {gen_key} {title} Generation {generation} {description}' BOOT_ENTRY = ''':{gen_key} kernel ${{server}}/systems/{system_name}{kernel} {kernel_params} boothost=${{boothost}} initrd ${{server}}/systems/{system_name}{initrd} boot ''' def gen_entry(i: SystemIdentifier) -> (str, str): bootspec = get_bootspec(i.profile, i.generation) if i.specialisation: bootspec = bootspec.specialisations[i.specialisation] kernel = copy_from_file(bootspec.kernel) initrd = copy_from_file(bootspec.initrd) gen_key = entry_key(i) title = '{name}{profile}{specialisation}'.format( name=DISTRO_NAME, profile=' [' + i.profile + ']' if i.profile else '', specialisation=f' ({i.specialisation})' if i.specialisation else '') kernel_params = f'init={bootspec.init} ' kernel_params = kernel_params + ' '.join(bootspec.kernelParams) build_time = int(os.path.getctime(system_dir(i))) build_date = datetime.datetime.fromtimestamp(build_time).strftime('%F') return MENU_ITEM.format( gen_key=gen_key, title=title, description=f'{bootspec.label}, built on {build_date}', generation=i.generation, ), BOOT_ENTRY.format( gen_key=gen_key, generation=i.generation, system_name=SYSTEM_NAME, kernel=kernel, kernel_params=kernel_params, initrd=initrd, ) def get_generations(profile: str | None = None) -> list[SystemIdentifier]: gen_list = subprocess.check_output([ f'{NIX}/bin/nix-env', '--list-generations', '-p', '/nix/var/nix/profiles/' + ('system-profiles/' + profile if profile else 'system')], universal_newlines=True) gen_lines = gen_list.split('\n') gen_lines.pop() configurationLimit = CONFIGURATION_LIMIT configurations = [ SystemIdentifier( profile=profile, generation=int(line.split()[0]), specialisation=None ) for line in gen_lines ] return configurations[-configurationLimit:] def remove_old_files(gens: list[SystemIdentifier]) -> None: known_paths = [] for gen in gens: bootspec = get_bootspec(gen.profile, gen.generation) known_paths.append(copy_from_file(bootspec.kernel, True)) known_paths.append(copy_from_file(bootspec.initrd, True)) for path in glob.iglob(f'{BOOT_MOUNT_POINT}/{STORE_DIR}/*'): if not path in known_paths and not os.path.isdir(path): os.unlink(path) def get_profiles() -> list[str]: if os.path.isdir('/nix/var/nix/profiles/system-profiles/'): return [x for x in os.listdir('/nix/var/nix/profiles/system-profiles/') if not x.endswith('-link')] else: return [] MENU = '''#!ipxe # Server hostname option set boothost ${{66:string}} set server http://${{boothost}} :start menu {distro} boot menu item --gap -- Generations {generation_items} item --gap -- Other item --key m main Main netboot menu choose --timeout 5000 --default {menu_default} selected || goto cancel goto ${{selected}} :cancel shell goto start :error echo Booting failed, dropping to shell shell goto start :main chain ${{server}}/boot.ipxe || goto error ''' def write_menu(gens: list[SystemIdentifier], default: SystemIdentifier) -> None: gen_menu_items = [] gen_cmds = [] for g in gens: bootspec = get_bootspec(g.profile, g.generation) specialisations = [ SystemIdentifier(profile=g.profile, generation=g.generation, specialisation=s) for s in bootspec.specialisations] for i in [g] + specialisations: mi, cmds = gen_entry(i) gen_menu_items.append(mi) gen_cmds.append(cmds) menu_file = f'{BOOT_MOUNT_POINT}/menu.ipxe' with open(f'{menu_file}.tmp', 'w') as f: f.write(MENU.format( distro=DISTRO_NAME, generation_items='\n'.join(gen_menu_items), menu_default=entry_key(default), )) print(file=f) print('\n\n'.join(gen_cmds), file=f) os.rename(f'{menu_file}.tmp', menu_file) def install_bootloader(args: argparse.Namespace) -> None: os.makedirs(f'{BOOT_MOUNT_POINT}/{STORE_DIR}', exist_ok=True) gens = get_generations() for profile in get_profiles(): gens += get_generations(profile) gens = sorted(gens, key=lambda g: entry_key(g), reverse=True) remove_old_files(gens) for g in gens: if os.path.dirname(get_bootspec(g.profile, g.generation).init) == os.path.realpath(args.default_config): default = g break else: assert False, 'No default generation found' write_menu(gens, default) def main() -> None: parser = argparse.ArgumentParser(description=f'Update {DISTRO_NAME}-related netboot files') parser.add_argument('default_config', metavar='DEFAULT-CONFIG', help=f'The default {DISTRO_NAME} config to boot') args = parser.parse_args() subprocess.check_call(CHECK_MOUNTPOINTS) install_bootloader(args) if __name__ == '__main__': main()