nixos/google-oslogin: improve mock server
some slightly better error handling for nonexistent users, less parsing of URLs and query strings by hand.
This commit is contained in:
parent
5506463c2f
commit
f38e45c2e0
@ -7,24 +7,27 @@ import hashlib
|
||||
import base64
|
||||
|
||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
from typing import Dict
|
||||
|
||||
SNAKEOIL_PUBLIC_KEY = os.environ['SNAKEOIL_PUBLIC_KEY']
|
||||
|
||||
|
||||
def w(msg):
|
||||
def w(msg: bytes):
|
||||
sys.stderr.write(f"{msg}\n")
|
||||
sys.stderr.flush()
|
||||
|
||||
|
||||
def gen_fingerprint(pubkey):
|
||||
def gen_fingerprint(pubkey: str):
|
||||
decoded_key = base64.b64decode(pubkey.encode("ascii").split()[1])
|
||||
return hashlib.sha256(decoded_key).hexdigest()
|
||||
|
||||
def gen_email(username):
|
||||
|
||||
def gen_email(username: str):
|
||||
"""username seems to be a 21 characters long number string, so mimic that in a reproducible way"""
|
||||
return str(int(hashlib.sha256(username.encode()).hexdigest(), 16))[0:21]
|
||||
|
||||
|
||||
def gen_mockuser(username: str, uid: str, gid: str, home_directory: str, snakeoil_pubkey: str) -> Dict:
|
||||
snakeoil_pubkey_fingerprint = gen_fingerprint(snakeoil_pubkey)
|
||||
# seems to be a 21 characters long numberstring, so mimic that in a reproducible way
|
||||
@ -56,7 +59,8 @@ def gen_mockuser(username: str, uid: str, gid: str, home_directory: str, snakeoi
|
||||
|
||||
|
||||
class ReqHandler(BaseHTTPRequestHandler):
|
||||
def _send_json_ok(self, data):
|
||||
|
||||
def _send_json_ok(self, data: dict):
|
||||
self.send_response(200)
|
||||
self.send_header('Content-type', 'application/json')
|
||||
self.end_headers()
|
||||
@ -64,29 +68,62 @@ class ReqHandler(BaseHTTPRequestHandler):
|
||||
w(out)
|
||||
self.wfile.write(out)
|
||||
|
||||
def _send_json_success(self, success=True):
|
||||
self.send_response(200)
|
||||
self.send_header('Content-type', 'application/json')
|
||||
self.end_headers()
|
||||
out = json.dumps({"success": success}).encode()
|
||||
w(out)
|
||||
self.wfile.write(out)
|
||||
|
||||
def _send_404(self):
|
||||
self.send_response(404)
|
||||
self.end_headers()
|
||||
|
||||
def do_GET(self):
|
||||
p = str(self.path)
|
||||
# mockuser and mockadmin are allowed to login, both use the same snakeoil public key
|
||||
if p == '/computeMetadata/v1/oslogin/users?username=mockuser' \
|
||||
or p == '/computeMetadata/v1/oslogin/users?uid=1009719690':
|
||||
self._send_json_ok(gen_mockuser(username='mockuser', uid='1009719690', gid='1009719690',
|
||||
home_directory='/home/mockuser', snakeoil_pubkey=SNAKEOIL_PUBLIC_KEY))
|
||||
elif p == '/computeMetadata/v1/oslogin/users?username=mockadmin' \
|
||||
or p == '/computeMetadata/v1/oslogin/users?uid=1009719691':
|
||||
self._send_json_ok(gen_mockuser(username='mockadmin', uid='1009719691', gid='1009719691',
|
||||
home_directory='/home/mockadmin', snakeoil_pubkey=SNAKEOIL_PUBLIC_KEY))
|
||||
pu = urlparse(p)
|
||||
params = parse_qs(pu.query)
|
||||
|
||||
# mockuser is allowed to login
|
||||
elif p == f"/computeMetadata/v1/oslogin/authorize?email={gen_email('mockuser')}&policy=login":
|
||||
self._send_json_ok({'success': True})
|
||||
# users endpoint
|
||||
if pu.path == "/computeMetadata/v1/oslogin/users":
|
||||
# mockuser and mockadmin are allowed to login, both use the same snakeoil public key
|
||||
if params.get('username') == ['mockuser'] or params.get('uid') == ["1009719690"]:
|
||||
username = "mockuser"
|
||||
uid = "1009719690"
|
||||
elif params.get('username') == ['mockadmin'] or params.get('uid') == ["1009719691"]:
|
||||
username = "mockadmin"
|
||||
uid = "1009719691"
|
||||
else:
|
||||
self._send_404()
|
||||
return
|
||||
|
||||
# mockadmin may also become root
|
||||
elif p == f"/computeMetadata/v1/oslogin/authorize?email={gen_email('mockadmin')}&policy=login" or p == f"/computeMetadata/v1/oslogin/authorize?email={gen_email('mockadmin')}&policy=adminLogin":
|
||||
self._send_json_ok({'success': True})
|
||||
self._send_json_ok(gen_mockuser(username=username, uid=uid, gid=uid, home_directory=f"/home/{username}", snakeoil_pubkey=SNAKEOIL_PUBLIC_KEY))
|
||||
return
|
||||
|
||||
# authorize endpoint
|
||||
elif pu.path == "/computeMetadata/v1/oslogin/authorize":
|
||||
# is user allowed to login?
|
||||
if params.get("policy") == ["login"]:
|
||||
# mockuser and mockadmin are allowed to login
|
||||
if params.get('email') == [gen_email("mockuser")] or params.get('email') == [gen_email("mockadmin")]:
|
||||
self._send_json_success()
|
||||
return
|
||||
self._send_json_success(False)
|
||||
return
|
||||
# is user allowed to become root?
|
||||
elif params.get("policy") == ["adminLogin"]:
|
||||
# only mockadmin is allowed to become admin
|
||||
self._send_json_success((params['email'] == [gen_email("mockadmin")]))
|
||||
return
|
||||
# send 404 for other policies
|
||||
else:
|
||||
self._send_404()
|
||||
return
|
||||
else:
|
||||
sys.stderr.write(f"Unhandled path: {p}\n")
|
||||
sys.stderr.flush()
|
||||
self.send_response(501)
|
||||
self.send_response(404)
|
||||
self.end_headers()
|
||||
self.wfile.write(b'')
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user