diff --git a/scripts/rotate.py b/scripts/rotate.py index d72fa03..c8fa3a2 100755 --- a/scripts/rotate.py +++ b/scripts/rotate.py @@ -1,53 +1,74 @@ #!/usr/bin/env python3 -import configparser +from typing import Optional import subprocess import requests -import re -import io +import pydantic + + +class Server(pydantic.BaseModel): + public_key: str + endpoint: str + + +class ServerConfig(pydantic.BaseModel): + servers: list[Server] + current: Server + private_key: str + def offline(): try: requests.head("https://1.1.1.1", timeout=1) - except Exception: + except IOError: return True else: - return False + return True -def rotate_conf(): - iface = None - peers = [] - try: - with open("/snacks/wireguard/wg.conf", "r") as f: - pattern = re.compile("\[[^\[\]]+\][^\[\]]+") - sections = [] - for section in re.findall(pattern, f.read()): - sections.append(section.strip()) - except Exception as e: - print(e) - else: - for section in sections: - config = configparser.ConfigParser() - config.read_string(section) - if "Peer" in config.sections(): - peers.append(config) - else: - iface = config - buf = io.StringIO() - try: - iface.write(buf) - except Exception as e: - print(e) - else: - first = peers.pop(0) - peers.append(first) - for peer in peers: - peer.write(buf) - try: - with open("/snacks/wireguard/wg.conf", "w") as f: - f.write(buf.getvalue()) - except Exception as e: - print(e) -if offline(): - rotate_conf() - subprocess.run(["systemctl", "restart", "vpnclient-wg"]) +def select_server() -> Optional[ServerConfig]: + try: + servers = ServerConfig.parse_file( + "/snacks/wireguard/servers.json", encoding="utf-8" + ) + except (IOError, pydantic.ValidationError): + return None + servers.servers.append(servers.current) + servers.current = servers.servers.pop(0) + return servers + + +def write_json(servers: ServerConfig): + try: + with open("/snacks/wireguard/servers.json", mode="w", encoding="utf-8") as f: + f.write(servers.json(indent=4)) + except IOError: + pass + + +def write_wg(server: Server, private_key: str): + wg_conf = ( + "[Interface]\n" + f"privatekey = {private_key}\n\n" + "[Peer]\n" + f"publickey = {server.public_key}\n" + f"endpoint = {server.endpoint}\n" + "persistentkeepalive = 20\n" + "allowedips = 0.0.0.0/0, ::/0\n" + ) + + try: + with open("/snacks/wireguard/wg.conf", mode="w", encoding="utf-8") as f: + f.write(wg_conf) + except IOError: + pass + + +def main(): + if offline() and (servers := select_server()): + write_json(servers) + write_wg(servers.current, servers.private_key) + subprocess.run(["systemctl", "restart", "vpnclient-wg"], check=False) + + +if __name__ == "__main__": + main()