rewrite rotate to json
This commit is contained in:
		@@ -1,53 +1,74 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
import configparser
 | 
			
		||||
from typing import Optional
 | 
			
		||||
import subprocess
 | 
			
		||||
import requests
 | 
			
		||||
import re
 | 
			
		||||
import io
 | 
			
		||||
import pydantic
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Server(pydantic.BaseModel):
 | 
			
		||||
    public_key: str
 | 
			
		||||
    endpoint: str
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ServerConfig(pydantic.BaseModel):
 | 
			
		||||
    servers: list[Server]
 | 
			
		||||
    current: Server
 | 
			
		||||
    private_key: str
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def offline():
 | 
			
		||||
    try:
 | 
			
		||||
        requests.head("https://1.1.1.1", timeout=1)
 | 
			
		||||
    except Exception:
 | 
			
		||||
    except IOError:
 | 
			
		||||
        return True
 | 
			
		||||
    else:
 | 
			
		||||
        return False
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
def rotate_conf():
 | 
			
		||||
    iface = None
 | 
			
		||||
    peers = []
 | 
			
		||||
    try:
 | 
			
		||||
        with open("/snacks/wireguard/wg.conf", "r") as f:
 | 
			
		||||
            pattern = re.compile("\[[^\[\]]+\][^\[\]]+")
 | 
			
		||||
            sections = []
 | 
			
		||||
            for section in re.findall(pattern, f.read()):
 | 
			
		||||
                sections.append(section.strip())
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        print(e)
 | 
			
		||||
    else:
 | 
			
		||||
        for section in sections:
 | 
			
		||||
            config = configparser.ConfigParser()
 | 
			
		||||
            config.read_string(section)
 | 
			
		||||
            if "Peer" in config.sections():
 | 
			
		||||
                peers.append(config)
 | 
			
		||||
            else:
 | 
			
		||||
                iface = config
 | 
			
		||||
    buf = io.StringIO()
 | 
			
		||||
    try:
 | 
			
		||||
        iface.write(buf)
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        print(e)
 | 
			
		||||
    else:
 | 
			
		||||
        first = peers.pop(0)
 | 
			
		||||
        peers.append(first)
 | 
			
		||||
        for peer in peers:
 | 
			
		||||
            peer.write(buf)
 | 
			
		||||
        try:
 | 
			
		||||
            with open("/snacks/wireguard/wg.conf", "w") as f:
 | 
			
		||||
                f.write(buf.getvalue())
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            print(e)
 | 
			
		||||
 | 
			
		||||
if offline():
 | 
			
		||||
    rotate_conf()
 | 
			
		||||
    subprocess.run(["systemctl", "restart", "vpnclient-wg"])
 | 
			
		||||
def select_server() -> Optional[ServerConfig]:
 | 
			
		||||
    try:
 | 
			
		||||
        servers = ServerConfig.parse_file(
 | 
			
		||||
            "/snacks/wireguard/servers.json", encoding="utf-8"
 | 
			
		||||
        )
 | 
			
		||||
    except (IOError, pydantic.ValidationError):
 | 
			
		||||
        return None
 | 
			
		||||
    servers.servers.append(servers.current)
 | 
			
		||||
    servers.current = servers.servers.pop(0)
 | 
			
		||||
    return servers
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def write_json(servers: ServerConfig):
 | 
			
		||||
    try:
 | 
			
		||||
        with open("/snacks/wireguard/servers.json", mode="w", encoding="utf-8") as f:
 | 
			
		||||
            f.write(servers.json(indent=4))
 | 
			
		||||
    except IOError:
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def write_wg(server: Server, private_key: str):
 | 
			
		||||
    wg_conf = (
 | 
			
		||||
        "[Interface]\n"
 | 
			
		||||
        f"privatekey = {private_key}\n\n"
 | 
			
		||||
        "[Peer]\n"
 | 
			
		||||
        f"publickey = {server.public_key}\n"
 | 
			
		||||
        f"endpoint = {server.endpoint}\n"
 | 
			
		||||
        "persistentkeepalive = 20\n"
 | 
			
		||||
        "allowedips = 0.0.0.0/0, ::/0\n"
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    try:
 | 
			
		||||
        with open("/snacks/wireguard/wg.conf", mode="w", encoding="utf-8") as f:
 | 
			
		||||
            f.write(wg_conf)
 | 
			
		||||
    except IOError:
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def main():
 | 
			
		||||
    if offline() and (servers := select_server()):
 | 
			
		||||
        write_json(servers)
 | 
			
		||||
        write_wg(servers.current, servers.private_key)
 | 
			
		||||
        subprocess.run(["systemctl", "restart", "vpnclient-wg"], check=False)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    main()
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user