侧边栏壁纸
博主头像
JasonLiu博主等级

Hi~ I'm Jason, Welcome

  • 累计撰写 31 篇文章
  • 累计创建 31 个标签
  • 累计收到 3 条评论

目 录CONTENT

文章目录

跨境SD-WAN高可用改造项目 实施细则 AI生成

JasonLiu
2025-12-28 / 0 评论 / 0 点赞 / 2 阅读 / 33,429 字

跨境SD-WAN高可用改造项目

分阶段详细实施方案


第一阶段:手动切换双隧道实施方案

目标周期:5个工作日
完成标准:双隧道建立、策略路由配置、手动切换验证

第1天:环境准备与LA2初始化

1.1 LA2节点环境初始化

脚本 1.1.1:LA2基础环境安装脚本

#!/bin/bash
# la2_initial_setup.sh
# 在LA2节点上执行

set -e

echo "=== LA2节点初始化开始 ==="

# 1. 更新系统
apt update && apt upgrade -y

# 2. 安装基础组件
apt install -y wireguard wireguard-tools resolvconf net-tools iptables-persistent

# 3. 生成WireGuard密钥对
mkdir -p /etc/wireguard
cd /etc/wireguard
umask 077
wg genkey | tee privatekey | wg pubkey > publickey

# 4. 配置sysctl参数
cat > /etc/sysctl.d/60-wireguard.conf << EOF
net.ipv4.ip_forward=1
net.ipv6.conf.all.forwarding=1
net.core.rmem_max=134217728
net.core.wmem_max=134217728
EOF
sysctl -p /etc/sysctl.d/60-wireguard.conf

# 5. 配置防火墙规则
iptables -t nat -A POSTROUTING -o eth0 -j MASQUERADE
iptables-save > /etc/iptables/rules.v4

# 6. 安装代理软件(Xray)
mkdir -p /usr/local/xray
wget -O /usr/local/xray/xray https://github.com/XTLS/Xray-core/releases/latest/download/Xray-linux-64.zip
unzip /usr/local/xray/xray -d /usr/local/xray/
chmod +x /usr/local/xray/xray

echo "=== LA2节点初始化完成 ==="
echo "WireGuard公钥: $(cat /etc/wireguard/publickey)"

1.2 LA2 WireGuard配置

配置文件 1.2.1:LA2的WireGuard配置

# /etc/wireguard/wg0.conf
[Interface]
# LA2的内网地址
Address = 10.8.2.1/24
# 监听端口
ListenPort = 51820
# 第一步生成的私钥
PrivateKey = <LA2_Private_Key>
# 添加DNS服务器
DNS = 8.8.8.8, 1.1.1.1

# 配置持久化保持连接
PostUp = iptables -A FORWARD -i %i -j ACCEPT; iptables -t nat -A POSTROUTING -o eth0 -j MASQUERADE
PostDown = iptables -D FORWARD -i %i -j ACCEPT; iptables -t nat -D POSTROUTING -o eth0 -j MASQUERADE

[Peer]
# HZ1的WireGuard公钥(将在第2天填入)
PublicKey = <HZ1_WG2_Public_Key>
# 允许访问的网段:杭州内网和美国侧网络
AllowedIPs = 10.10.0.0/16, 10.100.2.0/30
# 由于HZ1无公网IP,不需要Endpoint
PersistentKeepalive = 25

第2天:HZ1侧双隧道配置

2.1 HZ1 WireGuard双接口配置

脚本 2.1.1:HZ1 WireGuard双隧道配置脚本

#!/bin/bash
# hz1_wireguard_setup.sh
# 在HZ1节点上执行

set -e

echo "=== HZ1双隧道WireGuard配置开始 ==="

# 1. 创建第二个WireGuard接口的配置目录
mkdir -p /etc/wireguard2
cd /etc/wireguard2
umask 077

# 2. 为第二个隧道生成新的密钥对
wg genkey | tee privatekey2 | wg pubkey > publickey2
echo "WireGuard2公钥: $(cat publickey2)"

# 3. 配置第一个WireGuard接口(连接LA1)
cat > /etc/wireguard/wg0.conf << EOF
[Interface]
Address = 10.100.1.1/30
PrivateKey = <HZ1_WG1_Private_Key>
ListenPort = 51821
# MTU优化
MTU = 1380

[Peer]
# LA1的公钥
PublicKey = <LA1_Public_Key>
# 本地代理转发的Endpoint(VLESS代理本地端口)
Endpoint = 127.0.0.1:10001
AllowedIPs = 10.8.1.0/24, 10.100.1.2/32
PersistentKeepalive = 25
EOF

# 4. 配置第二个WireGuard接口(连接LA2)
cat > /etc/wireguard2/wg1.conf << EOF
[Interface]
Address = 10.100.2.1/30
PrivateKey = $(cat privatekey2)
ListenPort = 51822
MTU = 1380

[Peer]
# LA2的公钥(需要填入)
PublicKey = <LA2_Public_Key>
Endpoint = 127.0.0.1:10002
AllowedIPs = 10.8.2.0/24, 10.100.2.2/32
PersistentKeepalive = 25
EOF

# 5. 启动WireGuard服务
systemctl enable wg-quick@wg0
wg-quick up wg0

# 6. 创建第二个WireGuard的服务单元文件
cat > /etc/systemd/system/[email protected] << EOF
[Unit]
Description=WireGuard via wg-quick(8) for wg1
After=network-online.target nss-lookup.target
Wants=network-online.target nss-lookup.target
PartOf=wg-quick.target
Documentation=man:wg-quick(8)
Documentation=man:wg(8)
Documentation=https://www.wireguard.com/
Documentation=https://www.wireguard.com/quickstart/

[Service]
Type=oneshot
RemainAfterExit=yes
ExecStart=/usr/bin/wg-quick up wg1
ExecStop=/usr/bin/wg-quick down wg1
Environment=WG_ENDPOINT_RESOLUTION_RETRIES=infinity

[Install]
WantedBy=multi-user.target
EOF

# 7. 创建第二个WireGuard的配置文件链接
ln -s /etc/wireguard2/wg1.conf /etc/wireguard/wg1.conf

# 8. 启动第二个WireGuard服务
systemctl daemon-reload
systemctl enable wg-quick@wg1
systemctl start wg-quick@wg1

echo "=== HZ1双隧道WireGuard配置完成 ==="

2.2 双代理配置(VLESS+REALITY)

配置文件 2.2.1:HZ1双代理配置

// /usr/local/xray/config_tunnel1.json - LA1隧道代理
{
  "inbounds": [
    {
      "port": 10001,
      "protocol": "dokodemo-door",
      "settings": {
        "address": "10.100.1.2",
        "port": 51820,
        "network": "tcp,udp"
      },
      "tag": "tunnel1_inbound"
    }
  ],
  "outbounds": [
    {
      "protocol": "vless",
      "settings": {
        "vnext": [
          {
            "address": "LA1_PUBLIC_IP",
            "port": 443,
            "users": [
              {
                "id": "UUID_FOR_LA1",
                "encryption": "none",
                "flow": "xtls-rprx-vision"
              }
            ]
          }
        ]
      },
      "streamSettings": {
        "network": "tcp",
        "security": "reality",
        "realitySettings": {
          "dest": "cloudflare.com:443",
          "serverNames": ["cloudflare.com"],
          "privateKey": "REALITY_PRIVATE_KEY_LA1",
          "shortIds": ["abcdef1234567890"]
        }
      },
      "tag": "tunnel1_outbound"
    }
  ],
  "routing": {
    "rules": [
      {
        "type": "field",
        "inboundTag": ["tunnel1_inbound"],
        "outboundTag": "tunnel1_outbound"
      }
    ]
  }
}

// /usr/local/xray/config_tunnel2.json - LA2隧道代理
{
  "inbounds": [
    {
      "port": 10002,
      "protocol": "dokodemo-door",
      "settings": {
        "address": "10.100.2.2",
        "port": 51820,
        "network": "tcp,udp"
      },
      "tag": "tunnel2_inbound"
    }
  ],
  "outbounds": [
    {
      "protocol": "vless",
      "settings": {
        "vnext": [
          {
            "address": "LA2_PUBLIC_IP",
            "port": 443,
            "users": [
              {
                "id": "UUID_FOR_LA2",
                "encryption": "none",
                "flow": "xtls-rprx-vision"
              }
            ]
          }
        ]
      },
      "streamSettings": {
        "network": "tcp",
        "security": "reality",
        "realitySettings": {
          "dest": "www.microsoft.com:443",
          "serverNames": ["www.microsoft.com"],
          "privateKey": "REALITY_PRIVATE_KEY_LA2",
          "shortIds": ["fedcba0987654321"]
        }
      },
      "tag": "tunnel2_outbound"
    }
  ],
  "routing": {
    "rules": [
      {
        "type": "field",
        "inboundTag": ["tunnel2_inbound"],
        "outboundTag": "tunnel2_outbound"
      }
    ]
  }
}

第3天:策略路由配置

3.1 多路由表配置

脚本 3.1.1:HZ1策略路由配置脚本

#!/bin/bash
# hz1_policy_routing.sh
# 在HZ1节点上执行

set -e

echo "=== HZ1策略路由配置开始 ==="

# 1. 定义网络接口
WAN_IF="eth0"          # 公网出口接口
WG1_IF="wg0"           # LA1隧道接口
WG2_IF="wg1"           # LA2隧道接口

# 2. 添加自定义路由表
echo "200 la1_table" >> /etc/iproute2/rt_tables
echo "201 la2_table" >> /etc/iproute2/rt_tables
echo "202 default_table" >> /etc/iproute2/rt_tables

# 3. 配置各路由表的默认路由
# LA1路由表
ip route add default via 10.100.1.2 dev $WG1_IF table la1_table
# LA2路由表
ip route add default via 10.100.2.2 dev $WG2_IF table la2_table
# 默认路由表(主路由)
ip route add default via 10.100.1.2 dev $WG1_IF

# 4. 添加策略路由规则
# 规则优先级:1000以下为系统保留,我们从2000开始

# 规则1:来自特定源IP的流量走LA2
ip rule add from 10.10.100.0/24 table la2_table priority 2000

# 规则2:来自管理网络的流量走LA2(备份路径)
ip rule add from 10.10.1.0/24 table la2_table priority 2001

# 规则3:标记的流量(可通过iptables标记)
ip rule add fwmark 100 table la1_table priority 2002
ip rule add fwmark 200 table la2_table priority 2003

# 规则4:主业务流量默认走LA1
ip rule add from 10.10.0.0/16 table la1_table priority 2004

# 5. 配置连接跟踪,确保回包走正确路径
iptables -t mangle -A PREROUTING -m conntrack --ctstate ESTABLISHED,RELATED -j CONNMARK --restore-mark
iptables -t mangle -A OUTPUT -m conntrack --ctstate ESTABLISHED,RELATED -j CONNMARK --restore-mark

# 6. 创建路由策略服务
cat > /etc/systemd/system/policy-routing.service << EOF
[Unit]
Description=Policy Routing Configuration
After=network-online.target [email protected] [email protected]
Wants=network-online.target

[Service]
Type=oneshot
RemainAfterExit=yes
ExecStart=/usr/local/bin/apply-policy-routing.sh
ExecStop=/usr/local/bin/cleanup-policy-routing.sh

[Install]
WantedBy=multi-user.target
EOF

# 7. 创建应用和清理脚本
cat > /usr/local/bin/apply-policy-routing.sh << 'EOF'
#!/bin/bash
# 应用策略路由配置

# 刷新路由缓存
ip route flush cache

# 确保路由表存在
grep -q "200 la1_table" /etc/iproute2/rt_tables || echo "200 la1_table" >> /etc/iproute2/rt_tables
grep -q "201 la2_table" /etc/iproute2/rt_tables || echo "201 la2_table" >> /etc/iproute2/rt_tables

# 重新应用路由规则(防止重启后丢失)
ip route add default via 10.100.1.2 dev wg0 table la1_table 2>/dev/null || true
ip route add default via 10.100.2.2 dev wg1 table la2_table 2>/dev/null || true
EOF

cat > /usr/local/bin/cleanup-policy-routing.sh << 'EOF'
#!/bin/bash
# 清理策略路由配置

# 删除自定义路由规则
for i in $(ip rule show | grep -E "2000:|2001:|2002:|2003:|2004:" | awk -F: '{print $1}'); do
    ip rule del pref $i 2>/dev/null || true
done
EOF

chmod +x /usr/local/bin/apply-policy-routing.sh /usr/local/bin/cleanup-policy-routing.sh

# 8. 启用并启动服务
systemctl daemon-reload
systemctl enable policy-routing
systemctl start policy-routing

echo "=== HZ1策略路由配置完成 ==="

3.2 流量标记与分流

脚本 3.2.1:基于应用的流量标记

#!/bin/bash
# hz1_traffic_marking.sh
# 根据应用类型标记流量

# 1. 定义应用端口列表
# 高优先级应用(走优质链路LA1)
HIGH_PRIO_PORTS="22,53,80,443,3389,5900"

# 低优先级/备份应用(可走LA2)
LOW_PRIO_PORTS="21,25,110,143,993,995"

# 2. 清除旧规则
iptables -t mangle -F
iptables -t mangle -X

# 3. 标记高优先级流量(标记为100,走LA1)
for port in $(echo $HIGH_PRIO_PORTS | tr ',' ' '); do
    iptables -t mangle -A OUTPUT -p tcp --dport $port -j MARK --set-mark 100
    iptables -t mangle -A OUTPUT -p udp --dport $port -j MARK --set-mark 100
    iptables -t mangle -A PREROUTING -p tcp --dport $port -j MARK --set-mark 100
    iptables -t mangle -A PREROUTING -p udp --dport $port -j MARK --set-mark 100
done

# 4. 标记低优先级流量(标记为200,走LA2)
for port in $(echo $LOW_PRIO_PORTS | tr ',' ' '); do
    iptables -t mangle -A OUTPUT -p tcp --dport $port -j MARK --set-mark 200
    iptables -t mangle -A OUTPUT -p udp --dport $port -j MARK --set-mark 200
    iptables -t mangle -A PREROUTING -p tcp --dport $port -j MARK --set-mark 200
    iptables -t mangle -A PREROUTING -p udp --dport $port -j MARK --set-mark 200
done

# 5. 保存iptables规则
iptables-save > /etc/iptables/rules.v4

第4天:手动切换脚本与监控

4.1 手动切换脚本

脚本 4.1.1:隧道手动切换脚本

#!/usr/bin/env python3
# /usr/local/bin/tunnel_switch.py
import subprocess
import sys
import time
import json
from datetime import datetime

class TunnelManager:
    def __init__(self):
        self.current_tunnel = self.detect_current_tunnel()
        self.log_file = "/var/log/tunnel_switch.log"
      
    def log(self, message):
        """记录日志"""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        log_message = f"[{timestamp}] {message}"
        print(log_message)
        with open(self.log_file, "a") as f:
            f.write(log_message + "\n")
  
    def detect_current_tunnel(self):
        """检测当前使用的隧道"""
        try:
            result = subprocess.run(
                "ip route show default | awk '/default/ {print $5}'",
                shell=True,
                capture_output=True,
                text=True
            )
            iface = result.stdout.strip()
            if iface == "wg0":
                return "la1"
            elif iface == "wg1":
                return "la2"
            else:
                return "unknown"
        except:
            return "unknown"
  
    def check_tunnel_health(self, tunnel):
        """检查隧道健康状况"""
        if tunnel == "la1":
            target_ip = "10.100.1.2"
            iface = "wg0"
        else:
            target_ip = "10.100.2.2"
            iface = "wg1"
      
        # 检查接口状态
        try:
            subprocess.run(f"ip link show {iface} | grep -q 'state UP'", 
                         shell=True, check=True)
        except:
            self.log(f"隧道 {tunnel} 接口 {iface} 未启动")
            return False
      
        # 检查连通性
        try:
            result = subprocess.run(
                f"ping -c 3 -W 1 -I {iface} {target_ip}",
                shell=True,
                capture_output=True,
                text=True
            )
            if result.returncode == 0:
                # 解析ping结果
                lines = result.stdout.split('\n')
                for line in lines:
                    if "packet loss" in line:
                        loss = float(line.split('%')[0].split()[-1])
                        if loss < 20:  # 丢包率低于20%认为健康
                            return True
            return False
        except:
            return False
  
    def switch_to_tunnel(self, target_tunnel):
        """切换到指定隧道"""
        current = self.detect_current_tunnel()
        if current == target_tunnel:
            self.log(f"已经是 {target_tunnel} 隧道,无需切换")
            return True
      
        self.log(f"开始切换: {current} -> {target_tunnel}")
      
        # 检查目标隧道健康状况
        if not self.check_tunnel_health(target_tunnel):
            self.log(f"目标隧道 {target_tunnel} 不健康,切换中止")
            return False
      
        # 执行切换
        try:
            if target_tunnel == "la1":
                # 切换到LA1
                subprocess.run("ip route replace default via 10.100.1.2 dev wg0", 
                             shell=True, check=True)
                # 更新策略路由优先级
                subprocess.run("ip rule del pref 2004 2>/dev/null || true", shell=True)
                subprocess.run("ip rule add from 10.10.0.0/16 table la1_table priority 2004", 
                             shell=True, check=True)
            else:
                # 切换到LA2
                subprocess.run("ip route replace default via 10.100.2.2 dev wg1", 
                             shell=True, check=True)
                # 更新策略路由优先级
                subprocess.run("ip rule del pref 2004 2>/dev/null || true", shell=True)
                subprocess.run("ip rule add from 10.10.0.0/16 table la2_table priority 2004", 
                             shell=True, check=True)
          
            # 刷新路由缓存
            subprocess.run("ip route flush cache", shell=True)
          
            # 验证切换
            time.sleep(2)
            new_tunnel = self.detect_current_tunnel()
            if new_tunnel == target_tunnel:
                self.log(f"切换成功: {current} -> {target_tunnel}")
              
                # 发送通知(可选)
                self.send_notification(f"隧道切换: {current} -> {target_tunnel}")
                return True
            else:
                self.log(f"切换验证失败,当前隧道: {new_tunnel}")
                return False
              
        except subprocess.CalledProcessError as e:
            self.log(f"切换失败: {str(e)}")
            return False
  
    def send_notification(self, message):
        """发送通知(可扩展为邮件、钉钉、Slack等)"""
        # 这里可以实现通知逻辑
        pass
  
    def show_status(self):
        """显示当前状态"""
        status = {
            "current_tunnel": self.detect_current_tunnel(),
            "la1_health": self.check_tunnel_health("la1"),
            "la2_health": self.check_tunnel_health("la2"),
            "timestamp": datetime.now().isoformat()
        }
        print(json.dumps(status, indent=2))
        return status

def main():
    if len(sys.argv) < 2:
        print("使用方法:")
        print("  tunnel_switch.py status          # 显示状态")
        print("  tunnel_switch.py switch la1      # 切换到LA1")
        print("  tunnel_switch.py switch la2      # 切换到LA2")
        print("  tunnel_switch.py auto            # 自动选择最佳隧道")
        sys.exit(1)
  
    manager = TunnelManager()
  
    command = sys.argv[1]
  
    if command == "status":
        manager.show_status()
  
    elif command == "switch" and len(sys.argv) == 3:
        target = sys.argv[2]
        if target in ["la1", "la2"]:
            success = manager.switch_to_tunnel(target)
            sys.exit(0 if success else 1)
        else:
            print("无效的隧道名称,使用 la1 或 la2")
            sys.exit(1)
  
    elif command == "auto":
        # 自动选择最佳隧道
        la1_health = manager.check_tunnel_health("la1")
        la2_health = manager.check_tunnel_health("la2")
      
        if la1_health and not la2_health:
            manager.switch_to_tunnel("la1")
        elif la2_health and not la1_health:
            manager.switch_to_tunnel("la2")
        elif la1_health and la2_health:
            # 两个都健康,优先LA1
            manager.switch_to_tunnel("la1")
        else:
            print("所有隧道都不健康")
            sys.exit(1)
  
    else:
        print("无效命令")
        sys.exit(1)

if __name__ == "__main__":
    main()

脚本 4.1.2:切换脚本的systemd服务

# /etc/systemd/system/tunnel-manager.service
[Unit]
Description=Tunnel Manager Service
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
ExecStart=/usr/local/bin/tunnel_manager_daemon.py
Restart=on-failure
RestartSec=10

[Install]
WantedBy=multi-user.target
#!/usr/bin/env python3
# /usr/local/bin/tunnel_manager_daemon.py
import time
from tunnel_switch import TunnelManager

def daemon_main():
    manager = TunnelManager()
    check_interval = 60  # 每60秒检查一次
  
    while True:
        try:
            # 检查当前隧道健康状态
            current = manager.detect_current_tunnel()
            current_health = manager.check_tunnel_health(current)
          
            if not current_health:
                manager.log(f"当前隧道 {current} 不健康,尝试切换到备用隧道")
              
                # 尝试切换到另一个隧道
                if current == "la1":
                    manager.switch_to_tunnel("la2")
                else:
                    manager.switch_to_tunnel("la1")
          
            time.sleep(check_interval)
          
        except Exception as e:
            manager.log(f"守护进程异常: {str(e)}")
            time.sleep(10)

if __name__ == "__main__":
    daemon_main()

4.2 基础监控配置

脚本 4.2.1:Prometheus Node Exporter配置

#!/bin/bash
# setup_monitoring.sh
# 在HZ1和LA1、LA2上分别执行

# 1. 安装Node Exporter
wget https://github.com/prometheus/node_exporter/releases/download/v1.6.1/node_exporter-1.6.1.linux-amd64.tar.gz
tar xvf node_exporter-*.tar.gz
mv node_exporter-*/node_exporter /usr/local/bin/

# 2. 创建systemd服务
cat > /etc/systemd/system/node_exporter.service << EOF
[Unit]
Description=Node Exporter
After=network.target

[Service]
Type=simple
User=nobody
ExecStart=/usr/local/bin/node_exporter \
  --collector.systemd \
  --collector.tcpstat \
  --collector.netstat \
  --collector.netdev \
  --collector.conntrack \
  --web.listen-address=:9100

[Install]
WantedBy=multi-user.target
EOF

# 3. 启动服务
systemctl daemon-reload
systemctl enable node_exporter
systemctl start node_exporter

# 4. 安装自定义指标收集脚本
cat > /usr/local/bin/custom_metrics.sh << 'EOF'
#!/bin/bash
# 收集自定义指标

# WireGuard隧道状态
wg_la1_status=$(wg show wg0 2>/dev/null | grep -c "interface: wg0" || echo "0")
wg_la2_status=$(wg show wg1 2>/dev/null | grep -c "interface: wg1" || echo "0")

# 隧道延迟
la1_latency=$(ping -c 1 -W 1 10.100.1.2 2>/dev/null | grep "time=" | awk -F'time=' '{print $2}' | awk '{print $1}' || echo "0")
la2_latency=$(ping -c 1 -W 1 10.100.2.2 2>/dev/null | grep "time=" | awk -F'time=' '{print $2}' | awk '{print $1}' || echo "0")

# 输出Prometheus格式指标
cat << METRICS
# HELP wg_tunnel_status WireGuard隧道状态 (1=up, 0=down)
# TYPE wg_tunnel_status gauge
wg_tunnel_status{tunnel="la1"} ${wg_la1_status}
wg_tunnel_status{tunnel="la2"} ${wg_la2_status}

# HELP tunnel_latency_ms 隧道延迟(毫秒)
# TYPE tunnel_latency_ms gauge
tunnel_latency_ms{tunnel="la1"} ${la1_latency}
tunnel_latency_ms{tunnel="la2"} ${la2_latency}

# HELP tunnel_last_check 最后检查时间戳
# TYPE tunnel_last_check gauge
tunnel_last_check $(date +%s)
METRICS
EOF

chmod +x /usr/local/bin/custom_metrics.sh

# 5. 配置Prometheus textfile收集器
cat > /etc/systemd/system/custom-metrics.service << EOF
[Unit]
Description=Custom Metrics Collector
After=network-online.target

[Service]
Type=oneshot
ExecStart=/usr/local/bin/custom_metrics.sh > /var/lib/node_exporter/custom_metrics.prom
EOF

cat > /etc/systemd/system/custom-metrics.timer << EOF
[Unit]
Description=Run custom metrics every 10 seconds

[Timer]
OnBootSec=1min
OnUnitActiveSec=10s

[Install]
WantedBy=timers.target
EOF

systemctl daemon-reload
systemctl enable custom-metrics.timer
systemctl start custom-metrics.timer

第5天:测试与验证

5.1 测试脚本

脚本 5.1.1:双隧道功能测试脚本

#!/usr/bin/env python3
# test_tunnels.py
import subprocess
import time
import json
import sys

class TunnelTester:
    def __init__(self):
        self.test_cases = []
      
    def run_test(self, name, command, expected_returncode=0):
        """运行单个测试"""
        print(f"\n{'='*50}")
        print(f"测试: {name}")
        print(f"命令: {command}")
      
        start_time = time.time()
        try:
            result = subprocess.run(
                command,
                shell=True,
                capture_output=True,
                text=True,
                timeout=30
            )
            elapsed = time.time() - start_time
          
            success = (result.returncode == expected_returncode)
            status = "✅ 通过" if success else "❌ 失败"
          
            print(f"结果: {status} ({elapsed:.2f}秒)")
            print(f"返回值: {result.returncode}")
          
            if result.stdout:
                print(f"输出: {result.stdout[:500]}...")
            if result.stderr and not success:
                print(f"错误: {result.stderr[:500]}...")
          
            return {
                "name": name,
                "success": success,
                "elapsed": elapsed,
                "returncode": result.returncode
            }
          
        except subprocess.TimeoutExpired:
            elapsed = time.time() - start_time
            print(f"结果: ⏱️ 超时 ({elapsed:.2f}秒)")
            return {
                "name": name,
                "success": False,
                "elapsed": elapsed,
                "returncode": -1
            }
        except Exception as e:
            print(f"结果: ❌ 异常: {str(e)}")
            return {
                "name": name,
                "success": False,
                "elapsed": 0,
                "returncode": -1
            }
  
    def run_all_tests(self):
        """运行所有测试"""
        print("开始跨境双隧道测试套件")
        print("="*50)
      
        tests = [
            # 1. 基础连接测试
            ("检查WireGuard接口", "ip link show wg0 && ip link show wg1"),
            ("检查WireGuard状态", "wg show"),
            ("检查路由表", "ip route show table la1_table && ip route show table la2_table"),
          
            # 2. 隧道连通性测试
            ("测试LA1隧道连通性", "ping -c 3 -W 1 -I wg0 10.100.1.2"),
            ("测试LA2隧道连通性", "ping -c 3 -W 1 -I wg1 10.100.2.2"),
          
            # 3. 代理层测试
            ("测试代理服务状态", "systemctl status xray --no-pager"),
            ("测试代理端口监听", "netstat -tlnp | grep '1000[12]'"),
          
            # 4. 策略路由测试
            ("测试策略路由规则", "ip rule show"),
            ("测试默认路由", "ip route show default"),
          
            # 5. 跨境访问测试
            ("测试通过LA1访问美国服务", "curl --interface wg0 -s --connect-timeout 5 http://ifconfig.me"),
            ("测试通过LA2访问美国服务", "curl --interface wg1 -s --connect-timeout 5 http://ifconfig.me"),
          
            # 6. 切换功能测试
            ("测试切换到LA2", "/usr/local/bin/tunnel_switch.py switch la2"),
            ("验证LA2为默认路由", "ip route show default | grep wg1"),
            ("测试切换后访问", "curl --interface wg1 -s --connect-timeout 5 http://ifconfig.me"),
            ("切换回LA1", "/usr/local/bin/tunnel_switch.py switch la1"),
          
            # 7. 监控测试
            ("测试监控指标", "/usr/local/bin/custom_metrics.sh | head -5"),
            ("测试Node Exporter", "curl -s http://localhost:9100/metrics | grep -i 'node_' | head -3"),
        ]
      
        results = []
        for test_name, test_command in tests:
            result = self.run_test(test_name, test_command)
            results.append(result)
            time.sleep(1)  # 测试间隔
      
        # 生成测试报告
        self.generate_report(results)
      
        # 返回总体结果
        total = len(results)
        passed = sum(1 for r in results if r["success"])
        failed = total - passed
      
        print(f"\n{'='*50}")
        print(f"测试完成: {passed}/{total} 通过 ({passed/total*100:.1f}%)")
      
        if failed > 0:
            print("\n失败的测试:")
            for r in results:
                if not r["success"]:
                    print(f"  - {r['name']}")
      
        return passed == total
  
    def generate_report(self, results):
        """生成测试报告"""
        report = {
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "hostname": subprocess.getoutput("hostname"),
            "results": results,
            "summary": {
                "total": len(results),
                "passed": sum(1 for r in results if r["success"]),
                "failed": sum(1 for r in results if not r["success"])
            }
        }
      
        # 保存报告到文件
        report_file = f"/tmp/tunnel_test_report_{time.strftime('%Y%m%d_%H%M%S')}.json"
        with open(report_file, "w") as f:
            json.dump(report, f, indent=2)
      
        print(f"\n测试报告已保存: {report_file}")
      
        # 同时输出简单文本报告
        text_report = f"/tmp/tunnel_test_report_{time.strftime('%Y%m%d_%H%M%S')}.txt"
        with open(text_report, "w") as f:
            f.write("跨境双隧道测试报告\n")
            f.write("="*50 + "\n")
            f.write(f"时间: {report['timestamp']}\n")
            f.write(f"主机: {report['hostname']}\n")
            f.write(f"总计: {report['summary']['total']} 项测试\n")
            f.write(f"通过: {report['summary']['passed']}\n")
            f.write(f"失败: {report['summary']['failed']}\n\n")
          
            for r in results:
                status = "✓" if r["success"] else "✗"
                f.write(f"{status} {r['name']} ({r['elapsed']:.2f}s)\n")
      
        print(f"文本报告: {text_report}")

def main():
    tester = TunnelTester()
    success = tester.run_all_tests()
    sys.exit(0 if success else 1)

if __name__ == "__main__":
    main()

5.2 回滚脚本

脚本 5.2.1:一键回滚到单隧道模式

#!/bin/bash
# rollback_to_single_tunnel.sh
# 紧急情况下回滚到只有LA1的单隧道模式

set -e

echo "=== 开始回滚到单隧道模式 ==="

# 1. 停止并禁用LA2相关服务
systemctl stop wg-quick@wg1 2>/dev/null || true
systemctl disable wg-quick@wg1 2>/dev/null || true
systemctl stop tunnel-manager 2>/dev/null || true
systemctl disable tunnel-manager 2>/dev/null || true

# 2. 停止LA2的代理服务
systemctl stop xray-tunnel2 2>/dev/null || true
systemctl disable xray-tunnel2 2>/dev/null || true

# 3. 清理路由表和规则
# 删除自定义路由表
sed -i '/la2_table/d' /etc/iproute2/rt_tables
sed -i '/default_table/d' /etc/iproute2/rt_tables

# 删除LA2相关路由规则
for pref in $(ip rule show | grep -E "200[0-9]:.*la2" | awk -F: '{print $1}'); do
    ip rule del pref $pref 2>/dev/null || true
done

# 删除LA2接口的路由
ip route del 10.8.2.0/24 dev wg1 2>/dev/null || true
ip route del 10.100.2.0/30 dev wg1 2>/dev/null || true

# 4. 确保默认路由指向LA1
ip route replace default via 10.100.1.2 dev wg0

# 5. 清理防火墙标记规则
iptables -t mangle -F
iptables-save > /etc/iptables/rules.v4

# 6. 刷新路由缓存
ip route flush cache

# 7. 验证回滚结果
echo -e "\n=== 回滚验证 ==="
echo "当前默认路由:"
ip route show default

echo -e "\nWireGuard接口状态:"
wg show

echo -e "\n接口状态:"
ip link show wg0
ip link show wg1 2>/dev/null || echo "wg1接口已不存在"

# 8. 测试连通性
echo -e "\n测试LA1隧道连通性:"
ping -c 2 -W 1 -I wg0 10.100.1.2

echo -e "\n测试跨境访问:"
curl --interface wg0 -s --connect-timeout 3 http://ifconfig.me/ip

echo -e "\n=== 回滚完成 ==="
echo "系统已回滚到单隧道(LA1)模式"
echo "如果需要重新启用双隧道,请运行: systemctl enable wg-quick@wg1 && systemctl start wg-quick@wg1"

第二阶段:智能路由与自动切换实施方案

目标周期:15个工作日
完成标准:FRR部署、健康检查系统、自动切换验证

第6-7天:FRR部署与OSPF配置

6.1 FRR安装与配置

脚本 6.1.1:FRR安装配置脚本

#!/bin/bash
# frr_installation.sh
# 在HZ1节点上执行

set -e

echo "=== FRR安装配置开始 ==="

# 1. 安装FRR
apt update
apt install -y frr frr-pythontools

# 2. 配置FRR守护进程
sed -i 's/^bgpd=no/bgpd=yes/' /etc/frr/daemons
sed -i 's/^ospfd=no/ospfd=yes/' /etc/frr/daemons
sed -i 's/^zebra=no/zebra=yes/' /etc/frr/daemons
sed -i 's/^staticd=no/staticd=yes/' /etc/frr/daemons

# 3. 配置zebra(基础路由守护进程)
cat > /etc/frr/zebra.conf << 'EOF'
! Zebra configuration
hostname HZ1-router
password frr
enable password frr

! 日志配置
log file /var/log/frr/zebra.log
log syslog informational
EOF

# 4. 配置OSPF
cat > /etc/frr/ospfd.conf << 'EOF'
! OSPF configuration
hostname HZ1-ospfd
password frr
enable password frr

! 日志配置
log file /var/log/frr/ospfd.log
log syslog informational

! OSPF进程配置
router ospf
  ! 路由器ID
  ospf router-id 10.10.1.1
  ! 重分发直连和静态路由
  redistribute connected
  redistribute static
  ! 网络宣告
  network 10.10.0.0/16 area 0
  network 10.100.1.0/30 area 0
  network 10.100.2.0/30 area 0
  ! 被动接口配置(除了隧道接口)
  passive-interface default
  no passive-interface wg0
  no passive-interface wg1
  ! OSPF调优参数
  auto-cost reference-bandwidth 1000
  timers throttle spf 10 100 5000
  timers throttle lsa all 10 100 5000
EOF

# 5. 配置静态路由重分发
cat > /etc/frr/staticd.conf << 'EOF'
! Static routing configuration
hostname HZ1-staticd
password frr
enable password frr

! 日志配置
log file /var/log/frr/staticd.log
log syslog informational
EOF

# 6. 配置VTY访问控制
cat > /etc/frr/vtysh.conf << 'EOF'
! VTY configuration
hostname HZ1
username frr nopassword
!
service integrated-vtysh-config
!
line vty
  exec-timeout 30 0
!
end
EOF

# 7. 设置文件权限
chown -R frr:frr /etc/frr
chmod 640 /etc/frr/*.conf

# 8. 启动FRR服务
systemctl enable frr
systemctl restart frr

# 9. 验证FRR状态
echo -e "\n=== 验证FRR状态 ==="
systemctl status frr --no-pager

echo -e "\n=== 进入FRR控制台验证 ==="
echo "使用以下命令进入FRR控制台:"
echo "  vtysh"
echo ""
echo "在vtysh中可用命令:"
echo "  show running-config"
echo "  show ip ospf interface"
echo "  show ip ospf neighbor"
echo "  show ip route ospf"

# 10. 创建FRR操作辅助脚本
cat > /usr/local/bin/frr-status.sh << 'EOF'
#!/bin/bash
# FRR状态检查脚本

echo "=== FRR服务状态 ==="
systemctl status frr --no-pager | grep -A 3 "Active:"

echo -e "\n=== OSPF接口状态 ==="
vtysh -c "show ip ospf interface brief"

echo -e "\n=== OSPF邻居 ==="
vtysh -c "show ip ospf neighbor"

echo -e "\n=== OSPF路由表 ==="
vtysh -c "show ip route ospf" | head -20

echo -e "\n=== 全部路由表 ==="
vtysh -c "show ip route" | grep -E "(O|C|S)" | head -30
EOF

chmod +x /usr/local/bin/frr-status.sh

echo "=== FRR安装配置完成 ==="

6.2 OSPF接口调优

脚本 6.2.1:OSPF接口优化配置

#!/bin/bash
# ospf_interface_tuning.sh
# 在HZ1上执行

# 进入vtysh配置模式
vtysh << 'EOF'
configure terminal

! 配置wg0接口(连接LA1)
interface wg0
  ! 初始cost设置为10(高质量链路)
  ip ospf cost 10
  ! OSPF网络类型设置为点对点(适合隧道)
  ip ospf network point-to-point
  ! OSPF定时器调优
  ip ospf dead-interval minimal hello-multiplier 4
  ! 启用MTU忽略(防止MTU不匹配问题)
  ip ospf mtu-ignore
  ! 描述
  description "Tunnel to LA1 (Primary)"
  exit

! 配置wg1接口(连接LA2)
interface wg1
  ! 初始cost设置为50(备用链路)
  ip ospf cost 50
  ! OSPF网络类型设置为点对点
  ip ospf network point-to-point
  ! OSPF定时器调优
  ip ospf dead-interval minimal hello-multiplier 4
  ! 启用MTU忽略
  ip ospf mtu-ignore
  ! 描述
  description "Tunnel to LA2 (Backup)"
  exit

! 配置杭州内网接口
interface eth1
  ! 如果是广播网络,使用默认设置
  ip ospf cost 1
  ! 描述
  description "Hangzhou Internal Network"
  exit

! 保存配置
write memory
EOF

echo "OSPF接口调优完成"

第8-10天:健康检查系统开发

8.1 健康检查核心模块

脚本 8.1.1:综合健康检查系统

#!/usr/bin/env python3
# /usr/local/bin/health_checker.py
import subprocess
import time
import json
import statistics
import threading
from datetime import datetime
from dataclasses import dataclass
from typing import Dict, List, Optional
import logging

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/log/health_checker.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

@dataclass
class TunnelConfig:
    """隧道配置"""
    name: str
    interface: str
    gateway_ip: str
    test_targets: List[str]  # 测试目标列表
    initial_cost: int = 100
    min_cost: int = 10
    max_cost: int = 65535

@dataclass
class HealthCheckResult:
    """健康检查结果"""
    timestamp: datetime
    tunnel_name: str
    latency_ms: float
    jitter_ms: float
    packet_loss_percent: float
    tcp_connect_time_ms: float
    http_response_time_ms: float
    overall_score: float
    recommended_cost: int

class TunnelHealthChecker:
    def __init__(self):
        self.tunnels = {
            "la1": TunnelConfig(
                name="la1",
                interface="wg0",
                gateway_ip="10.100.1.2",
                test_targets=["8.8.8.8", "1.1.1.1", "10.100.1.2"],
                initial_cost=10
            ),
            "la2": TunnelConfig(
                name="la2",
                interface="wg1",
                gateway_ip="10.100.2.2",
                test_targets=["8.8.4.4", "9.9.9.9", "10.100.2.2"],
                initial_cost=50
            )
        }
      
        # 健康检查历史记录
        self.history: Dict[str, List[HealthCheckResult]] = {
            "la1": [],
            "la2": []
        }
      
        # 当前cost值
        self.current_costs = {
            "la1": 10,
            "la2": 50
        }
      
        # 锁,用于线程安全
        self.lock = threading.Lock()
      
    def ping_test(self, target_ip: str, interface: str, count: int = 10) -> Dict:
        """执行ping测试"""
        try:
            cmd = f"ping -c {count} -i 0.2 -W 1 -I {interface} {target_ip}"
            result = subprocess.run(
                cmd,
                shell=True,
                capture_output=True,
                text=True,
                timeout=count * 2
            )
          
            if result.returncode == 0:
                # 解析输出
                lines = result.stdout.split('\n')
                stats_line = None
                rtt_line = None
              
                for line in lines:
                    if "packet loss" in line:
                        stats_line = line
                    if "rtt min/avg/max/mdev" in line:
                        rtt_line = line
              
                if stats_line and rtt_line:
                    # 提取丢包率
                    loss_str = stats_line.split('%')[0].split()[-1]
                    packet_loss = float(loss_str)
                  
                    # 提取RTT统计
                    rtt_parts = rtt_line.split('=')[1].strip().split('/')
                    rtt_min = float(rtt_parts[0])
                    rtt_avg = float(rtt_parts[1])
                    rtt_max = float(rtt_parts[2])
                    rtt_mdev = float(rtt_parts[3])  # 抖动
                  
                    return {
                        "success": True,
                        "packet_loss": packet_loss,
                        "latency_min": rtt_min,
                        "latency_avg": rtt_avg,
                        "latency_max": rtt_max,
                        "jitter": rtt_mdev,
                        "packets_sent": count,
                        "packets_received": count * (1 - packet_loss / 100)
                    }
          
            return {"success": False, "error": "解析失败"}
          
        except subprocess.TimeoutExpired:
            return {"success": False, "error": "超时"}
        except Exception as e:
            return {"success": False, "error": str(e)}
  
    def tcp_connect_test(self, target_ip: str, port: int, interface: str, timeout: int = 3) -> float:
        """TCP连接测试"""
        try:
            # 使用timeout命令和nc进行TCP连接测试
            start_time = time.time()
            cmd = f"timeout {timeout} nc -z -v -w {timeout} {target_ip} {port} 2>&1"
            result = subprocess.run(
                cmd,
                shell=True,
                capture_output=True,
                text=True
            )
            elapsed = (time.time() - start_time) * 1000  # 转换为毫秒
          
            if result.returncode == 0:
                return elapsed
            else:
                return timeout * 1000  # 返回超时值
        except:
            return timeout * 1000
  
    def http_test(self, url: str, interface: str, timeout: int = 5) -> float:
        """HTTP响应测试"""
        try:
            start_time = time.time()
            cmd = f"curl --interface {interface} -s -o /dev/null -w '%{{http_code}} %{{time_total}}' --max-time {timeout} {url}"
            result = subprocess.run(
                cmd,
                shell=True,
                capture_output=True,
                text=True
            )
            elapsed = (time.time() - start_time) * 1000
          
            if result.returncode == 0:
                parts = result.stdout.strip().split()
                if len(parts) == 2 and parts[0] == "200":
                    return float(parts[1]) * 1000  # 转换为毫秒
            return timeout * 1000
        except:
            return timeout * 1000
  
    def calculate_score(self, test_results: Dict) -> float:
        """计算综合评分(0-100)"""
        weights = {
            "latency": 0.3,      # 延迟权重
            "jitter": 0.2,       # 抖动权重
            "packet_loss": 0.3,  # 丢包权重
            "tcp_connect": 0.1,  # TCP连接权重
            "http_response": 0.1  # HTTP响应权重
        }
      
        # 归一化各项指标(0-1,1为最佳)
        def normalize_latency(latency_ms):
            # 延迟低于50ms得1分,超过300ms得0分
            return max(0, min(1, 1 - (latency_ms - 50) / 250))
      
        def normalize_jitter(jitter_ms):
            # 抖动低于10ms得1分,超过100ms得0分
            return max(0, min(1, 1 - jitter_ms / 100))
      
        def normalize_packet_loss(loss_percent):
            # 丢包0%得1分,超过20%得0分
            return max(0, min(1, 1 - loss_percent / 20))
      
        def normalize_tcp_connect(connect_time_ms):
            # 连接时间低于100ms得1分,超过1000ms得0分
            return max(0, min(1, 1 - (connect_time_ms - 100) / 900))
      
        def normalize_http_response(response_time_ms):
            # 响应时间低于200ms得1分,超过2000ms得0分
            return max(0, min(1, 1 - (response_time_ms - 200) / 1800))
      
        # 计算各项得分
        latency_score = normalize_latency(test_results.get("latency_avg", 300))
        jitter_score = normalize_jitter(test_results.get("jitter", 100))
        packet_loss_score = normalize_packet_loss(test_results.get("packet_loss", 100))
        tcp_score = normalize_tcp_connect(test_results.get("tcp_connect_time", 1000))
        http_score = normalize_http_response(test_results.get("http_response_time", 2000))
      
        # 加权平均
        total_score = (
            latency_score * weights["latency"] +
            jitter_score * weights["jitter"] +
            packet_loss_score * weights["packet_loss"] +
            tcp_score * weights["tcp_connect"] +
            http_score * weights["http_response"]
        ) * 100  # 转换为0-100分
      
        return round(total_score, 2)
  
    def score_to_cost(self, score: float) -> int:
        """将评分转换为OSPF cost值"""
        if score >= 90:
            return 10      # 优秀
        elif score >= 80:
            return 20      # 良好
        elif score >= 70:
            return 50      # 一般
        elif score >= 60:
            return 100     # 较差
        elif score >= 50:
            return 200     # 差
        elif score >= 30:
            return 500     # 很差
        else:
            return 65535   # 不可用
  
    def check_tunnel_health(self, tunnel_config: TunnelConfig) -> HealthCheckResult:
        """检查单个隧道的健康状况"""
        logger.info(f"开始检查隧道: {tunnel_config.name}")
      
        test_results = {
            "latency_avg": 300,  # 默认值
            "jitter": 100,
            "packet_loss": 100,
            "tcp_connect_time": 3000,
            "http_response_time": 5000
        }
      
        # 1. 执行ping测试(对网关和外部目标)
        ping_results = []
        for target in tunnel_config.test_targets[:2]:  # 只测试前两个目标
            result = self.ping_test(target, tunnel_config.interface, count=5)
            if result["success"]:
                ping_results.append(result)
                time.sleep(0.5)  # 测试间隔
      
        if ping_results:
            # 计算平均延迟和抖动
            avg_latency = statistics.mean([r["latency_avg"] for r in ping_results])
            avg_jitter = statistics.mean([r["jitter"] for r in ping_results])
            avg_loss = statistics.mean([r["packet_loss"] for r in ping_results])
          
            test_results.update({
                "latency_avg": avg_latency,
                "jitter": avg_jitter,
                "packet_loss": avg_loss
            })
      
        # 2. TCP连接测试
        tcp_time = self.tcp_connect_test("8.8.8.8", 53, tunnel_config.interface)
        test_results["tcp_connect_time"] = tcp_time
      
        # 3. HTTP测试
        http_time = self.http_test("http://ifconfig.me", tunnel_config.interface)
        test_results["http_response_time"] = http_time
      
        # 4. 计算综合评分
        overall_score = self.calculate_score(test_results)
        recommended_cost = self.score_to_cost(overall_score)
      
        # 5. 创建结果对象
        result = HealthCheckResult(
            timestamp=datetime.now(),
            tunnel_name=tunnel_config.name,
            latency_ms=test_results["latency_avg"],
            jitter_ms=test_results["jitter"],
            packet_loss_percent=test_results["packet_loss"],
            tcp_connect_time_ms=test_results["tcp_connect_time"],
            http_response_time_ms=test_results["http_response_time"],
            overall_score=overall_score,
            recommended_cost=recommended_cost
        )
      
        logger.info(f"隧道 {tunnel_config.name} 检查完成: "
                   f"评分={overall_score}, 推荐cost={recommended_cost}")
      
        return result
  
    def update_ospf_cost(self, tunnel_name: str, new_cost: int) -> bool:
        """更新OSPF接口的cost值"""
        if tunnel_name not in self.tunnels:
            logger.error(f"未知隧道: {tunnel_name}")
            return False
      
        # 获取当前cost
        current_cost = self.current_costs.get(tunnel_name)
      
        # 如果cost没有变化,不需要更新
        if current_cost == new_cost:
            logger.debug(f"隧道 {tunnel_name} cost未变化: {new_cost}")
            return True
      
        # 防止频繁切换:只有当变化超过阈值时才更新
        cost_change_threshold = 50
        if current_cost and abs(current_cost - new_cost) < cost_change_threshold:
            logger.debug(f"隧道 {tunnel_name} cost变化过小: {current_cost} -> {new_cost}")
            return False
      
        # 防止cost在边界值附近震荡
        if new_cost == 65535 and current_cost and current_cost > 500:
            logger.debug(f"隧道 {tunnel_name} 已在故障状态,不更新cost")
            return False
      
        try:
            interface = self.tunnels[tunnel_name].interface
            cmd = f"vtysh -c 'configure terminal' -c 'interface {interface}' -c 'ip ospf cost {new_cost}' -c 'end'"
          
            logger.info(f"更新隧道 {tunnel_name} cost: {current_cost} -> {new_cost}")
            subprocess.run(cmd, shell=True, check=True)
          
            # 更新内存中的cost值
            self.current_costs[tunnel_name] = new_cost
          
            # 记录变更
            change_log = {
                "timestamp": datetime.now().isoformat(),
                "tunnel": tunnel_name,
                "old_cost": current_cost,
                "new_cost": new_cost,
                "interface": interface
            }
          
            with open("/var/log/ospf_cost_changes.log", "a") as f:
                f.write(json.dumps(change_log) + "\n")
          
            return True
          
        except subprocess.CalledProcessError as e:
            logger.error(f"更新OSPF cost失败: {str(e)}")
            return False
        except Exception as e:
            logger.error(f"更新OSPF cost异常: {str(e)}")
            return False
  
    def run_single_check(self):
        """执行一次完整的健康检查"""
        with self.lock:
            logger.info("开始完整健康检查周期")
          
            # 检查每个隧道
            for tunnel_name, config in self.tunnels.items():
                try:
                    # 执行健康检查
                    result = self.check_tunnel_health(config)
                  
                    # 保存到历史记录
                    self.history[tunnel_name].append(result)
                    # 只保留最近100条记录
                    if len(self.history[tunnel_name]) > 100:
                        self.history[tunnel_name] = self.history[tunnel_name][-100:]
                  
                    # 更新OSPF cost
                    self.update_ospf_cost(tunnel_name, result.recommended_cost)
                  
                    # 记录结果
                    self.save_check_result(result)
                  
                except Exception as e:
                    logger.error(f"检查隧道 {tunnel_name} 时出错: {str(e)}")
          
            logger.info("健康检查周期完成")
  
    def save_check_result(self, result: HealthCheckResult):
        """保存检查结果到文件(用于监控)"""
        result_dict = {
            "timestamp": result.timestamp.isoformat(),
            "tunnel": result.tunnel_name,
            "latency_ms": result.latency_ms,
            "jitter_ms": result.jitter_ms,
            "packet_loss_percent": result.packet_loss_percent,
            "tcp_connect_time_ms": result.tcp_connect_time_ms,
            "http_response_time_ms": result.http_response_time_ms,
            "overall_score": result.overall_score,
            "recommended_cost": result.recommended_cost,
            "current_cost": self.current_costs.get(result.tunnel_name)
        }
      
        # 保存到JSON文件
        result_file = f"/var/lib/health_checker/results_{result.tunnel_name}.json"
        with open(result_file, "a") as f:
            f.write(json.dumps(result_dict) + "\n")
      
        # 生成Prometheus格式的指标
        metrics_file = f"/var/lib/node_exporter/tunnel_metrics_{result.tunnel_name}.prom"
        metrics = f"""# HELP tunnel_health_score 隧道健康评分 (0-100)
# TYPE tunnel_health_score gauge
tunnel_health_score{{tunnel="{result.tunnel_name}"}} {result.overall_score}

# HELP tunnel_latency_ms 隧道延迟(毫秒)
# TYPE tunnel_latency_ms gauge
tunnel_latency_ms{{tunnel="{result.tunnel_name}"}} {result.latency_ms}

# HELP tunnel_jitter_ms 隧道抖动(毫秒)
# TYPE tunnel_jitter_ms gauge
tunnel_jitter_ms{{tunnel="{result.tunnel_name}"}} {result.jitter_ms}

# HELP tunnel_packet_loss_percent 隧道丢包率(百分比)
# TYPE tunnel_packet_loss_percent gauge
tunnel_packet_loss_percent{{tunnel="{result.tunnel_name}"}} {result.packet_loss_percent}

# HELP tunnel_ospf_cost 隧道OSPF Cost值
# TYPE tunnel_ospf_cost gauge
tunnel_ospf_cost{{tunnel="{result.tunnel_name}"}} {self.current_costs.get(result.tunnel_name, 0)}
"""
      
        with open(metrics_file, "w") as f:
            f.write(metrics)
  
    def run_continuously(self, interval_seconds: int = 30):
        """持续运行健康检查"""
        logger.info(f"启动健康检查守护进程,间隔: {interval_seconds}秒")
      
        while True:
            try:
                self.run_single_check()
                time.sleep(interval_seconds)
            except KeyboardInterrupt:
                logger.info("收到中断信号,停止健康检查")
                break
            except Exception as e:
                logger.error(f"健康检查循环出错: {str(e)}")
                time.sleep(10)  # 出错后等待10秒再重试

def main():
    import argparse
  
    parser = argparse.ArgumentParser(description="隧道健康检查系统")
    parser.add_argument("--interval", type=int, default=30,
                       help="检查间隔(秒)")
    parser.add_argument("--once", action="store_true",
                       help="只执行一次检查")
    parser.add_argument("--debug", action="store_true",
                       help="启用调试模式")
  
    args = parser.parse_args()
  
    if args.debug:
        logger.setLevel(logging.DEBUG)
  
    # 创建必要的目录
    subprocess.run("mkdir -p /var/lib/health_checker /var/lib/node_exporter", 
                   shell=True)
  
    checker = TunnelHealthChecker()
  
    if args.once:
        checker.run_single_check()
    else:
        checker.run_continuously(args.interval)

if __name__ == "__main__":
    main()

8.2 健康检查服务化

脚本 8.2.1:健康检查systemd服务

# /etc/systemd/system/health-checker.service
[Unit]
Description=Tunnel Health Checker
After=network-online.target frr.service
Wants=network-online.target
Requires=frr.service

[Service]
Type=simple
User=root
ExecStart=/usr/local/bin/health_checker.py --interval 30
Restart=on-failure
RestartSec=10
StandardOutput=journal
StandardError=journal

# 安全配置
PrivateTmp=true
NoNewPrivileges=true
ProtectSystem=strict
ReadWritePaths=/var/lib/health_checker /var/lib/node_exporter /var/log

[Install]
WantedBy=multi-user.target

脚本 8.2.2:健康检查API服务

#!/usr/bin/env python3
# /usr/local/bin/health_check_api.py
from flask import Flask, jsonify, request
import json
from datetime import datetime
import threading
from health_checker import TunnelHealthChecker

app = Flask(__name__)
checker = TunnelHealthChecker()
lock = threading.Lock()

@app.route('/api/v1/health/status', methods=['GET'])
def get_health_status():
    """获取所有隧道健康状态"""
    with lock:
        status = {}
        for tunnel_name in checker.tunnels:
            if checker.history.get(tunnel_name):
                latest = checker.history[tunnel_name][-1]
                status[tunnel_name] = {
                    "score": latest.overall_score,
                    "latency_ms": latest.latency_ms,
                    "packet_loss_percent": latest.packet_loss_percent,
                    "current_cost": checker.current_costs.get(tunnel_name),
                    "last_check": latest.timestamp.isoformat()
                }
      
        return jsonify({
            "timestamp": datetime.now().isoformat(),
            "status": status
        })

@app.route('/api/v1/health/check', methods=['POST'])
def trigger_health_check():
    """触发立即健康检查"""
    with lock:
        checker.run_single_check()
      
        # 获取最新结果
        result = {}
        for tunnel_name in checker.tunnels:
            if checker.history.get(tunnel_name):
                latest = checker.history[tunnel_name][-1]
                result[tunnel_name] = {
                    "score": latest.overall_score,
                    "recommended_cost": latest.recommended_cost,
                    "current_cost": checker.current_costs.get(tunnel_name)
                }
      
        return jsonify({
            "message": "Health check completed",
            "result": result,
            "timestamp": datetime.now().isoformat()
        })

@app.route('/api/v1/health/history/<tunnel_name>', methods=['GET'])
def get_health_history(tunnel_name):
    """获取隧道历史数据"""
    limit = request.args.get('limit', default=100, type=int)
  
    if tunnel_name not in checker.history:
        return jsonify({"error": "Tunnel not found"}), 404
  
    history = checker.history[tunnel_name][-limit:]
  
    return jsonify({
        "tunnel": tunnel_name,
        "history": [
            {
                "timestamp": h.timestamp.isoformat(),
                "score": h.overall_score,
                "latency_ms": h.latency_ms,
                "packet_loss_percent": h.packet_loss_percent,
                "recommended_cost": h.recommended_cost
            }
            for h in history
        ]
    })

@app.route('/api/v1/health/metrics', methods=['GET'])
def get_prometheus_metrics():
    """提供Prometheus格式的指标"""
    metrics = []
  
    for tunnel_name in checker.tunnels:
        if checker.history.get(tunnel_name):
            latest = checker.history[tunnel_name][-1]
            current_cost = checker.current_costs.get(tunnel_name, 0)
          
            metrics.extend([
                f'tunnel_health_score{{tunnel="{tunnel_name}"}} {latest.overall_score}',
                f'tunnel_latency_ms{{tunnel="{tunnel_name}"}} {latest.latency_ms}',
                f'tunnel_packet_loss_percent{{tunnel="{tunnel_name}"}} {latest.packet_loss_percent}',
                f'tunnel_ospf_cost{{tunnel="{tunnel_name}"}} {current_cost}',
                f'tunnel_last_check{{tunnel="{tunnel_name}"}} {int(latest.timestamp.timestamp())}'
            ])
  
    return '\n'.join(metrics), 200, {'Content-Type': 'text/plain'}

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080, debug=False)

第11-12天:监控告警系统增强

11.1 Prometheus配置

配置文件 11.1.1:Prometheus监控配置

# /etc/prometheus/prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

alerting:
  alertmanagers:
    - static_configs:
        - targets:
          # - alertmanager:9093

rule_files:
  - "/etc/prometheus/alerts.yml"

scrape_configs:
  # Node Exporter (HZ1, LA1, LA2)
  - job_name: 'node_exporter'
    static_configs:
      - targets:
        - 'hz1:9100'
        - 'la1:9100'
        - 'la2:9100'
    metrics_path: /metrics
    scrape_interval: 30s

  # 隧道健康检查API
  - job_name: 'tunnel_health'
    static_configs:
      - targets: ['hz1:8080']
    metrics_path: /api/v1/health/metrics
    scrape_interval: 30s

  # FRR监控
  - job_name: 'frr_exporter'
    static_configs:
      - targets: ['hz1:9344']  # frr_exporter端口
    scrape_interval: 30s

  # 黑盒监控(外部探测)
  - job_name: 'blackbox_tunnel'
    metrics_path: /probe
    params:
      module: [tcp_connect]
    static_configs:
      - targets:
        - '10.100.1.2:51820'  # LA1 WireGuard端口
        - '10.100.2.2:51820'  # LA2 WireGuard端口
        - '8.8.8.8:53'        # 通过隧道访问
    relabel_configs:
      - source_labels: [__address__]
        target_label: __param_target
      - source_labels: [__param_target]
        target_label: instance
      - target_label: __address__
        replacement: localhost:9115  # Blackbox Exporter地址

11.2 告警规则配置

配置文件 11.2.1:告警规则

# /etc/prometheus/alerts.yml
groups:
  - name: tunnel_alerts
    rules:
      # 隧道完全中断告警
      - alert: TunnelDown
        expr: tunnel_health_score == 0 or tunnel_health_score < 30
        for: 1m
        labels:
          severity: critical
          component: tunnel
        annotations:
          summary: "隧道 {{ $labels.tunnel }} 完全中断"
          description: "隧道 {{ $labels.tunnel }} 健康评分为 {{ $value }},低于30分,可能完全中断"

      # 隧道质量劣化告警
      - alert: TunnelDegraded
        expr: tunnel_health_score < 60
        for: 2m
        labels:
          severity: warning
          component: tunnel
        annotations:
          summary: "隧道 {{ $labels.tunnel }} 质量劣化"
          description: "隧道 {{ $labels.tunnel }} 健康评分为 {{ $value }},低于60分"

      # 高延迟告警
      - alert: HighLatency
        expr: tunnel_latency_ms > 200
        for: 3m
        labels:
          severity: warning
          component: tunnel
        annotations:
          summary: "隧道 {{ $labels.tunnel }} 延迟过高"
          description: "隧道 {{ $labels.tunnel }} 延迟为 {{ $value }}ms,超过200ms阈值"

      # 高丢包告警
      - alert: HighPacketLoss
        expr: tunnel_packet_loss_percent > 10
        for: 2m
        labels:
          severity: warning
          component: tunnel
        annotations:
          summary: "隧道 {{ $labels.tunnel }} 丢包过高"
          description: "隧道 {{ $labels.tunnel }} 丢包率为 {{ $value }}%,超过10%阈值"

      # OSPF路由切换告警
      - alert: OSPFCostChanged
        expr: |
          abs(
            delta(tunnel_ospf_cost[5m])
          ) > 50
        for: 0m
        labels:
          severity: info
          component: routing
        annotations:
          summary: "隧道 {{ $labels.tunnel }} OSPF Cost发生变化"
          description: "隧道 {{ $labels.tunnel }} OSPF Cost在5分钟内变化 {{ $value }}"

      # 所有隧道同时故障(灾难性)
      - alert: AllTunnelsDown
        expr: |
          sum(tunnel_health_score < 30) == 2
        for: 30s
        labels:
          severity: critical
          component: infrastructure
        annotations:
          summary: "所有跨境隧道中断"
          description: "LA1和LA2隧道同时故障,跨境网络完全中断"

      # 节点资源告警
      - alert: HighCPUUsage
        expr: |
          100 - (avg by(instance) (rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) > 80
        for: 5m
        labels:
          severity: warning
          component: system
        annotations:
          summary: "{{ $labels.instance }} CPU使用率过高"
          description: "{{ $labels.instance }} CPU使用率为 {{ $value }}%"

      # 内存使用告警
      - alert: HighMemoryUsage
        expr: |
          (node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / node_memory_MemTotal_bytes * 100 > 85
        for: 5m
        labels:
          severity: warning
          component: system
        annotations:
          summary: "{{ $labels.instance }} 内存使用率过高"
          description: "{{ $labels.instance }} 内存使用率为 {{ $value }}%"

11.3 Grafana仪表板配置

脚本 11.3.1:Grafana仪表板导入脚本

#!/bin/bash
# setup_grafana_dashboard.sh

# 创建隧道监控仪表板JSON文件
cat > /etc/grafana/dashboards/tunnel_monitoring.json << 'EOF'
{
  "dashboard": {
    "title": "跨境隧道监控",
    "tags": ["tunnel", "network", "monitoring"],
    "timezone": "browser",
    "panels": [
      {
        "title": "隧道健康评分",
        "type": "graph",
        "targets": [
          {
            "expr": "tunnel_health_score",
            "legendFormat": "{{tunnel}}"
          }
        ],
        "gridPos": {"h": 8, "w": 12, "x": 0, "y": 0}
      },
      {
        "title": "隧道延迟(ms)",
        "type": "graph",
        "targets": [
          {
            "expr": "tunnel_latency_ms",
            "legendFormat": "{{tunnel}}"
          }
        ],
        "gridPos": {"h": 8, "w": 12, "x": 12, "y": 0}
      },
      {
        "title": "隧道丢包率(%)",
        "type": "graph",
        "targets": [
          {
            "expr": "tunnel_packet_loss_percent",
            "legendFormat": "{{tunnel}}"
          }
        ],
        "gridPos": {"h": 8, "w": 12, "x": 0, "y": 8}
      },
      {
        "title": "OSPF Cost值",
        "type": "graph",
        "targets": [
          {
            "expr": "tunnel_ospf_cost",
            "legendFormat": "{{tunnel}}"
          }
        ],
        "gridPos": {"h": 8, "w": 12, "x": 12, "y": 8}
      },
      {
        "title": "隧道流量(入)",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(node_network_receive_bytes_total{device=~\"wg.*\"}[5m]) * 8",
            "legendFormat": "{{device}}"
          }
        ],
        "gridPos": {"h": 8, "w": 12, "x": 0, "y": 16}
      },
      {
        "title": "隧道流量(出)",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(node_network_transmit_bytes_total{device=~\"wg.*\"}[5m]) * 8",
            "legendFormat": "{{device}}"
          }
        ],
        "gridPos": {"h": 8, "w": 12, "x": 12, "y": 16}
      },
      {
        "title": "当前状态面板",
        "type": "stat",
        "targets": [
          {
            "expr": "tunnel_health_score",
            "instant": true,
            "legendFormat": "{{tunnel}}"
          }
        ],
        "gridPos": {"h": 8, "w": 24, "x": 0, "y": 24}
      }
    ],
    "schemaVersion": 27,
    "version": 1
  },
  "overwrite": true
}
EOF

# 通过Grafana API导入仪表板
GRAFANA_URL="http://localhost:3000"
API_KEY="your_grafana_api_key"

curl -X POST \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $API_KEY" \
  -d @/etc/grafana/dashboards/tunnel_monitoring.json \
  "$GRAFANA_URL/api/dashboards/db"

第13-14天:故障切换模拟测试

13.1 故障注入测试工具

脚本 13.1.1:故障注入测试框架

#!/usr/bin/env python3
# fault_injection_test.py
import subprocess
import time
import json
import random
from datetime import datetime
import threading
from enum import Enum

class FaultType(Enum):
    """故障类型枚举"""
    TUNNEL_DOWN = "tunnel_down"           # 隧道完全中断
    HIGH_LATENCY = "high_latency"         # 高延迟
    PACKET_LOSS = "packet_loss"           # 高丢包
    BANDWIDTH_LIMIT = "bandwidth_limit"   # 带宽限制
    INTERFACE_DOWN = "interface_down"     # 接口关闭
    ROUTE_FLAP = "route_flap"             # 路由震荡

class FaultInjector:
    def __init__(self):
        self.active_faults = {}
        self.test_scenarios = self.load_scenarios()
      
    def load_scenarios(self):
        """加载测试场景"""
        return [
            {
                "name": "主隧道故障切换",
                "description": "模拟LA1完全故障,验证自动切换到LA2",
                "steps": [
                    {"type": FaultType.TUNNEL_DOWN, "target": "la1", "duration": 300},
                    {"type": FaultType.TUNNEL_DOWN, "target": "la1", "duration": 0}  # 恢复
                ]
            },
            {
                "name": "主隧道质量劣化",
                "description": "模拟LA1高延迟高丢包,验证部分切换",
                "steps": [
                    {"type": FaultType.HIGH_LATENCY, "target": "la1", "duration": 180},
                    {"type": FaultType.PACKET_LOSS, "target": "la1", "duration": 180},
                    {"type": FaultType.HIGH_LATENCY, "target": "la1", "duration": 0},
                    {"type": FaultType.PACKET_LOSS, "target": "la1", "duration": 0}
                ]
            },
            {
                "name": "双隧道交替故障",
                "description": "模拟双隧道交替故障,验证稳定性",
                "steps": [
                    {"type": FaultType.TUNNEL_DOWN, "target": "la1", "duration": 60},
                    {"type": FaultType.TUNNEL_DOWN, "target": "la1", "duration": 0},
                    {"type": FaultType.TUNNEL_DOWN, "target": "la2", "duration": 60},
                    {"type": FaultType.TUNNEL_DOWN, "target": "la2", "duration": 0}
                ]
            },
            {
                "name": "路由震荡测试",
                "description": "模拟路由频繁切换,验证防震荡机制",
                "steps": [
                    {"type": FaultType.ROUTE_FLAP, "target": "la1", "duration": 120}
                ]
            }
        ]
  
    def inject_fault(self, fault_type: FaultType, target: str, duration: int, parameters: dict = None):
        """注入故障"""
        fault_id = f"{fault_type.value}_{target}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
      
        if fault_type == FaultType.TUNNEL_DOWN:
            self.inject_tunnel_down(target, duration)
        elif fault_type == FaultType.HIGH_LATENCY:
            latency = parameters.get("latency_ms", 300) if parameters else 300
            self.inject_high_latency(target, latency, duration)
        elif fault_type == FaultType.PACKET_LOSS:
            loss = parameters.get("loss_percent", 30) if parameters else 30
            self.inject_packet_loss(target, loss, duration)
        elif fault_type == FaultType.BANDWIDTH_LIMIT:
            bandwidth = parameters.get("bandwidth_mbps", 10) if parameters else 10
            self.inject_bandwidth_limit(target, bandwidth, duration)
        elif fault_type == FaultType.INTERFACE_DOWN:
            self.inject_interface_down(target, duration)
        elif fault_type == FaultType.ROUTE_FLAP:
            interval = parameters.get("interval_seconds", 10) if parameters else 10
            self.inject_route_flap(target, interval, duration)
      
        self.active_faults[fault_id] = {
            "type": fault_type.value,
            "target": target,
            "start_time": datetime.now(),
            "duration": duration,
            "parameters": parameters or {}
        }
      
        print(f"注入故障: {fault_id}")
      
        # 如果是临时故障,启动恢复定时器
        if duration > 0:
            threading.Timer(duration, self.recover_fault, args=[fault_id]).start()
      
        return fault_id
  
    def inject_tunnel_down(self, target: str, duration: int):
        """注入隧道中断故障"""
        if target == "la1":
            # 停止WireGuard服务
            subprocess.run(f"systemctl stop wg-quick@wg0", shell=True)
        elif target == "la2":
            subprocess.run(f"systemctl stop wg-quick@wg1", shell=True)
  
    def inject_high_latency(self, target: str, latency_ms: int, duration: int):
        """注入高延迟故障(使用tc)"""
        if target == "la1":
            iface = "wg0"
            gateway = "10.100.1.2"
        else:
            iface = "wg1"
            gateway = "10.100.2.2"
      
        # 使用tc添加延迟
        base_latency = 50  # 基础延迟
        add_latency = max(0, latency_ms - base_latency)
      
        cmd = f"""
        tc qdisc add dev {iface} root netem delay {add_latency}ms 10ms distribution normal
        """
        subprocess.run(cmd, shell=True)
  
    def inject_packet_loss(self, target: str, loss_percent: int, duration: int):
        """注入丢包故障"""
        if target == "la1":
            iface = "wg0"
        else:
            iface = "wg1"
      
        cmd = f"""
        tc qdisc add dev {iface} root netem loss {loss_percent}%
        """
        subprocess.run(cmd, shell=True)
  
    def inject_bandwidth_limit(self, target: str, bandwidth_mbps: int, duration: int):
        """注入带宽限制"""
        if target == "la1":
            iface = "wg0"
        else:
            iface = "wg1"
      
        rate = bandwidth_mbps * 1000  # 转换为kbit
      
        cmd = f"""
        tc qdisc add dev {iface} root handle 1: htb default 10
        tc class add dev {iface} parent 1: classid 1:10 htb rate {rate}kbit ceil {rate}kbit
        """
        subprocess.run(cmd, shell=True)
  
    def inject_interface_down(self, target: str, duration: int):
        """关闭接口"""
        if target == "la1":
            iface = "wg0"
        else:
            iface = "wg1"
      
        subprocess.run(f"ip link set {iface} down", shell=True)
  
    def inject_route_flap(self, target: str, interval_seconds: int, duration: int):
        """注入路由震荡故障"""
        def flap_cycle():
            if target == "la1":
                iface = "wg0"
                cost_good = 10
                cost_bad = 65535
            else:
                iface = "wg1"
                cost_good = 50
                cost_bad = 65535
          
            start_time = time.time()
            while time.time() - start_time < duration:
                # 切换到坏状态
                subprocess.run(f"vtysh -c 'configure terminal' -c 'interface {iface}' -c 'ip ospf cost {cost_bad}' -c 'end'", 
                             shell=True)
                time.sleep(interval_seconds / 2)
              
                # 切换回好状态
                subprocess.run(f"vtysh -c 'configure terminal' -c 'interface {iface}' -c 'ip ospf cost {cost_good}' -c 'end'", 
                             shell=True)
                time.sleep(interval_seconds / 2)
      
        threading.Thread(target=flap_cycle, daemon=True).start()
  
    def recover_fault(self, fault_id: str):
        """恢复故障"""
        if fault_id not in self.active_faults:
            return
      
        fault = self.active_faults[fault_id]
        fault_type = FaultType(fault["type"])
        target = fault["target"]
      
        print(f"恢复故障: {fault_id}")
      
        if fault_type == FaultType.TUNNEL_DOWN:
            if target == "la1":
                subprocess.run(f"systemctl start wg-quick@wg0", shell=True)
            elif target == "la2":
                subprocess.run(f"systemctl start wg-quick@wg1", shell=True)
      
        elif fault_type in [FaultType.HIGH_LATENCY, FaultType.PACKET_LOSS, FaultType.BANDWIDTH_LIMIT]:
            if target == "la1":
                iface = "wg0"
            else:
                iface = "wg1"
          
            # 清除tc规则
            subprocess.run(f"tc qdisc del dev {iface} root 2>/dev/null || true", shell=True)
      
        elif fault_type == FaultType.INTERFACE_DOWN:
            if target == "la1":
                iface = "wg0"
            else:
                iface = "wg1"
          
            subprocess.run(f"ip link set {iface} up", shell=True)
      
        # 从活动故障中移除
        del self.active_faults[fault_id]
  
    def recover_all_faults(self):
        """恢复所有故障"""
        for fault_id in list(self.active_faults.keys()):
            self.recover_fault(fault_id)
  
    def run_scenario(self, scenario_name: str):
        """运行测试场景"""
        scenario = next((s for s in self.test_scenarios if s["name"] == scenario_name), None)
        if not scenario:
            print(f"场景不存在: {scenario_name}")
            return False
      
        print(f"开始运行场景: {scenario['name']}")
        print(f"描述: {scenario['description']}")
      
        results = []
      
        for i, step in enumerate(scenario["steps"]):
            print(f"\n步骤 {i+1}: {step['type'].value} -> {step['target']} ({step['duration']}秒)")
          
            # 注入故障
            fault_id = self.inject_fault(
                step["type"], 
                step["target"], 
                step["duration"]
            )
          
            # 记录注入前的状态
            pre_state = self.get_current_state()
          
            # 等待故障生效
            wait_time = min(step["duration"], 10) if step["duration"] > 0 else 5
            print(f"等待 {wait_time} 秒让故障生效...")
            time.sleep(wait_time)
          
            # 记录注入后的状态
            post_state = self.get_current_state()
          
            # 验证切换(如果适用)
            if step["type"] in [FaultType.TUNNEL_DOWN, FaultType.HIGH_LATENCY, FaultType.PACKET_LOSS]:
                verification = self.verify_failover(step["target"])
            else:
                verification = {"verified": True, "message": "无需验证"}
          
            results.append({
                "step": i+1,
                "fault_type": step["type"].value,
                "target": step["target"],
                "duration": step["duration"],
                "pre_state": pre_state,
                "post_state": post_state,
                "verification": verification
            })
          
            # 如果不是最后一步且有持续时间,等待故障持续
            if i < len(scenario["steps"]) - 1 and step["duration"] > wait_time:
                remaining = step["duration"] - wait_time
                print(f"等待故障持续 {remaining} 秒...")
                time.sleep(remaining)
      
        # 生成测试报告
        report = self.generate_report(scenario, results)
      
        print(f"\n场景完成: {scenario['name']}")
        print(f"报告已保存: {report['report_file']}")
      
        return report
  
    def get_current_state(self):
        """获取当前网络状态"""
        state = {
            "timestamp": datetime.now().isoformat(),
            "routes": subprocess.getoutput("ip route show default"),
            "wg_status": subprocess.getoutput("wg show"),
            "ospf_cost": {},
            "tunnel_health": {}
        }
      
        # 获取OSPF cost
        for target in ["la1", "la2"]:
            iface = "wg0" if target == "la1" else "wg1"
            cmd = f"vtysh -c 'show interface {iface}' | grep 'cost'"
            output = subprocess.getoutput(cmd)
            if "cost" in output:
                cost = output.split("cost")[1].split(",")[0].strip()
                state["ospf_cost"][target] = cost
      
        return state
  
    def verify_failover(self, faulty_target: str):
        """验证故障切换是否正确"""
        # 获取当前默认路由
        default_route = subprocess.getoutput("ip route show default")
      
        if faulty_target == "la1":
            # LA1故障时,默认路由应该指向LA2
            if "wg1" in default_route:
                return {"verified": True, "message": "成功切换到LA2"}
            else:
                return {"verified": False, "message": "未切换到LA2"}
        else:
            # LA2故障时,默认路由应该指向LA1
            if "wg0" in default_route:
                return {"verified": True, "message": "成功切换到LA1"}
            else:
                return {"verified": False, "message": "未切换到LA1"}
  
    def generate_report(self, scenario, results):
        """生成测试报告"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        report_file = f"/var/log/fault_test_{scenario['name']}_{timestamp}.json"
      
        report = {
            "scenario": scenario,
            "results": results,
            "summary": {
                "total_steps": len(results),
                "passed_steps": sum(1 for r in results if r["verification"]["verified"]),
                "failed_steps": sum(1 for r in results if not r["verification"]["verified"])
            },
            "timestamp": datetime.now().isoformat(),
            "hostname": subprocess.getoutput("hostname")
        }
      
        with open(report_file, "w") as f:
            json.dump(report, f, indent=2)
      
        report["report_file"] = report_file
        return report

def main():
    import argparse
  
    parser = argparse.ArgumentParser(description="故障注入测试工具")
    parser.add_argument("--scenario", help="运行指定场景")
    parser.add_argument("--list-scenarios", action="store_true", help="列出所有场景")
    parser.add_argument("--inject", help="注入指定故障,格式: type,target,duration[,param1=value1,...]")
    parser.add_argument("--recover", help="恢复指定故障ID")
    parser.add_argument("--recover-all", action="store_true", help="恢复所有故障")
    parser.add_argument("--status", action="store_true", help="显示当前状态")
  
    args = parser.parse_args()
  
    injector = FaultInjector()
  
    if args.list_scenarios:
        print("可用测试场景:")
        for scenario in injector.test_scenarios:
            print(f"  - {scenario['name']}: {scenario['description']}")
  
    elif args.scenario:
        injector.run_scenario(args.scenario)
  
    elif args.inject:
        # 解析故障参数
        parts = args.inject.split(",")
        if len(parts) < 3:
            print("格式错误,需要: type,target,duration[,param1=value1,...]")
            return
      
        fault_type = FaultType(parts[0])
        target = parts[1]
        duration = int(parts[2])
      
        # 解析额外参数
        params = {}
        if len(parts) > 3:
            for param in parts[3:]:
                if "=" in param:
                    key, value = param.split("=", 1)
                    params[key] = value
      
        injector.inject_fault(fault_type, target, duration, params)
  
    elif args.recover:
        injector.recover_fault(args.recover)
  
    elif args.recover_all:
        injector.recover_all_faults()
  
    elif args.status:
        state = injector.get_current_state()
        print(json.dumps(state, indent=2))
  
    else:
        parser.print_help()

if __name__ == "__main__":
    main()

第15天:性能基准测试与文档

15.1 性能基准测试脚本

脚本 15.1.1:综合性能测试

#!/usr/bin/env python3
# performance_benchmark.py
import subprocess
import time
import json
import statistics
from datetime import datetime
import concurrent.futures

class PerformanceBenchmark:
    def __init__(self):
        self.results = {}
        self.test_targets = [
            {"name": "google_dns", "host": "8.8.8.8", "port": 53},
            {"name": "cloudflare_dns", "host": "1.1.1.1", "port": 53},
            {"name": "google_http", "host": "www.google.com", "port": 80},
            {"name": "cloudflare_http", "host": "www.cloudflare.com", "port": 80}
        ]
  
    def run_iperf_test(self, tunnel, duration=10):
        """运行iperf带宽测试"""
        if tunnel == "la1":
            server_ip = "10.100.1.2"
            iface = "wg0"
        else:
            server_ip = "10.100.2.2"
            iface = "wg1"
      
        # 需要在LA1和LA2上启动iperf3服务器
        cmd = f"iperf3 -c {server_ip} -i 1 -t {duration} -b 100M --bind-dev {iface} -J"
      
        try:
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=duration+5)
            if result.returncode == 0:
                data = json.loads(result.stdout)
                return data.get("end", {}).get("sum_received", {})
            return None
        except:
            return None
  
    def run_latency_test(self, tunnel, count=100):
        """运行延迟测试"""
        if tunnel == "la1":
            target = "10.100.1.2"
            iface = "wg0"
        else:
            target = "10.100.2.2"
            iface = "wg1"
      
        latencies = []
      
        for _ in range(count):
            start = time.time()
            try:
                subprocess.run(
                    f"ping -c 1 -W 1 -I {iface} {target}",
                    shell=True,
                    capture_output=True,
                    timeout=2
                )
                latencies.append((time.time() - start) * 1000)  # 转换为毫秒
            except:
                latencies.append(1000)  # 超时
      
        if latencies:
            return {
                "min": min(latencies),
                "max": max(latencies),
                "avg": statistics.mean(latencies),
                "std": statistics.stdev(latencies) if len(latencies) > 1 else 0,
                "loss": latencies.count(1000) / len(latencies) * 100
            }
        return None
  
    def run_throughput_test(self, tunnel, test_size_mb=100):
        """运行吞吐量测试(使用scp/wget模拟)"""
        if tunnel == "la1":
            iface = "wg0"
            test_url = "http://10.100.1.2/testfile"  # 需要在LA1上提供测试文件
        else:
            iface = "wg1"
            test_url = "http://10.100.2.2/testfile"  # 需要在LA2上提供测试文件
      
        # 使用wget下载测试文件
        start = time.time()
        try:
            cmd = f"wget -O /dev/null --bind-address={iface} {test_url} 2>&1 | tail -2"
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=300)
            elapsed = time.time() - start
          
            # 解析wget输出获取速度
            lines = result.stdout.split('\n')
            for line in lines:
                if "MB/s" in line or "KB/s" in line:
                    return {"elapsed": elapsed, "output": line.strip()}
          
            return {"elapsed": elapsed, "output": "未知"}
        except:
            return None
  
    def run_concurrent_test(self, tunnel, concurrent_connections=10, duration=30):
        """运行并发连接测试"""
        results = []
      
        def single_connection_test(conn_id):
            if tunnel == "la1":
                target = "10.100.1.2"
                iface = "wg0"
            else:
                target = "10.100.2.2"
                iface = "wg1"
          
            successes = 0
            failures = 0
            start_time = time.time()
          
            while time.time() - start_time < duration:
                try:
                    cmd = f"timeout 2 curl --interface {iface} -s http://{target}/ > /dev/null"
                    subprocess.run(cmd, shell=True, check=True)
                    successes += 1
                except:
                    failures += 1
                time.sleep(0.1)
          
            return {"successes": successes, "failures": failures}
      
        with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_connections) as executor:
            futures = [executor.submit(single_connection_test, i) for i in range(concurrent_connections)]
            for future in concurrent.futures.as_completed(futures):
                results.append(future.result())
      
        total_successes = sum(r["successes"] for r in results)
        total_failures = sum(r["failures"] for r in results)
      
        return {
            "total_connections": concurrent_connections,
            "success_rate": total_successes / (total_successes + total_failures) * 100 if (total_successes + total_failures) > 0 else 0,
            "requests_per_second": total_successes / duration
        }
  
    def run_comprehensive_test(self, tunnel):
        """运行全面性能测试"""
        print(f"开始测试隧道: {tunnel}")
      
        test_results = {
            "tunnel": tunnel,
            "timestamp": datetime.now().isoformat(),
            "tests": {}
        }
      
        # 1. 延迟测试
        print("  执行延迟测试...")
        latency_result = self.run_latency_test(tunnel, 50)
        test_results["tests"]["latency"] = latency_result
      
        # 2. 带宽测试
        print("  执行带宽测试...")
        bandwidth_result = self.run_iperf_test(tunnel, 15)
        test_results["tests"]["bandwidth"] = bandwidth_result
      
        # 3. 吞吐量测试
        print("  执行吞吐量测试...")
        throughput_result = self.run_throughput_test(tunnel, 50)
        test_results["tests"]["throughput"] = throughput_result
      
        # 4. 并发测试
        print("  执行并发测试...")
        concurrent_result = self.run_concurrent_test(tunnel, 20, 60)
        test_results["tests"]["concurrent"] = concurrent_result
      
        # 5. 路由收敛测试
        print("  执行路由收敛测试...")
        convergence_result = self.test_route_convergence(tunnel)
        test_results["tests"]["convergence"] = convergence_result
      
        return test_results
  
    def test_route_convergence(self, tunnel_to_fail):
        """测试路由收敛时间"""
        if tunnel_to_fail == "la1":
            iface = "wg0"
            other_tunnel = "la2"
        else:
            iface = "wg1"
            other_tunnel = "la1"
      
        # 记录当前路由
        original_route = subprocess.getoutput("ip route show default")
        print(f"  原始路由: {original_route}")
      
        # 注入故障
        print(f"  注入故障到 {tunnel_to_fail}...")
        subprocess.run(f"ip link set {iface} down", shell=True)
      
        # 开始计时
        start_time = time.time()
      
        # 监控路由变化
        convergence_time = None
        for i in range(30):  # 最多等待30秒
            time.sleep(1)
            current_route = subprocess.getoutput("ip route show default")
          
            if iface not in current_route:
                convergence_time = time.time() - start_time
                print(f"  路由收敛完成,耗时: {convergence_time:.2f}秒")
                break
      
        # 恢复故障
        subprocess.run(f"ip link set {iface} up", shell=True)
      
        # 等待恢复
        time.sleep(5)
      
        # 验证恢复
        final_route = subprocess.getoutput("ip route show default")
      
        return {
            "convergence_time_seconds": convergence_time,
            "original_route": original_route,
            "final_route": final_route,
            "success": convergence_time is not None and convergence_time < 10
        }
  
    def compare_tunnels(self):
        """比较两个隧道的性能"""
        print("开始双隧道性能对比测试")
        print("="*60)
      
        results = {}
      
        # 分别测试两个隧道
        for tunnel in ["la1", "la2"]:
            print(f"\n测试隧道: {tunnel}")
            results[tunnel] = self.run_comprehensive_test(tunnel)
      
        # 生成对比报告
        comparison = self.generate_comparison_report(results)
      
        # 保存结果
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        report_file = f"/var/log/performance_comparison_{timestamp}.json"
      
        with open(report_file, "w") as f:
            json.dump({
                "results": results,
                "comparison": comparison,
                "timestamp": datetime.now().isoformat()
            }, f, indent=2)
      
        print(f"\n对比报告已保存: {report_file}")
      
        # 打印简要对比
        print("\n" + "="*60)
        print("性能对比摘要:")
        print(f"{'指标':<20} {'LA1':<15} {'LA2':<15} {'优胜者':<10}")
        print("-"*60)
      
        metrics = comparison.get("metrics", {})
        for metric, values in metrics.items():
            la1_val = values.get("la1", 0)
            la2_val = values.get("la2", 0)
          
            if "latency" in metric:
                winner = "LA1" if la1_val < la2_val else "LA2" if la2_val < la1_val else "平局"
            elif "loss" in metric or "error" in metric:
                winner = "LA1" if la1_val < la2_val else "LA2" if la2_val < la1_val else "平局"
            else:  # 带宽、吞吐量等
                winner = "LA1" if la1_val > la2_val else "LA2" if la2_val > la1_val else "平局"
          
            print(f"{metric:<20} {la1_val:<15.2f} {la2_val:<15.2f} {winner:<10}")
      
        return results
  
    def generate_comparison_report(self, results):
        """生成对比报告"""
        comparison = {
            "metrics": {},
            "recommendation": ""
        }
      
        # 提取关键指标
        for tunnel in ["la1", "la2"]:
            tunnel_results = results.get(tunnel, {}).get("tests", {})
          
            # 延迟
            latency = tunnel_results.get("latency", {})
            comparison["metrics"][f"{tunnel}_avg_latency_ms"] = latency.get("avg", 0)
            comparison["metrics"][f"{tunnel}_packet_loss_percent"] = latency.get("loss", 100)
          
            # 带宽
            bandwidth = tunnel_results.get("bandwidth", {})
            if bandwidth:
                comparison["metrics"][f"{tunnel}_bandwidth_mbps"] = bandwidth.get("bits_per_second", 0) / 1e6
          
            # 并发性能
            concurrent = tunnel_results.get("concurrent", {})
            comparison["metrics"][f"{tunnel}_success_rate_percent"] = concurrent.get("success_rate", 0)
            comparison["metrics"][f"{tunnel}_requests_per_second"] = concurrent.get("requests_per_second", 0)
          
            # 收敛时间
            convergence = tunnel_results.get("convergence", {})
            comparison["metrics"][f"{tunnel}_convergence_time_seconds"] = convergence.get("convergence_time_seconds", 30)
      
        # 计算综合评分
        la1_score = self.calculate_overall_score(comparison, "la1")
        la2_score = self.calculate_overall_score(comparison, "la2")
      
        comparison["overall_scores"] = {
            "la1": la1_score,
            "la2": la2_score
        }
      
        # 给出推荐
        if la1_score > la2_score + 10:
            comparison["recommendation"] = "推荐使用LA1作为主隧道,LA2作为备用"
        elif la2_score > la1_score + 10:
            comparison["recommendation"] = "推荐使用LA2作为主隧道,LA1作为备用"
        else:
            comparison["recommendation"] = "两个隧道性能相当,建议负载均衡"
      
        return comparison
  
    def calculate_overall_score(self, comparison, tunnel):
        """计算综合评分"""
        score = 0
      
        # 延迟评分(越低越好)
        latency = comparison["metrics"].get(f"{tunnel}_avg_latency_ms", 300)
        if latency < 100:
            score += 30
        elif latency < 200:
            score += 20
        elif latency < 300:
            score += 10
      
        # 丢包评分(越低越好)
        loss = comparison["metrics"].get(f"{tunnel}_packet_loss_percent", 100)
        if loss < 1:
            score += 25
        elif loss < 5:
            score += 20
        elif loss < 10:
            score += 10
      
        # 带宽评分(越高越好)
        bandwidth = comparison["metrics"].get(f"{tunnel}_bandwidth_mbps", 0)
        if bandwidth > 500:
            score += 25
        elif bandwidth > 200:
            score += 20
        elif bandwidth > 100:
            score += 15
        elif bandwidth > 50:
            score += 10
      
        # 成功率评分(越高越好)
        success_rate = comparison["metrics"].get(f"{tunnel}_success_rate_percent", 0)
        if success_rate > 99:
            score += 20
        elif success_rate > 95:
            score += 15
        elif success_rate > 90:
            score += 10
      
        return score

def main():
    import argparse
  
    parser = argparse.ArgumentParser(description="性能基准测试工具")
    parser.add_argument("--tunnel", choices=["la1", "la2", "both"], default="both",
                       help="测试的隧道")
    parser.add_argument("--test", choices=["latency", "bandwidth", "concurrent", "all"],
                       default="all", help="测试类型")
    parser.add_argument("--duration", type=int, default=30,
                       help="测试持续时间(秒)")
    parser.add_argument("--output", help="输出文件路径")
  
    args = parser.parse_args()
  
    benchmark = PerformanceBenchmark()
  
    if args.tunnel == "both":
        results = benchmark.compare_tunnels()
    else:
        if args.test == "all":
            results = benchmark.run_comprehensive_test(args.tunnel)
        elif args.test == "latency":
            results = benchmark.run_latency_test(args.tunnel, 50)
        elif args.test == "bandwidth":
            results = benchmark.run_iperf_test(args.tunnel, args.duration)
        elif args.test == "concurrent":
            results = benchmark.run_concurrent_test(args.tunnel, 20, args.duration)
      
        print(json.dumps(results, indent=2))
      
        if args.output:
            with open(args.output, "w") as f:
                json.dump(results, f, indent=2)

if __name__ == "__main__":
    main()

第三阶段:运维优化与生产就绪实施方案

目标周期:10个工作日
完成标准:监控完善、文档完整、演练成功

第16-17天:监控体系完善

16.1 高级监控与可视化

脚本 16.1.1:高级监控仪表板配置

{
  "dashboard": {
    "title": "跨境SD-WAN高级监控",
    "tags": ["production", "network", "sdwan"],
    "panels": [
      {
        "title": "隧道健康状态矩阵",
        "type": "table",
        "targets": [{
          "expr": "tunnel_health_score",
          "instant": true,
          "format": "table"
        }],
        "gridPos": {"h": 6, "w": 24, "x": 0, "y": 0}
      },
      {
        "title": "链路质量热力图",
        "type": "heatmap",
        "targets": [{
          "expr": "rate(node_network_receive_bytes_total{device=~\"wg.*\"}[5m]) * 8",
          "legendFormat": "{{device}}"
        }],
        "gridPos": {"h": 8, "w": 12, "x": 0, "y": 6}
      },
      {
        "title": "OSPF路由状态",
        "type": "stat",
        "targets": [{
          "expr": "frr_ospf_neighbor_state",
          "instant": true,
          "legendFormat": "{{neighbor}}"
        }],
        "gridPos": {"h": 4, "w": 12, "x": 12, "y": 6}
      }
    ]
  }
}

16.2 自动化巡检脚本

脚本 16.2.1:每日自动化巡检

#!/usr/bin/env python3
# daily_inspection.py
import subprocess
import json
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
import socket

class DailyInspector:
    def __init__(self):
        self.hostname = socket.gethostname()
        self.inspection_time = datetime.now()
        self.results = {
            "timestamp": self.inspection_time.isoformat(),
            "hostname": self.hostname,
            "checks": {},
            "summary": {"total": 0, "passed": 0, "failed": 0, "warnings": 0}
        }
  
    def run_check(self, name, check_func):
        """运行单个检查"""
        try:
            result = check_func()
            self.results["checks"][name] = result
            self.results["summary"]["total"] += 1
          
            if result["status"] == "PASS":
                self.results["summary"]["passed"] += 1
            elif result["status"] == "FAIL":
                self.results["summary"]["failed"] += 1
            else:
                self.results["summary"]["warnings"] += 1
          
            return result
        except Exception as e:
            error_result = {
                "status": "ERROR",
                "message": f"检查执行失败: {str(e)}",
                "details": None
            }
            self.results["checks"][name] = error_result
            self.results["summary"]["total"] += 1
            self.results["summary"]["failed"] += 1
            return error_result
  
    def check_system_health(self):
        """检查系统健康状态"""
        checks = {}
      
        # 1. 系统负载
        load = subprocess.getoutput("cat /proc/loadavg").split()[0:3]
        checks["system_load"] = {
            "value": load,
            "threshold": [2.0, 4.0, 8.0],  # 1min, 5min, 15min阈值
            "status": "PASS" if float(load[0]) < 2.0 else "WARNING"
        }
      
        # 2. 内存使用
        mem_info = subprocess.getoutput("free -m").split('\n')[1].split()
        mem_total = int(mem_info[1])
        mem_used = int(mem_info[2])
        mem_usage = (mem_used / mem_total) * 100
        checks["memory_usage"] = {
            "value": f"{mem_usage:.1f}%",
            "threshold": 85,
            "status": "PASS" if mem_usage < 85 else "WARNING"
        }
      
        # 3. 磁盘使用
        disk_info = subprocess.getoutput("df -h /").split('\n')[1].split()
        disk_usage = disk_info[4].replace('%', '')
        checks["disk_usage"] = {
            "value": f"{disk_usage}%",
            "threshold": 90,
            "status": "PASS" if int(disk_usage) < 90 else "WARNING"
        }
      
        return {
            "status": "PASS" if all(c["status"] == "PASS" for c in checks.values()) else "WARNING",
            "message": "系统健康检查完成",
            "details": checks
        }
  
    def check_network_services(self):
        """检查网络服务状态"""
        services = [
            "frr", "wg-quick@wg0", "wg-quick@wg1", 
            "health-checker", "xray"
        ]
      
        results = {}
        failed_services = []
      
        for service in services:
            try:
                # 检查服务状态
                cmd = f"systemctl is-active {service}"
                status = subprocess.check_output(cmd, shell=True, text=True).strip()
              
                results[service] = {
                    "status": status,
                    "active": status == "active"
                }
              
                if status != "active":
                    failed_services.append(service)
                  
            except subprocess.CalledProcessError:
                results[service] = {
                    "status": "inactive",
                    "active": False
                }
                failed_services.append(service)
      
        return {
            "status": "PASS" if not failed_services else "FAIL",
            "message": f"网络服务检查: {len(failed_services)}个服务异常" if failed_services else "所有网络服务正常",
            "details": results,
            "failed_services": failed_services
        }
  
    def check_tunnel_connectivity(self):
        """检查隧道连通性"""
        tunnels = [
            {"name": "la1", "interface": "wg0", "gateway": "10.100.1.2"},
            {"name": "la2", "interface": "wg1", "gateway": "10.100.2.2"}
        ]
      
        results = {}
        failed_tunnels = []
      
        for tunnel in tunnels:
            try:
                # 测试连通性
                cmd = f"ping -c 3 -W 1 -I {tunnel['interface']} {tunnel['gateway']}"
                output = subprocess.check_output(cmd, shell=True, text=True)
              
                # 解析ping结果
                if "0% packet loss" in output:
                    # 提取延迟
                    for line in output.split('\n'):
                        if "rtt min/avg/max/mdev" in line:
                            latency = line.split('/')[4]  # 平均延迟
                            break
                  
                    results[tunnel['name']] = {
                        "status": "up",
                        "latency_ms": float(latency),
                        "packet_loss": 0
                    }
                else:
                    results[tunnel['name']] = {
                        "status": "degraded",
                        "latency_ms": None,
                        "packet_loss": 100
                    }
                    failed_tunnels.append(tunnel['name'])
                  
            except subprocess.CalledProcessError:
                results[tunnel['name']] = {
                    "status": "down",
                    "latency_ms": None,
                    "packet_loss": 100
                }
                failed_tunnels.append(tunnel['name'])
      
        return {
            "status": "PASS" if not failed_tunnels else "FAIL",
            "message": f"隧道连通性: {len(failed_tunnels)}个隧道异常" if failed_tunnels else "所有隧道连通正常",
            "details": results,
            "failed_tunnels": failed_tunnels
        }
  
    def check_routing_status(self):
        """检查路由状态"""
        try:
            # 检查默认路由
            default_route = subprocess.getoutput("ip route show default")
          
            # 检查OSPF邻居
            ospf_neighbors = subprocess.getoutput("vtysh -c 'show ip ospf neighbor'")
          
            # 检查路由表
            route_count = subprocess.getoutput("vtysh -c 'show ip route' | wc -l")
          
            return {
                "status": "PASS",
                "message": "路由状态正常",
                "details": {
                    "default_route": default_route,
                    "ospf_neighbor_count": len([l for l in ospf_neighbors.split('\n') if 'Full' in l]),
                    "total_routes": int(route_count)
                }
            }
        except Exception as e:
            return {
                "status": "FAIL",
                "message": f"路由检查失败: {str(e)}",
                "details": None
            }
  
    def check_security_status(self):
        """检查安全状态"""
        checks = {}
      
        # 1. 检查开放的端口
        open_ports = subprocess.getoutput("ss -tlnp | grep LISTEN | wc -l")
        checks["open_ports"] = {
            "value": int(open_ports),
            "threshold": 20,
            "status": "PASS" if int(open_ports) < 20 else "WARNING"
        }
      
        # 2. 检查登录失败
        failed_logins = subprocess.getoutput("grep 'Failed password' /var/log/auth.log | wc -l")
        checks["failed_logins"] = {
            "value": int(failed_logins),
            "threshold": 10,
            "status": "PASS" if int(failed_logins) < 10 else "WARNING"
        }
      
        # 3. 检查防火墙规则
        iptables_rules = subprocess.getoutput("iptables -L -n | wc -l")
        checks["firewall_rules"] = {
            "value": int(iptables_rules),
            "status": "PASS" if int(iptables_rules) > 10 else "WARNING"  # 至少应该有10条规则
        }
      
        return {
            "status": "PASS" if all(c["status"] == "PASS" for c in checks.values()) else "WARNING",
            "message": "安全检查完成",
            "details": checks
        }
  
    def run_all_checks(self):
        """运行所有检查"""
        print(f"开始每日巡检: {self.hostname}")
        print("="*60)
      
        checks = [
            ("系统健康检查", self.check_system_health),
            ("网络服务检查", self.check_network_services),
            ("隧道连通性检查", self.check_tunnel_connectivity),
            ("路由状态检查", self.check_routing_status),
            ("安全检查", self.check_security_status)
        ]
      
        for name, func in checks:
            print(f"执行: {name}")
            self.run_check(name, func)
      
        # 生成报告
        self.generate_report()
      
        # 发送通知(如果有失败)
        if self.results["summary"]["failed"] > 0:
            self.send_alert()
      
        return self.results
  
    def generate_report(self):
        """生成巡检报告"""
        timestamp = self.inspection_time.strftime("%Y%m%d")
        report_file = f"/var/log/daily_inspection_{timestamp}.json"
      
        # 保存详细报告
        with open(report_file, "w") as f:
            json.dump(self.results, f, indent=2)
      
        # 生成HTML报告
        html_report = self.generate_html_report()
        html_file = f"/var/log/daily_inspection_{timestamp}.html"
      
        with open(html_file, "w") as f:
            f.write(html_report)
      
        # 生成简要文本报告
        text_report = self.generate_text_report()
        text_file = f"/var/log/daily_inspection_{timestamp}.txt"
      
        with open(text_file, "w") as f:
            f.write(text_report)
      
        print(f"\n巡检报告:")
        print(f"  JSON详细报告: {report_file}")
        print(f"  HTML可视化报告: {html_file}")
        print(f"  文本简要报告: {text_file}")
      
        # 打印摘要
        print(f"\n巡检摘要:")
        print(f"  总计检查: {self.results['summary']['total']}")
        print(f"  通过: {self.results['summary']['passed']} ✅")
        print(f"  失败: {self.results['summary']['failed']} ❌")
        print(f"  警告: {self.results['summary']['warnings']} ⚠️")
  
    def generate_html_report(self):
        """生成HTML格式的报告"""
        html = f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>每日巡检报告 - {self.hostname}</title>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 20px; }}
                .summary {{ background: #f5f5f5; padding: 15px; border-radius: 5px; }}
                .check {{ margin: 10px 0; padding: 10px; border-left: 4px solid #ccc; }}
                .pass {{ border-color: #4CAF50; background: #e8f5e9; }}
                .fail {{ border-color: #f44336; background: #ffebee; }}
                .warning {{ border-color: #ff9800; background: #fff3e0; }}
                .details {{ margin-top: 10px; padding: 10px; background: white; border-radius: 3px; }}
                table {{ border-collapse: collapse; width: 100%; }}
                th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
                th {{ background-color: #f2f2f2; }}
            </style>
        </head>
        <body>
            <h1>每日巡检报告</h1>
            <div class="summary">
                <h2>巡检摘要</h2>
                <p><strong>时间:</strong> {self.inspection_time}</p>
                <p><strong>主机:</strong> {self.hostname}</p>
                <p><strong>状态:</strong> 
                    <span style="color: {'green' if self.results['summary']['failed'] == 0 else 'red'}">
                        {('正常' if self.results['summary']['failed'] == 0 else '异常')}
                    </span>
                </p>
                <p><strong>检查统计:</strong> 
                    总计: {self.results['summary']['total']} | 
                    通过: {self.results['summary']['passed']} | 
                    失败: {self.results['summary']['failed']} | 
                    警告: {self.results['summary']['warnings']}
                </p>
            </div>
          
            <h2>详细检查结果</h2>
        """
      
        for check_name, check_result in self.results["checks"].items():
            status_class = check_result["status"].lower()
          
            html += f"""
            <div class="check {status_class}">
                <h3>{check_name} - <span class="status">{check_result['status']}</span></h3>
                <p>{check_result['message']}</p>
            """
          
            if check_result.get("details"):
                html += """
                <div class="details">
                    <h4>详细信息:</h4>
                """
              
                if isinstance(check_result["details"], dict):
                    html += "<table>"
                    for key, value in check_result["details"].items():
                        if isinstance(value, dict):
                            value_str = json.dumps(value, indent=2)
                        else:
                            value_str = str(value)
                      
                        html += f"<tr><th>{key}</th><td>{value_str}</td></tr>"
                    html += "</table>"
                else:
                    html += f"<pre>{check_result['details']}</pre>"
              
                html += "</div>"
          
            html += "</div>"
      
        html += """
        </body>
        </html>
        """
      
        return html
  
    def generate_text_report(self):
        """生成文本格式的报告"""
        text = f"""
        每日巡检报告
        ================
      
        时间: {self.inspection_time}
        主机: {self.hostname}
        状态: {'正常' if self.results['summary']['failed'] == 0 else '异常'}
      
        检查统计:
          总计: {self.results['summary']['total']}
          通过: {self.results['summary']['passed']} ✅
          失败: {self.results['summary']['failed']} ❌
          警告: {self.results['summary']['warnings']} ⚠️
      
        详细结果:
        """
      
        for check_name, check_result in self.results["checks"].items():
            status_icon = "✅" if check_result["status"] == "PASS" else "❌" if check_result["status"] == "FAIL" else "⚠️"
            text += f"\n{status_icon} {check_name}: {check_result['message']}"
          
            if check_result.get("failed_services"):
                text += f"\n   失败服务: {', '.join(check_result['failed_services'])}"
            if check_result.get("failed_tunnels"):
                text += f"\n   失败隧道: {', '.join(check_result['failed_tunnels'])}"
      
        return text
  
    def send_alert(self):
        """发送告警通知"""
        # 这里可以集成邮件、钉钉、Slack等通知渠道
        print("\n发现失败检查,发送告警通知...")
      
        # 示例:发送邮件
        try:
            self.send_email_alert()
            print("邮件告警发送成功")
        except Exception as e:
            print(f"邮件发送失败: {str(e)}")
  
    def send_email_alert(self):
        """发送邮件告警"""
        # 配置邮件服务器
        smtp_server = "smtp.example.com"
        smtp_port = 587
        smtp_user = "[email protected]"
        smtp_password = "password"
      
        # 邮件内容
        subject = f"[告警] {self.hostname} 每日巡检发现异常"
      
        text_content = self.generate_text_report()
        html_content = self.generate_html_report()
      
        # 创建邮件
        msg = MIMEMultipart("alternative")
        msg["Subject"] = subject
        msg["From"] = smtp_user
        msg["To"] = "[email protected]"
      
        # 添加文本和HTML版本
        part1 = MIMEText(text_content, "plain")
        part2 = MIMEText(html_content, "html")
        msg.attach(part1)
        msg.attach(part2)
      
        # 发送邮件
        with smtplib.SMTP(smtp_server, smtp_port) as server:
            server.starttls()
            server.login(smtp_user, smtp_password)
            server.send_message(msg)

def main():
    inspector = DailyInspector()
    results = inspector.run_all_checks()
  
    # 根据检查结果决定退出码
    if results["summary"]["failed"] > 0:
        exit(1)
    else:
        exit(0)

if __name__ == "__main__":
    main()

16.3 自动化巡检调度

配置文件 16.3.1:每日巡检的systemd定时器

# /etc/systemd/system/daily-inspection.service
[Unit]
Description=Daily Network Inspection
After=network-online.target
Wants=network-online.target

[Service]
Type=oneshot
User=root
ExecStart=/usr/local/bin/daily_inspection.py
StandardOutput=journal
.....

第18-19天:日志聚合与备份策略(续)

18.3 日志分析工具配置

脚本 18.3.1:ELK Stack日志分析配置

#!/bin/bash
# setup_elk_logging.sh
# 在日志服务器上执行

set -e

echo "=== 配置ELK Stack日志分析 ==="

# 1. 安装Elasticsearch
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.list
apt update
apt install -y elasticsearch

# 2. 配置Elasticsearch
cat > /etc/elasticsearch/elasticsearch.yml << 'EOF'
cluster.name: tunnel-logging
node.name: ${HOSTNAME}
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 0.0.0.0
http.port: 9200
discovery.type: single-node
xpack.security.enabled: false
EOF

# 3. 安装Logstash
apt install -y logstash

# 4. 配置Logstash管道
mkdir -p /etc/logstash/conf.d

cat > /etc/logstash/conf.d/tunnel-logs.conf << 'EOF'
input {
  tcp {
    port => 514
    type => "syslog"
  }
  udp {
    port => 514
    type => "syslog"
  }
}

filter {
  # 解析syslog格式
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
    }
  
    # 添加时间戳
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  
    # 分类处理不同类型的日志
    if [syslog_program] =~ /wg-quick/ {
      mutate {
        add_field => { "[@metadata][index]" => "wireguard-logs" }
        add_tag => ["wireguard", "tunnel"]
      }
    }
  
    if [syslog_program] =~ /frr/ {
      mutate {
        add_field => { "[@metadata][index]" => "frr-logs" }
        add_tag => ["frr", "routing"]
      }
    
      # 解析FRR特定日志
      grok {
        match => { "syslog_message" => "%{DATA:frr_component}: %{GREEDYDATA:frr_message}" }
      }
    }
  
    if [syslog_program] =~ /health-checker/ {
      mutate {
        add_field => { "[@metadata][index]" => "health-check-logs" }
        add_tag => ["health-check", "monitoring"]
      }
    
      # 解析健康检查日志
      grok {
        match => { "syslog_message" => "隧道 %{DATA:tunnel} 检查完成: 评分=%{NUMBER:score}, 推荐cost=%{NUMBER:recommended_cost}" }
      }
    }
  
    # 添加地理位置信息
    if [syslog_hostname] =~ /hz/ {
      mutate {
        add_field => { "location" => "hangzhou" }
      }
    } elsif [syslog_hostname] =~ /la/ {
      mutate {
        add_field => { "location" => "los_angeles" }
      }
    }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[@metadata][index]}-%{+YYYY.MM.dd}"
  }

  # 同时输出到控制台用于调试
  stdout { codec => rubydebug }
}
EOF

# 5. 安装Kibana
apt install -y kibana

# 6. 配置Kibana
cat > /etc/kibana/kibana.yml << 'EOF'
server.port: 5601
server.host: "0.0.0.0"
server.name: "tunnel-kibana"
elasticsearch.hosts: ["http://localhost:9200"]
elasticsearch.requestTimeout: 30000
kibana.index: ".kibana"
logging.dest: /var/log/kibana/kibana.log
EOF

# 7. 启动服务
systemctl daemon-reload
systemctl enable elasticsearch logstash kibana
systemctl start elasticsearch
sleep 30  # 等待Elasticsearch启动
systemctl start logstash kibana

# 8. 创建索引模板
curl -X PUT "localhost:9200/_template/tunnel-logs" -H 'Content-Type: application/json' -d'
{
  "index_patterns": ["*-logs-*"],
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "refresh_interval": "5s"
  },
  "mappings": {
    "properties": {
      "@timestamp": {"type": "date"},
      "syslog_hostname": {"type": "keyword"},
      "syslog_program": {"type": "keyword"},
      "location": {"type": "keyword"},
      "tunnel": {"type": "keyword"},
      "score": {"type": "float"},
      "recommended_cost": {"type": "integer"},
      "message": {"type": "text"}
    }
  }
}
'

# 9. 创建Kibana仪表板
cat > /tmp/kibana-dashboard.json << 'EOF'
{
  "objects": [
    {
      "id": "tunnel-overview-dashboard",
      "type": "dashboard",
      "attributes": {
        "title": "隧道日志概览",
        "hits": 0,
        "description": "",
        "panelsJSON": "[{\"version\":\"7.14.0\",\"panelIndex\":\"1\",\"gridData\":{\"x\":0,\"y\":0,\"w\":24,\"h\":15,\"i\":\"1\"},\"panelRefName\":\"panel_1\"}]",
        "optionsJSON": "{\"darkTheme\":false}",
        "version": 1
      },
      "references": [
        {
          "name": "panel_1",
          "type": "visualization",
          "id": "tunnel-logs-overview"
        }
      ]
    }
  ]
}
'

echo "=== ELK Stack配置完成 ==="
echo "访问地址:"
echo "  Kibana: http://$(hostname -I | awk '{print $1}'):5601"
echo "  Elasticsearch: http://$(hostname -I | awk '{print $1}'):9200"

18.4 备份验证与恢复演练

脚本 18.4.1:备份恢复测试脚本

#!/usr/bin/env python3
# backup_recovery_test.py
import subprocess
import os
import shutil
import time
import json
from datetime import datetime, timedelta

class BackupRecoveryTester:
    def __init__(self):
        self.backup_dir = "/backup/tunnel-config"
        self.test_dir = "/tmp/backup_test"
        self.results = []
      
    def cleanup_test_dir(self):
        """清理测试目录"""
        if os.path.exists(self.test_dir):
            shutil.rmtree(self.test_dir)
        os.makedirs(self.test_dir, exist_ok=True)
  
    def find_latest_backup(self):
        """查找最新的备份文件"""
        daily_dir = os.path.join(self.backup_dir, "daily")
        if not os.path.exists(daily_dir):
            return None
      
        backup_files = []
        for root, dirs, files in os.walk(daily_dir):
            for file in files:
                if file.endswith(".tar.gz"):
                    backup_files.append(os.path.join(root, file))
      
        if not backup_files:
            return None
      
        # 按修改时间排序
        backup_files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
        return backup_files[0]
  
    def verify_backup_integrity(self, backup_file):
        """验证备份文件完整性"""
        print(f"验证备份文件完整性: {backup_file}")
      
        # 1. 检查文件存在
        if not os.path.exists(backup_file):
            return {"status": "FAIL", "message": "备份文件不存在"}
      
        # 2. 检查文件大小
        file_size = os.path.getsize(backup_file)
        if file_size < 1024:  # 小于1KB的备份文件可能有问题
            return {"status": "WARNING", "message": f"备份文件过小: {file_size}字节"}
      
        # 3. 验证tar.gz格式
        try:
            cmd = f"tar -tzf {backup_file} | head -5"
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
            if result.returncode != 0:
                return {"status": "FAIL", "message": "备份文件格式错误"}
        except:
            return {"status": "FAIL", "message": "无法读取备份文件"}
      
        # 4. 提取并验证关键文件
        try:
            # 创建临时目录
            temp_dir = "/tmp/backup_verify"
            os.makedirs(temp_dir, exist_ok=True)
          
            # 解压备份
            cmd = f"tar -xzf {backup_file} -C {temp_dir}"
            subprocess.run(cmd, shell=True, check=True)
          
            # 检查关键文件是否存在
            critical_files = [
                "etc/wireguard/wg0.conf",
                "etc/frr/frr.conf",
                "etc/iptables/rules.v4"
            ]
          
            missing_files = []
            for file in critical_files:
                if not os.path.exists(os.path.join(temp_dir, file)):
                    missing_files.append(file)
          
            # 清理临时目录
            shutil.rmtree(temp_dir)
          
            if missing_files:
                return {"status": "WARNING", "message": f"缺少关键文件: {', '.join(missing_files)}"}
          
            return {"status": "PASS", "message": "备份文件完整性验证通过", "size_mb": file_size/(1024*1024)}
          
        except Exception as e:
            return {"status": "FAIL", "message": f"备份验证过程中出错: {str(e)}"}
  
    def test_backup_restoration(self, backup_file):
        """测试备份恢复"""
        print(f"测试备份恢复: {backup_file}")
      
        self.cleanup_test_dir()
      
        try:
            # 1. 解压备份到测试目录
            cmd = f"tar -xzf {backup_file} -C {self.test_dir}"
            subprocess.run(cmd, shell=True, check=True)
          
            # 2. 验证配置文件语法
            config_checks = [
                {
                    "name": "WireGuard配置",
                    "file": "etc/wireguard/wg0.conf",
                    "check_cmd": "wg-quick strip"
                },
                {
                    "name": "FRR配置",
                    "file": "etc/frr/frr.conf",
                    "check_cmd": "vtysh -C -f"
                },
                {
                    "name": "iptables规则",
                    "file": "etc/iptables/rules.v4",
                    "check_cmd": "iptables-restore -t"
                }
            ]
          
            check_results = []
            for check in config_checks:
                file_path = os.path.join(self.test_dir, check["file"])
              
                if os.path.exists(file_path):
                    cmd = f"cat {file_path} | {check['check_cmd']} 2>&1"
                    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
                  
                    if result.returncode == 0:
                        check_results.append({
                            "name": check["name"],
                            "status": "PASS",
                            "message": "配置文件语法正确"
                        })
                    else:
                        check_results.append({
                            "name": check["name"],
                            "status": "WARNING",
                            "message": f"配置文件可能有语法问题: {result.stderr[:100]}"
                        })
                else:
                    check_results.append({
                        "name": check["name"],
                        "status": "WARNING",
                        "message": "配置文件不存在"
                    })
          
            # 3. 模拟恢复过程
            recovery_steps = []
          
            # 停止服务
            services = ["frr", "wg-quick@wg0", "wg-quick@wg1", "xray"]
            for service in services:
                recovery_steps.append({
                    "step": f"停止 {service}",
                    "simulated": True,
                    "status": "SUCCESS"
                })
          
            # 复制配置文件
            recovery_steps.append({
                "step": "复制配置文件到系统目录",
                "simulated": True,
                "status": "SUCCESS"
            })
          
            # 启动服务
            for service in services:
                recovery_steps.append({
                    "step": f"启动 {service}",
                    "simulated": True,
                    "status": "SUCCESS"
                })
          
            # 验证恢复
            recovery_steps.append({
                "step": "验证隧道连通性",
                "simulated": True,
                "status": "SUCCESS"
            })
          
            return {
                "status": "PASS",
                "message": "备份恢复测试通过",
                "config_checks": check_results,
                "recovery_steps": recovery_steps
            }
          
        except Exception as e:
            return {
                "status": "FAIL",
                "message": f"备份恢复测试失败: {str(e)}"
            }
  
    def test_point_in_time_recovery(self, days_ago=1):
        """测试时间点恢复"""
        print(f"测试 {days_ago} 天前的备份恢复")
      
        # 查找指定天数的备份
        target_date = datetime.now() - timedelta(days=days_ago)
        date_str = target_date.strftime("%Y%m%d")
      
        daily_dir = os.path.join(self.backup_dir, "daily")
        backup_files = []
      
        for root, dirs, files in os.walk(daily_dir):
            for file in files:
                if file.endswith(".tar.gz") and date_str in file:
                    backup_files.append(os.path.join(root, file))
      
        if not backup_files:
            return {"status": "FAIL", "message": f"未找到 {days_ago} 天前的备份"}
      
        # 使用最新的匹配备份
        backup_files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
        backup_file = backup_files[0]
      
        # 验证并测试恢复
        integrity_result = self.verify_backup_integrity(backup_file)
        if integrity_result["status"] != "PASS":
            return integrity_result
      
        recovery_result = self.test_backup_restoration(backup_file)
      
        return {
            "status": recovery_result["status"],
            "message": recovery_result["message"],
            "backup_file": backup_file,
            "backup_date": time.ctime(os.path.getmtime(backup_file))
        }
  
    def run_comprehensive_test(self):
        """运行全面的备份恢复测试"""
        print("开始全面的备份恢复测试")
        print("="*60)
      
        test_cases = [
            ("验证最新备份", self.verify_backup_integrity, [self.find_latest_backup()]),
            ("测试最新备份恢复", self.test_backup_restoration, [self.find_latest_backup()]),
            ("测试1天前备份恢复", self.test_point_in_time_recovery, [1]),
            ("测试7天前备份恢复", self.test_point_in_time_recovery, [7])
        ]
      
        for test_name, test_func, test_args in test_cases:
            print(f"\n执行测试: {test_name}")
          
            if test_args[0] is None:
                result = {"status": "FAIL", "message": "未找到备份文件"}
            else:
                result = test_func(*test_args)
          
            self.results.append({
                "test": test_name,
                "timestamp": datetime.now().isoformat(),
                "result": result
            })
          
            status_icon = "✅" if result["status"] == "PASS" else "⚠️" if result["status"] == "WARNING" else "❌"
            print(f"  结果: {status_icon} {result['status']} - {result['message']}")
      
        # 生成测试报告
        self.generate_test_report()
      
        return self.results
  
    def generate_test_report(self):
        """生成测试报告"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        report_file = f"/var/log/backup_recovery_test_{timestamp}.json"
      
        report = {
            "timestamp": datetime.now().isoformat(),
            "hostname": subprocess.getoutput("hostname"),
            "backup_dir": self.backup_dir,
            "results": self.results,
            "summary": {
                "total_tests": len(self.results),
                "passed": sum(1 for r in self.results if r["result"]["status"] == "PASS"),
                "warnings": sum(1 for r in self.results if r["result"]["status"] == "WARNING"),
                "failed": sum(1 for r in self.results if r["result"]["status"] == "FAIL")
            }
        }
      
        with open(report_file, "w") as f:
            json.dump(report, f, indent=2)
      
        # 生成简要报告
        summary_file = f"/var/log/backup_recovery_summary_{timestamp}.txt"
        with open(summary_file, "w") as f:
            f.write("备份恢复测试报告\n")
            f.write("="*60 + "\n")
            f.write(f"测试时间: {report['timestamp']}\n")
            f.write(f"主机: {report['hostname']}\n")
            f.write(f"备份目录: {report['backup_dir']}\n")
            f.write(f"测试总数: {report['summary']['total_tests']}\n")
            f.write(f"通过: {report['summary']['passed']}\n")
            f.write(f"警告: {report['summary']['warnings']}\n")
            f.write(f"失败: {report['summary']['failed']}\n\n")
          
            f.write("详细结果:\n")
            for result in report["results"]:
                status = result["result"]["status"]
                icon = "✓" if status == "PASS" else "!" if status == "WARNING" else "✗"
                f.write(f"{icon} {result['test']}: {result['result']['message']}\n")
      
        print(f"\n测试报告:")
        print(f"  详细报告: {report_file}")
        print(f"  简要报告: {summary_file}")
      
        # 总结
        print(f"\n测试摘要:")
        print(f"  总计: {report['summary']['total_tests']} 项测试")
        print(f"  通过: {report['summary']['passed']} ✅")
        print(f"  警告: {report['summary']['warnings']} ⚠️")
        print(f"  失败: {report['summary']['failed']} ❌")

def main():
    import argparse
  
    parser = argparse.ArgumentParser(description="备份恢复测试工具")
    parser.add_argument("--test", choices=["integrity", "recovery", "point-in-time", "full"], 
                       default="full", help="测试类型")
    parser.add_argument("--backup-file", help="指定备份文件路径")
    parser.add_argument("--days-ago", type=int, default=1, help="恢复几天前的备份")
    parser.add_argument("--output", help="输出报告路径")
  
    args = parser.parse_args()
  
    tester = BackupRecoveryTester()
  
    if args.test == "integrity":
        backup_file = args.backup_file or tester.find_latest_backup()
        result = tester.verify_backup_integrity(backup_file)
        print(json.dumps(result, indent=2))
      
    elif args.test == "recovery":
        backup_file = args.backup_file or tester.find_latest_backup()
        result = tester.test_backup_restoration(backup_file)
        print(json.dumps(result, indent=2))
      
    elif args.test == "point-in-time":
        result = tester.test_point_in_time_recovery(args.days_ago)
        print(json.dumps(result, indent=2))
      
    else:  # full
        results = tester.run_comprehensive_test()
      
        # 如果有输出路径,保存结果
        if args.output:
            with open(args.output, "w") as f:
                json.dump(results, f, indent=2)
      
        # 根据测试结果决定退出码
        if any(r["result"]["status"] == "FAIL" for r in results):
            exit(1)
        elif any(r["result"]["status"] == "WARNING" for r in results):
            exit(2)
        else:
            exit(0)

if __name__ == "__main__":
    main()

第20天:文档与知识转移(续)

20.4 自动化文档生成与发布

脚本 20.4.1:自动化文档流水线

#!/usr/bin/env python3
# documentation_pipeline.py
import os
import json
import yaml
import markdown
from datetime import datetime
import subprocess
import shutil
from pathlib import Path

class DocumentationPipeline:
    def __init__(self):
        self.project_root = "/opt/tunnel-project"
        self.docs_dir = os.path.join(self.project_root, "docs")
        self.build_dir = os.path.join(self.project_root, "docs-build")
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
      
        # 创建目录结构
        self.create_directory_structure()
  
    def create_directory_structure(self):
        """创建文档目录结构"""
        dirs = [
            self.docs_dir,
            self.build_dir,
            os.path.join(self.docs_dir, "architecture"),
            os.path.join(self.docs_dir, "operations"),
            os.path.join(self.docs_dir, "api"),
            os.path.join(self.docs_dir, "troubleshooting"),
            os.path.join(self.docs_dir, "training"),
            os.path.join(self.build_dir, "html"),
            os.path.join(self.build_dir, "pdf"),
            os.path.join(self.build_dir, "json")
        ]
      
        for dir_path in dirs:
            os.makedirs(dir_path, exist_ok=True)
  
    def gather_system_documentation(self):
        """收集系统文档"""
        print("收集系统文档...")
      
        docs = {
            "system_info": self.get_system_info(),
            "network_config": self.get_network_config(),
            "service_config": self.get_service_config(),
            "monitoring_config": self.get_monitoring_config(),
            "backup_config": self.get_backup_config()
        }
      
        # 保存为JSON
        json_file = os.path.join(self.build_dir, "json", "system_docs.json")
        with open(json_file, "w") as f:
            json.dump(docs, f, indent=2, ensure_ascii=False)
      
        return docs
  
    def get_system_info(self):
        """获取系统信息"""
        info = {
            "timestamp": self.timestamp,
            "hostname": subprocess.getoutput("hostname"),
            "os": {
                "distribution": subprocess.getoutput("lsb_release -d | cut -f2"),
                "kernel": subprocess.getoutput("uname -r"),
                "architecture": subprocess.getoutput("uname -m")
            },
            "hardware": {
                "cpu": subprocess.getoutput("lscpu | grep 'Model name' | cut -d: -f2").strip(),
                "memory_gb": subprocess.getoutput("free -g | awk '/^Mem:/ {print $2}'"),
                "disk_gb": subprocess.getoutput("df -h / | awk 'NR==2 {print $2}'")
            },
            "network": {
                "interfaces": [],
                "routing_tables": []
            }
        }
      
        # 网络接口信息
        interfaces = subprocess.getoutput("ip -o link show | awk -F': ' '{print $2}'").split('\n')
        for iface in interfaces:
            if iface and iface != "lo":
                addr_info = subprocess.getoutput(f"ip addr show {iface}")
                info["network"]["interfaces"].append({
                    "name": iface,
                    "info": addr_info
                })
      
        return info
  
    def get_network_config(self):
        """获取网络配置"""
        config = {
            "wireguard": {},
            "routing": {},
            "firewall": {}
        }
      
        # WireGuard配置
        wg_configs = ["/etc/wireguard/wg0.conf", "/etc/wireguard2/wg1.conf"]
        for conf_file in wg_configs:
            if os.path.exists(conf_file):
                with open(conf_file, "r") as f:
                    config["wireguard"][os.path.basename(conf_file)] = f.read()
      
        # FRR配置
        frr_files = ["/etc/frr/frr.conf", "/etc/frr/daemons"]
        for conf_file in frr_files:
            if os.path.exists(conf_file):
                with open(conf_file, "r") as f:
                    config["routing"][os.path.basename(conf_file)] = f.read()
      
        # iptables配置
        iptables_files = ["/etc/iptables/rules.v4", "/etc/iptables/rules.v6"]
        for conf_file in iptables_files:
            if os.path.exists(conf_file):
                with open(conf_file, "r") as f:
                    config["firewall"][os.path.basename(conf_file)] = f.read()
      
        return config
  
    def get_service_config(self):
        """获取服务配置"""
        services = {
            "systemd_services": [],
            "cron_jobs": [],
            "custom_scripts": []
        }
      
        # Systemd服务
        tunnel_services = [
            "frr.service",
            "[email protected]",
            "[email protected]",
            "health-checker.service",
            "xray.service",
            "daily-inspection.service"
        ]
      
        for service in tunnel_services:
            service_file = f"/etc/systemd/system/{service}"
            if os.path.exists(service_file):
                with open(service_file, "r") as f:
                    services["systemd_services"].append({
                        "name": service,
                        "content": f.read()
                    })
      
        # Cron作业
        cron_file = "/etc/cron.d/tunnel-backup"
        if os.path.exists(cron_file):
            with open(cron_file, "r") as f:
                services["cron_jobs"] = f.read().split('\n')
      
        # 自定义脚本
        scripts_dir = "/usr/local/bin"
        tunnel_scripts = ["tunnel_switch.py", "health_checker.py", "daily_inspection.py"]
        for script in tunnel_scripts:
            script_file = os.path.join(scripts_dir, script)
            if os.path.exists(script_file):
                with open(script_file, "r") as f:
                    # 只取前100行
                    content = '\n'.join(f.readlines()[:100])
                    services["custom_scripts"].append({
                        "name": script,
                        "content": content + "\n... (truncated)"
                    })
      
        return services
  
    def get_monitoring_config(self):
        """获取监控配置"""
        monitoring = {
            "prometheus": {},
            "grafana": {},
            "alerting": {}
        }
      
        # Prometheus配置
        prometheus_files = ["/etc/prometheus/prometheus.yml", "/etc/prometheus/alerts.yml"]
        for conf_file in prometheus_files:
            if os.path.exists(conf_file):
                with open(conf_file, "r") as f:
                    monitoring["prometheus"][os.path.basename(conf_file)] = f.read()
      
        return monitoring
  
    def get_backup_config(self):
        """获取备份配置"""
        backup = {
            "scripts": [],
            "directories": []
        }
      
        # 备份脚本
        backup_scripts = ["/usr/local/bin/backup_tunnel_config.sh", "/usr/local/bin/rotate_backups.sh"]
        for script in backup_scripts:
            if os.path.exists(script):
                with open(script, "r") as f:
                    backup["scripts"].append({
                        "name": os.path.basename(script),
                        "content": f.read()
                    })
      
        # 备份目录
        backup_dir = "/backup/tunnel-config"
        if os.path.exists(backup_dir):
            for root, dirs, files in os.walk(backup_dir):
                for file in files:
                    if file.endswith(".tar.gz"):
                        file_path = os.path.join(root, file)
                        backup["directories"].append({
                            "path": file_path,
                            "size_mb": os.path.getsize(file_path) / (1024*1024),
                            "modified": datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat()
                        })
      
        return backup
  
    def generate_markdown_docs(self, system_docs):
        """生成Markdown文档"""
        print("生成Markdown文档...")
      
        # 1. 架构文档
        arch_doc = self.generate_architecture_doc(system_docs)
        arch_file = os.path.join(self.docs_dir, "architecture", "overview.md")
        with open(arch_file, "w") as f:
            f.write(arch_doc)
      
        # 2. 运维手册
        ops_doc = self.generate_operations_doc(system_docs)
        ops_file = os.path.join(self.docs_dir, "operations", "manual.md")
        with open(ops_file, "w") as f:
            f.write(ops_doc)
      
        # 3. 故障排查指南
        troubleshooting_doc = self.generate_troubleshooting_doc()
        troubleshooting_file = os.path.join(self.docs_dir, "troubleshooting", "guide.md")
        with open(troubleshooting_file, "w") as f:
            f.write(troubleshooting_doc)
      
        # 4. 培训材料
        training_doc = self.generate_training_materials()
        training_file = os.path.join(self.docs_dir, "training", "materials.md")
        with open(training_file, "w") as f:
            f.write(training_doc)
      
        # 5. 生成主README
        readme_doc = self.generate_readme()
        readme_file = os.path.join(self.docs_dir, "README.md")
        with open(readme_file, "w") as f:
            f.write(readme_doc)
  
    def generate_architecture_doc(self, system_docs):
        """生成架构文档"""
        doc = f"""# 跨境SD-WAN架构文档

> 文档生成时间: {self.timestamp}
> 主机: {system_docs['system_info']['hostname']}

## 1. 系统架构概览

### 1.1 物理拓扑

杭州侧 美国侧
────────────────────────────────────────────────────

杭州业务VPS (10.10.0.0/16)
|
|
HZ1 (10.10.1.1) ── FRR OSPF Area 0
| ├── WireGuard Tunnel1 (10.100.1.0/30) ────── LA1 (10.8.1.0/24)
| └── WireGuard Tunnel2 (10.100.2.0/30) ────── LA2 (10.8.2.0/24)
|
杭州内网OSPF Client


### 1.2 逻辑架构

#### 数据平面
- **传输层**: VLESS+REALITY over TCP
- **隧道层**: WireGuard over UDP
- **路由层**: OSPF动态路由

#### 控制平面
- **健康检查**: 定期评估隧道质量
- **路由决策**: 基于健康评分调整OSPF cost
- **故障切换**: 自动路径切换

#### 管理平面
- **监控**: Prometheus + Grafana
- **日志**: ELK Stack
- **备份**: 自动化配置备份

## 2. 网络配置

### 2.1 IP地址规划

| 网络段 | 用途 | 备注 |
|--------|------|------|
| 10.10.0.0/16 | 杭州内网 | 业务VPS网段 |
| 10.100.1.0/30 | HZ1 ↔ LA1隧道 | 点对点链路 |
| 10.100.2.0/30 | HZ1 ↔ LA2隧道 | 点对点链路 |
| 10.8.1.0/24 | LA1侧网络 | 美国业务网络A |
| 10.8.2.0/24 | LA2侧网络 | 美国业务网络B |

### 2.2 路由策略

- **主路径**: LA1 (初始cost=10)
- **备用路径**: LA2 (初始cost=50)
- **切换条件**: 基于健康评分动态调整cost

## 3. 系统组件

### 3.1 核心组件

| 组件 | 版本 | 功能 |
|------|------|------|
| WireGuard | {subprocess.getoutput('wg --version')} | 加密隧道 |
| FRR | {subprocess.getoutput('vtysh --version')} | 动态路由 |
| Xray | {subprocess.getoutput('/usr/local/xray/xray --version')} | 代理传输 |

### 3.2 辅助组件

| 组件 | 功能 |
|------|------|
| Prometheus | 监控指标收集 |
| Grafana | 监控可视化 |
| ELK Stack | 日志分析 |
| 自定义健康检查 | 链路质量评估 |

## 4. 性能指标

### 4.1 设计目标

| 指标 | 目标值 |
|------|--------|
| 故障切换时间 | ≤10秒 |
| 路由收敛时间 | ≤5秒 |
| 系统可用性 | ≥99.9% |
| 隧道延迟 | ≤200ms |
| 隧道丢包率 | ≤5% |

## 5. 安全设计

### 5.1 加密方案
- 传输加密: TLS 1.3 (REALITY)
- 隧道加密: WireGuard (ChaCha20-Poly1305)
- 身份验证: 公钥加密

### 5.2 访问控制
- 防火墙规则: iptables
- 服务白名单: 仅开放必要端口
- 日志审计: 所有操作记录

---
*文档自动生成,请勿手动修改*
"""
        return doc
  
    def generate_operations_doc(self, system_docs):
        """生成运维手册"""
        doc = f"""# 跨境SD-WAN运维手册

> 文档生成时间: {self.timestamp}

## 1. 日常运维

### 1.1 监控检查

#### 1.1.1 Grafana仪表板
- 访问地址: http://{system_docs['system_info']['hostname']}:3000
- 主要仪表板:
  - 隧道健康状态
  - 系统资源使用
  - 网络流量监控
  - 告警事件统计

#### 1.1.2 命令行监控
```bash
# 查看隧道状态
wg show

# 查看路由状态
vtysh -c "show ip ospf neighbor"
vtysh -c "show ip route ospf"

# 查看健康检查状态
systemctl status health-checker
journalctl -u health-checker -f

# 查看系统状态
/usr/local/bin/daily_inspection.py

1.2 备份管理

1.2.1 备份位置

  • 每日备份: /backup/tunnel-config/daily/
  • 每周备份: /backup/tunnel-config/weekly/
  • 每月备份: /backup/tunnel-config/monthly/

1.2.2 备份验证

# 验证最新备份
/usr/local/bin/backup_recovery_test.py --test integrity

# 测试备份恢复
/usr/local/bin/backup_recovery_test.py --test recovery

1.2.3 手动备份

/usr/local/bin/backup_tunnel_config.sh

1.3 日志查看

1.3.1 实时日志

# WireGuard日志
journalctl -u wg-quick@wg0 -f
journalctl -u wg-quick@wg1 -f

# FRR日志
journalctl -u frr -f

# 健康检查日志
tail -f /var/log/health_checker.log

# 隧道切换日志
tail -f /var/log/tunnel_switch.log

1.3.2 日志分析

  • Kibana地址: http://{system_docs[‘system_info’][‘hostname’]}:5601
  • 主要索引:
    • wireguard-logs-*
    • frr-logs-*
    • health-check-logs-*

2. 配置管理

2.1 配置文件位置

配置文件 路径 描述
WireGuard (LA1) /etc/wireguard/wg0.conf LA1隧道配置
WireGuard (LA2) /etc/wireguard2/wg1.conf LA2隧道配置
FRR /etc/frr/frr.conf 路由配置
Xray /usr/local/xray/config_*.json 代理配置
健康检查 /usr/local/bin/health_checker.py 健康检查脚本
系统服务 /etc/systemd/system/*.service Systemd服务文件

2.2 配置变更流程

  1. 备份当前配置

    /usr/local/bin/backup_tunnel_config.sh
    
  2. 修改配置文件

    # 使用版本控制或编辑工具
    cp /path/to/new/config /etc/wireguard/wg0.conf
    
  3. 验证配置语法

    # WireGuard配置验证
    wg-quick strip /etc/wireguard/wg0.conf
    
    # FRR配置验证
    vtysh -C -f /etc/frr/frr.conf
    
  4. 应用配置

    # 重启服务
    systemctl restart wg-quick@wg0
    systemctl restart frr
    
  5. 验证变更

    # 检查服务状态
    systemctl status wg-quick@wg0
    systemctl status frr
    
    # 验证功能
    /usr/local/bin/tunnel_switch.py status
    

3. 故障处理

3.1 常见故障处理

3.1.1 隧道中断

症状:

  • 隧道健康评分为0
  • 监控告警触发
  • 业务访问失败

处理步骤:

  1. 检查隧道状态: wg show
  2. 检查代理服务: systemctl status xray
  3. 检查网络连通性: ping -I wg0 10.100.1.2
  4. 查看日志: journalctl -u wg-quick@wg0 --since "10 minutes ago"
  5. 手动切换到备用隧道: /usr/local/bin/tunnel_switch.py switch la2

3.1.2 路由异常

症状:

  • OSPF邻居状态异常
  • 路由表缺失
  • 流量路径错误

处理步骤:

  1. 检查FRR状态: systemctl status frr
  2. 查看OSPF邻居: vtysh -c "show ip ospf neighbor"
  3. 查看路由表: vtysh -c "show ip route"
  4. 重启FRR: systemctl restart frr
  5. 检查防火墙规则: iptables -L -n

3.1.3 健康检查异常

症状:

  • 健康检查服务停止
  • 评分不再更新
  • 自动切换失效

处理步骤:

  1. 检查服务状态: systemctl status health-checker
  2. 查看日志: journalctl -u health-checker
  3. 手动运行检查: /usr/local/bin/health_checker.py --once
  4. 检查依赖服务: systemctl status prometheus node_exporter

3.2 紧急恢复

3.2.1 回滚到单隧道模式

/usr/local/bin/rollback_to_single_tunnel.sh

3.2.2 从备份恢复

# 1. 停止所有服务
systemctl stop wg-quick@wg0 wg-quick@wg1 frr xray

# 2. 恢复最新备份
BACKUP_FILE=$(ls -t /backup/tunnel-config/daily/config_*.tar.gz | head -1)
tar -xzf $BACKUP_FILE -C /

# 3. 重启服务
systemctl start frr wg-quick@wg0 wg-quick@wg1 xray

# 4. 验证恢复
/usr/local/bin/tunnel_switch.py status

4. 性能优化

4.1 监控指标调优

4.1.1 健康检查参数

# 位置: /usr/local/bin/health_checker.py
检查间隔: 30秒
评分权重:
  延迟: 30%
  抖动: 20%
  丢包: 30%
  TCP连接: 10%
  HTTP响应: 10%

切换阈值:
  优质: ≥80分 (cost=10)
  可用: ≥60分 (cost=50)
  劣质: <60分 (cost≥100)
  故障: <30分 (cost=65535)

4.1.2 OSPF参数调优

# 进入FRR配置
vtysh
configure terminal

# 调整OSPF定时器
router ospf
timers throttle spf 10 100 5000
timers throttle lsa all 10 100 5000

4.2 系统参数优化

4.2.1 内核参数

# /etc/sysctl.d/60-tunnel-optimization.conf
net.core.rmem_max = 134217728
net.core.wmem_max = 134217728
net.ipv4.tcp_rmem = 4096 87380 134217728
net.ipv4.tcp_wmem = 4096 65536 134217728
net.ipv4.tcp_congestion_control = bbr

4.2.2 WireGuard优化

# /etc/wireguard/wg0.conf
[Interface]
# 调整MTU
MTU = 1380

# 启用多队列(如果支持)
Queue = 2

5. 扩展与升级

5.1 增加新隧道

5.1.1 准备新节点

  1. 部署新的美国VPS
  2. 配置网络和防火墙
  3. 安装必要软件

5.1.2 配置新隧道

  1. 在HZ1上生成新的WireGuard密钥对
  2. 配置新的隧道接口
  3. 更新健康检查配置
  4. 更新OSPF配置

5.1.3 验证新隧道

  1. 测试隧道连通性
  2. 验证路由学习
  3. 测试故障切换

5.2 系统升级

5.2.1 软件升级

# 系统更新
apt update && apt upgrade -y

# WireGuard升级
apt install --only-upgrade wireguard wireguard-tools

# FRR升级
apt install --only-upgrade frr frr-pythontools

5.2.2 配置迁移

  1. 备份当前配置
  2. 比较新旧配置差异
  3. 逐步应用新配置
  4. 验证功能正常

6. 联系信息

6.1 支持团队

角色 联系方式 值班时间
一线支持 +86-138-XXXX-XXXX 7x24
二线支持 +86-139-XXXX-XXXX 工作时间
架构师 +86-136-XXXX-XXXX 紧急情况

6.2 告警通知

6.3 文档更新


本手册为自动生成,定期更新,请确保使用最新版本
“”"
return doc

def generate_troubleshooting_doc(self):
    """生成故障排查指南"""
    doc = """# 故障排查指南

故障排查流程图

发现问题问题类型隧道问题路由问题性能问题监控问题检查WireGuard状态检查代理服务检查网络连通性查看相关日志尝试手动切换检查FRR状态检查OSPF邻居检查路由表检查防火墙规则检查系统负载检查带宽使用检查隧道延迟优化系统参数检查监控服务检查指标收集检查告警规则检查通知渠道

常见故障场景及解决方案

场景1: 隧道完全中断

症状

  • 隧道健康评分为0
  • wg show显示隧道断开
  • 业务完全无法访问

排查步骤

  1. 检查WireGuard状态

    # 检查接口状态
    ip link show wg0
    
    # 检查WireGuard状态
    wg show
    
    # 检查服务状态
    systemctl status wg-quick@wg0
    
  2. 检查代理服务

    # 检查Xray服务
    systemctl status xray
    
    # 检查端口监听
    netstat -tlnp | grep 10001
    
    # 检查进程状态
    ps aux | grep xray
    
  3. 检查网络连通性

    # 测试本地端口
    telnet 127.0.0.1 10001
    
    # 测试目标服务器
    curl --connect-timeout 5 http://LA1_PUBLIC_IP:443
    
    # 使用tcpdump抓包
    tcpdump -i eth0 port 443 -n
    
  4. 查看相关日志

    # WireGuard日志
    journalctl -u wg-quick@wg0 --since "1 hour ago"
    
    # Xray日志
    journalctl -u xray --since "1 hour ago"
    
    # 系统日志
    tail -f /var/log/syslog | grep -E "(wg|xray|wireguard)"
    
  5. 紧急处理

    # 手动切换到备用隧道
    /usr/local/bin/tunnel_switch.py switch la2
    
    # 重启问题服务
    systemctl restart wg-quick@wg0
    systemctl restart xray
    

根本原因分析

  1. 网络问题: 防火墙阻挡、网络中断
  2. 服务问题: 进程崩溃、配置错误
  3. 资源问题: 内存不足、连接数满

场景2: 隧道质量下降

症状

  • 延迟增加(>200ms)
  • 丢包率升高(>10%)
  • 健康评分下降(<60分)

排查步骤

  1. 诊断网络质量

    # 测试延迟和丢包
    ping -c 100 -I wg0 10.100.1.2
    
    # 使用mtr进行路径诊断
    mtr -r -c 100 -I wg0 10.100.1.2
    
    # 测试带宽
    iperf3 -c 10.100.1.2 -I wg0 -t 30
    
  2. 检查系统资源

    # CPU使用率
    top -bn1 | grep "Cpu(s)"
    
    # 内存使用
    free -h
    
    # 网络连接数
    ss -s | grep "estab"
    
    # 磁盘I/O
    iostat -x 1 5
    
  3. 检查流量统计

    # 接口流量
    ifconfig wg0
    
    # 详细流量统计
    nethogs wg0
    
    # 连接追踪
    conntrack -L | grep wg0 | wc -l
    
  4. 优化措施

    # 调整内核参数
    sysctl -w net.core.rmem_max=134217728
    sysctl -w net.core.wmem_max=134217728
    
    # 调整WireGuard参数
    wg set wg0 peer <PUBLIC_KEY> persistent-keepalive 10
    
    # 清理连接追踪表
    conntrack -F
    

根本原因分析

  1. 网络拥塞: 高峰时段带宽不足
  2. 系统瓶颈: CPU/内存/磁盘限制
  3. 配置问题: 参数未优化
  4. 应用问题: 特定应用流量异常

场景3: 路由切换频繁

症状

  • 路由表频繁变化
  • 流量在隧道间震荡
  • OSPF cost频繁调整

排查步骤

  1. 检查路由状态

    # 查看路由变化历史
    vtysh -c "show log" | grep OSPF
    
    # 监控路由表变化
    watch -n 1 'vtysh -c "show ip route ospf"'
    
    # 检查OSPF邻居状态
    vtysh -c "show ip ospf neighbor detail"
    
  2. 检查健康检查

    # 查看健康检查日志
    tail -f /var/log/health_checker.log
    
    # 手动运行健康检查
    /usr/local/bin/health_checker.py --once
    
    # 检查监控指标
    curl -s http://localhost:8080/api/v1/health/metrics | grep health_score
    
  3. 调整防震荡参数

    # 修改健康检查脚本中的参数
    # 位置: /usr/local/bin/health_checker.py
    
    # 增加切换迟滞
    COST_CHANGE_THRESHOLD = 100  # 原50
    
    # 增加检查间隔
    CHECK_INTERVAL = 60  # 原30
    
    # 增加历史记录长度
    HISTORY_LENGTH = 20  # 原10
    
  4. 调整OSPF参数

    vtysh
    configure terminal
    
    # 增加OSPF定时器
    router ospf
    timers throttle spf 30 200 10000
    timers throttle lsa all 30 200 10000
    
    # 调整接口cost变化敏感度
    interface wg0
    ip ospf cost 10
    ip ospf dead-interval 40
    

根本原因分析

  1. 网络不稳定: 链路质量波动
  2. 参数过敏感: 切换阈值过低
  3. 监控噪声: 健康检查误判
  4. 配置问题: OSPF定时器不合理

场景4: 监控告警失效

症状

  • Prometheus指标缺失
  • Grafana面板无数据
  • 告警未触发或误报

排查步骤

  1. 检查监控服务

    # 检查Prometheus
    systemctl status prometheus
    curl http://localhost:9090/-/healthy
    
    # 检查Node Exporter
    systemctl status node_exporter
    curl http://localhost:9100/metrics | head -5
    
    # 检查健康检查API
    curl http://localhost:8080/api/v1/health/status
    
  2. 检查指标收集

    # 查看Prometheus目标状态
    curl http://localhost:9090/api/v1/targets | jq .
    
    # 检查抓取配置
    grep -A 5 -B 5 "tunnel" /etc/prometheus/prometheus.yml
    
    # 手动测试指标
    curl http://localhost:9100/metrics | grep tunnel
    
  3. 检查告警规则

    # 验证告警规则语法
    promtool check rules /etc/prometheus/alerts.yml
    
    # 查看活跃告警
    curl http://localhost:9090/api/v1/alerts | jq .
    
    # 测试告警表达式
    curl -s "http://localhost:9090/api/v1/query?query=tunnel_health_score<30"
    
  4. 检查通知渠道

    # 检查Alertmanager
    systemctl status alertmanager
    
    # 查看告警历史
    grep "alert" /var/log/prometheus/alertmanager.log | tail -20
    
    # 测试告警发送
    curl -X POST http://localhost:9093/api/v1/alerts -d '[{
      "labels": {"alertname": "TestAlert", "severity": "warning"}
    }]'
    

根本原因分析

  1. 服务故障: 监控组件停止运行
  2. 配置错误: 指标路径或端口错误
  3. 网络问题: 防火墙阻挡监控流量
  4. 资源问题: 监控服务器资源不足

高级诊断工具

1. 网络诊断工具包

# 安装诊断工具
apt install -y mtr iperf3 tcpdump netcat conntrack nethogs iftop

# 综合诊断脚本
/usr/local/bin/network_diagnosis.sh

2. 性能剖析工具

# 系统性能分析
perf record -g -p $(pidof xray) -- sleep 30
perf report

# 内存分析
valgrind --tool=massif /usr/local/xray/xray

3. 日志分析工具

# 实时日志分析
tail -f /var/log/*.log | grep -E "(ERROR|WARN|FAIL)"

# 日志统计
awk '/ERROR/ {count++} END {print count}' /var/log/tunnel_switch.log

# 时间序列分析
rgrep "隧道.*评分" /var/log/health_checker.log | awk '{print $1, $2, $NF}'

紧急恢复清单

1. 一级紧急(完全中断)

  • 确认故障范围
  • 切换到备用隧道
  • 通知相关团队
  • 开始根本原因分析

2. 二级紧急(性能下降)

  • 监控性能指标
  • 优化系统参数
  • 调整流量策略
  • 计划维护窗口

3. 三级紧急(监控告警)

  • 验证告警真实性
  • 检查监控系统
  • 更新监控配置
  • 记录处理过程

联系支持

内部支持

  • 网络团队: #network-support
  • 运维团队: #ops-support
  • 架构团队: #arch-support

外部支持


本指南应定期更新,反映最新的故障处理经验
“”"
return doc

def generate_training_materials(self):
    """生成培训材料"""
    doc = """# 跨境SD-WAN培训材料

培训目标

通过本培训,学员将能够:

  1. 理解跨境SD-WAN架构和工作原理
  2. 掌握日常运维操作和监控方法
  3. 独立处理常见故障和问题
  4. 执行配置变更和系统优化
  5. 参与应急演练和恢复操作

培训大纲

模块1: 架构理解(2小时)

  • 跨境网络挑战与解决方案
  • SD-WAN架构设计原理
  • 组件功能介绍与交互
  • 数据流与控制流分析

模块2: 日常运维(3小时)

  • 监控系统使用
  • 日志分析与排查
  • 备份与恢复操作
  • 性能监控与优化

模块3: 故障处理(4小时)

  • 故障诊断方法论
  • 常见故障场景演练
  • 紧急恢复流程
  • 根本原因分析

模块4: 高级操作(3小时)

  • 配置变更管理
  • 系统扩展与升级
  • 安全加固操作
  • 性能调优实践

实操练习

练习1: 环境熟悉

# 1. 登录系统
ssh admin@hz1

# 2. 查看系统状态
/usr/local/bin/daily_inspection.py

# 3. 访问监控面板
# 打开浏览器访问: http://hz1:3000

# 4. 查看当前配置
cat /etc/wireguard/wg0.conf | head -20

练习2: 日常巡检

# 1. 检查隧道状态
wg show

# 2. 检查路由状态
vtysh -c "show ip ospf neighbor"
vtysh -c "show ip route ospf"

# 3. 检查健康检查
systemctl status health-checker
tail -f /var/log/health_checker.log

# 4. 检查监控指标
curl -s http://localhost:8080/api/v1/health/status | jq .

练习3: 故障模拟

# 1. 模拟隧道中断
systemctl stop wg-quick@wg0

# 2. 观察故障切换
watch -n 1 'ip route show default'

# 3. 查看告警触发
# 检查Prometheus和Alertmanager

# 4. 恢复隧道
systemctl start wg-quick@wg0

练习4: 配置变更

# 1. 备份当前配置
/usr/local/bin/backup_tunnel_config.sh

# 2. 修改WireGuard配置
cp /etc/wireguard/wg0.conf /etc/wireguard/wg0.conf.bak
sed -i 's/PersistentKeepalive = 25/PersistentKeepalive = 10/' /etc/wireguard/wg0.conf

# 3. 验证配置语法
wg-quick strip /etc/wireguard/wg0.conf

# 4. 应用配置
wg-quick down wg0 && wg-quick up wg0

# 5. 验证变更
wg show wg0 | grep keepalive

考核标准

理论考核(40%)

  • 架构理解: 10%
  • 工作原理: 10%
  • 监控体系: 10%
  • 安全知识: 10%

实操考核(40%)

  • 日常操作: 10%
  • 故障处理: 15%
  • 配置变更: 10%
  • 应急恢复: 5%

综合考核(20%)

  • 案例分析: 10%
  • 方案设计: 10%

评分标准

  • 优秀(≥90分): 独立承担运维工作
  • 良好(≥80分): 在指导下完成工作
  • 合格(≥70分): 需要持续培训
  • 不合格(<70分): 重新培训

培训资源

在线资源

本地资源

  • 配置参考: /opt/tunnel-project/docs/
  • 脚本工具: /usr/local/bin/
  • 日志文件: /var/log/tunnel*.log
  • 备份文件: /backup/tunnel-config/

培训环境

  • 测试环境: test-hz1, test-la1, test-la2
  • 模拟工具: /usr/local/bin/fault_injection_test.py
  • 练习脚本: /opt/training/exercises/

持续学习

每周学习

  • 周一: 架构复习(30分钟)
  • 周三: 案例分析(45分钟)
  • 周五: 技术分享(60分钟)

每月演练

  • 第一周: 故障处理演练
  • 第三周: 应急恢复演练

每季度考核

  • 理论考试(1小时)
  • 实操考核(2小时)
  • 综合评估(1小时)

联系方式

培训负责人

技术支持

  • 团队: 网络运维部
  • 频道: #tunnel-training
  • 会议: 每周三 14:00-15:00

反馈渠道


培训材料版本: 1.0,更新日期: 2024-01-01
“”"
return doc

def generate_readme(self):
    """生成主README文档"""
    doc = f"""# 跨境SD-WAN项目文档

项目文档自动生成系统 | 版本: 1.0 | 生成时间:

项目概述

跨境SD-WAN高可用改造项目旨在构建一个稳定、可靠、可扩展的跨境网络架构,通过双隧道冗余、智能路由选择和自动化运维,提升跨境业务的可用性和性能。

文档结构

docs/
├── architecture/          # 架构文档
│   ├── overview.md       # 架构概览
│   ├── design.md         # 设计文档
│   └── components.md     # 组件说明
├── operations/           # 运维文档
│   ├── manual.md        # 运维手册
│   ├── monitoring.md    # 监控指南
│   └── backup.md        # 备份策略
├── api/                 # API文档
│   ├── health-api.md    # 健康检查API
│   └── metrics-api.md   # 监控指标API
├── troubleshooting/     # 故障排查
│   ├── guide.md        # 排查指南
│   ├── scenarios.md    # 场景分析
│   └── tools.md        # 诊断工具
├── training/           # 培训材料
│   ├── materials.md    # 培训手册
│   ├── exercises.md    # 练习题目
│   └── assessment.md   # 考核标准
└── README.md           # 本文档

快速开始

1. 环境访问

# 生产环境
ssh admin@hz1-prod

# 测试环境
ssh admin@hz1-test

# 监控系统
# Grafana: http://hz1:3000
# Kibana: http://hz1:5601
# Prometheus: http://hz1:9090

2. 日常操作

# 查看系统状态
/usr/local/bin/daily_inspection.py

# 检查隧道健康
/usr/local/bin/tunnel_switch.py status

# 执行备份
/usr/local/bin/backup_tunnel_config.sh

3. 故障处理

# 查看故障指南
cat /opt/tunnel-project/docs/troubleshooting/guide.md

# 使用诊断工具
/usr/local/bin/network_diagnosis.sh

# 紧急恢复
/usr/local/bin/rollback_to_single_tunnel.sh

系统组件

组件 版本 功能 文档
WireGuard 1.0.0 加密隧道 文档
FRR 8.4 动态路由 文档
Xray 1.8.0 代理传输 文档
Prometheus 2.45.0 监控收集 文档
Grafana 9.5.0 监控可视化 文档
ELK Stack 7.17.0 日志分析 文档

监控指标

关键指标

  1. 隧道健康评分 (0-100): 综合评估隧道质量
  2. 隧道延迟 (ms): 端到端通信延迟
  3. 隧道丢包率 (%): 数据包丢失比例
  4. OSPF Cost值: 路由优先级指标
  5. 系统资源使用率: CPU、内存、磁盘、网络

告警规则

  • 隧道完全中断: 健康评分 < 30
  • 隧道质量劣化: 健康评分 < 60
  • 高延迟: 延迟 > 200ms
  • 高丢包: 丢包率 > 10%

变更管理

变更流程

  1. 创建变更请求
  2. 进行影响分析
  3. 制定回滚方案
  4. 在测试环境验证
  5. 审批通过后实施
  6. 验证变更效果
  7. 更新相关文档

变更窗口

  • 常规变更: 每周四 02:00-04:00
  • 紧急变更: 随时,需双人确认
  • 重大变更: 需提前一周申请

备份策略

备份类型

  1. 配置备份: 每日自动备份
  2. 系统备份: 每周完整备份
  3. 日志备份: 每月归档备份

保留策略

  • 每日备份: 保留30天
  • 每周备份: 保留8周
  • 每月备份: 保留12个月

恢复测试

# 测试备份恢复
/usr/local/bin/backup_recovery_test.py --test full

安全策略

访问控制

  • 最小权限原则
  • 多因素认证
  • 操作审计日志
  • 定期权限审查

网络安全

  • 防火墙白名单
  • 加密通信
  • 入侵检测
  • 漏洞扫描

数据安全

  • 加密存储
  • 定期备份
  • 安全删除
  • 合规检查

支持与联系

值班安排

  • 一线支持: 7x24小时
  • 二线支持: 工作日 09:00-18:00
  • 三线支持: 紧急情况响应

联系方式

  • 值班电话: +86-138-XXXX-XXXX
  • 支持邮箱: [email protected]
  • 紧急联系人: +86-139-XXXX-XXXX

文档反馈

版本历史

版本 日期 说明 作者
1.0.0 2024-01-01 初始版本 自动化系统
1.0.1 2024-01-15 更新运维手册 张工
1.1.0 2024-02-01 增加故障指南 李工

许可证

本项目文档采用 CC BY-NC-SA 4.0 许可证。

免责声明

本文档仅供参考,实际操作请根据生产环境情况进行调整。对于因使用本文档而产生的任何损失,作者不承担任何责任。


文档自动生成,最后更新: {self.timestamp}
“”"
return doc

def build_html_docs(self):
    """构建HTML文档"""
    print("构建HTML文档...")
  
    # 安装mkdocs(如果不存在)
    try:
        import mkdocs
    except ImportError:
        subprocess.run("pip install mkdocs mkdocs-material", shell=True)
  
    # 创建mkdocs配置
    mkdocs_config = os.path.join(self.project_root, "mkdocs.yml")
    with open(mkdocs_config, "w") as f:
        f.write("""site_name: 跨境SD-WAN文档

theme:
name: material
features:
- navigation.tabs
- navigation.sections
- toc.integrate
- search.suggest
- search.highlight
palette:
primary: blue
accent: light blue

repo_name: example/tunnel-docs
repo_url: https://github.com/example/tunnel-docs

nav:

  • 首页: index.md
  • 架构设计:
    • 概览: architecture/overview.md
    • 设计原理: architecture/design.md
    • 组件说明: architecture/components.md
  • 运维手册:
    • 日常运维: operations/manual.md
    • 监控指南: operations/monitoring.md
    • 备份策略: operations/backup.md
  • 故障排查:
    • 排查指南: troubleshooting/guide.md
    • 场景分析: troubleshooting/scenarios.md
    • 诊断工具: troubleshooting/tools.md
  • API文档:
    • 健康检查API: api/health-api.md
    • 监控指标API: api/metrics-api.md
  • 培训材料:
    • 培训手册: training/materials.md
    • 练习题目: training/exercises.md
    • 考核标准: training/assessment.md

markdown_extensions:

  • admonition

  • codehilite

  • footnotes

  • meta

  • toc:
    permalink: true

  • pymdownx.arithmatex

  • pymdownx.betterem:
    smart_enable: all

  • pymdownx.caret

  • pymdownx.critic

  • pymdownx.details

  • pymdownx.emoji:
    emoji_index: !!python/name:material.extensions.emoji.twemoji
    emoji_generator: !!python/name:material.extensions.emoji.to_svg

  • pymdownx.highlight

  • pymdownx.inlinehilite

  • pymdownx.keys

  • pymdownx.magiclink

  • pymdownx.mark

  • pymdownx.smartsymbols

  • pymdownx.snippets

  • pymdownx.superfences

  • pymdownx.tabbed:
    alternate_style: true

  • pymdownx.tasklist:
    custom_checkbox: true

  • pymdownx.tilde
    “”")

    # 创建index.md
    index_file = os.path.join(self.docs_dir, "index.md")
    with open(index_file, "w") as f:
        f.write(self.generate_readme())
    
    # 构建HTML
    build_cmd = f"cd {self.project_root} && mkdocs build --site-dir {self.build_dir}/html"
    subprocess.run(build_cmd, shell=True, check=True)
    
    print(f"HTML文档构建完成: {self.build_dir}/html")
    

    def publish_docs(self):
    “”“发布文档”“”
    print(“发布文档…”)

    # 1. 打包文档
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    docs_package = f"/tmp/tunnel-docs-{timestamp}.tar.gz"
    
    subprocess.run(f"tar -czf {docs_package} -C {self.build_dir} .", shell=True)
    
    # 2. 发布到Web服务器(示例)
    web_dir = "/var/www/html/tunnel-docs"
    subprocess.run(f"rsync -avz {self.build_dir}/html/ {web_dir}/", shell=True)
    
    # 3. 发布到版本控制(示例)
    git_cmds = [
        f"cd {self.project_root}",
        "git add docs/ mkdocs.yml",
        f"git commit -m '更新文档 {timestamp}'",
        "git push origin main"
    ]
    
    subprocess.run(" && ".join(git_cmds), shell=True)
    
    print(f"文档发布完成:")
    print(f"  本地访问: file://{self.build_dir}/html/index.html")
    print(f"  Web访问: http://{subprocess.getoutput('hostname')}/tunnel-docs/")
    print(f"  文档包: {docs_package}")
    

    def run_pipeline(self):
    “”“运行完整文档流水线”“”
    print(“启动文档生成流水线”)
    print(“=”*60)

    # 1. 收集系统文档
    system_docs = self.gather_system_documentation()
    
    # 2. 生成Markdown文档
    self.generate_markdown_docs(system_docs)
    
    # 3. 构建HTML文档
    self.build_html_docs()
    
    # 4. 发布文档
    self.publish_docs()
    
    print("="*60)
    print("文档生成流水线完成")
    

def main():
import argparse

parser = argparse.ArgumentParser(description="文档生成流水线")
parser.add_argument("--step", choices=["collect", "generate", "build", "publish", "all"],
                   default="all", help="执行步骤")
parser.add_argument("--output-dir", help="输出目录")

args = parser.parse_args()

pipeline = DocumentationPipeline()

if args.output_dir:
    pipeline.project_root = args.output_dir
    pipeline.docs_dir = os.path.join(args.output_dir, "docs")
    pipeline.build_dir = os.path.join(args.output_dir, "docs-build")
    pipeline.create_directory_structure()

if args.step == "collect":
    docs = pipeline.gather_system_documentation()
    print(json.dumps(docs, indent=2, ensure_ascii=False))
  
elif args.step == "generate":
    docs = pipeline.gather_system_documentation()
    pipeline.generate_markdown_docs(docs)
  
elif args.step == "build":
    pipeline.build_html_docs()
  
elif args.step == "publish":
    pipeline.publish_docs()
  
else:  # all
    pipeline.run_pipeline()

if name == “main”:
main()


### 20.5 知识转移与交接检查表

**文档 20.5.1:知识转移检查表**
```markdown
# 跨境SD-WAN项目知识转移检查表

## 项目信息
- 项目名称: 跨境SD-WAN高可用改造项目
- 移交方: 项目实施团队
- 接收方: 运维团队
- 移交日期: _______________

## 知识转移内容

### 1. 架构理解 ✅
- [ ] 理解整体架构设计
- [ ] 掌握各组件功能和作用
- [ ] 理解数据流和控制流
- [ ] 了解关键设计决策

### 2. 系统配置 ✅
- [ ] 掌握配置文件位置和格式
- [ ] 理解配置参数含义
- [ ] 掌握配置变更流程
- [ ] 了解配置验证方法

### 3. 日常运维 ✅
- [ ] 掌握日常巡检流程
- [ ] 熟悉监控系统使用
- [ ] 掌握日志分析方法
- [ ] 了解性能监控指标

### 4. 故障处理 ✅
- [ ] 掌握故障诊断流程
- [ ] 熟悉常见故障场景
- [ ] 掌握紧急恢复操作
- [ ] 了解根本原因分析方法

### 5. 备份恢复 ✅
- [ ] 掌握备份策略
- [ ] 熟悉备份验证方法
- [ ] 掌握恢复操作流程
- [ ] 了解灾难恢复计划

### 6. 变更管理 ✅
- [ ] 掌握变更控制流程
- [ ] 熟悉变更验证方法
- [ ] 掌握回滚操作流程
- [ ] 了解变更窗口安排

### 7. 安全管理 ✅
- [ ] 掌握访问控制策略
- [ ] 熟悉安全监控方法
- [ ] 掌握漏洞管理流程
- [ ] 了解合规要求

### 8. 文档使用 ✅
- [ ] 熟悉文档结构
- [ ] 掌握文档查找方法
- [ ] 了解文档更新流程
- [ ] 熟悉培训材料使用

## 实操考核

### 1. 日常操作考核
- [ ] 完成一次完整的日常巡检
- [ ] 正确使用监控系统查看状态
- [ ] 成功执行一次配置备份
- [ ] 正确分析系统日志

### 2. 故障处理考核
- [ ] 成功诊断并处理模拟故障
- [ ] 正确执行故障切换操作
- [ ] 完成故障报告编写
- [ ] 正确分析根本原因

### 3. 变更管理考核
- [ ] 完成一次配置变更操作
- [ ] 正确验证变更效果
- [ ] 成功执行回滚操作
- [ ] 完成变更文档更新

### 4. 应急恢复考核
- [ ] 成功执行备份恢复
- [ ] 完成系统重建操作
- [ ] 验证恢复后系统功能
- [ ] 完成恢复报告编写

## 文档交接

### 1. 技术文档
- [ ] 架构设计文档
- [ ] 运维手册
- [ ] 故障排查指南
- [ ] API文档
- [ ] 培训材料

### 2. 配置文档
- [ ] 系统配置清单
- [ ] 网络拓扑图
- [ ] 部署架构图
- [ ] 数据流图

### 3. 过程文档
- [ ] 变更记录
- [ ] 故障记录
- [ ] 维护记录
- [ ] 审计日志

### 4. 管理文档
- [ ] 服务级别协议(SLA)
- [ ] 运维流程文档
- [ ] 应急预案
- [ ] 联系人清单

## 工具交接

### 1. 监控工具
- [ ] Prometheus访问权限
- [ ] Grafana访问权限
- [ ] ELK Stack访问权限
- [ ] 告警配置权限

### 2. 管理工具
- [ ] 配置管理工具
- [ ] 备份管理工具
- [ ] 日志分析工具
- [ ] 性能测试工具

### 3. 脚本工具
- [ ] 日常巡检脚本
- [ ] 故障诊断脚本
- [ ] 备份恢复脚本
- [ ] 性能测试脚本

### 4. 访问权限
- [ ] 系统登录权限
- [ ] 数据库访问权限
- [ ] 网络设备权限
- [ ] 云平台权限

## 支持交接

### 1. 支持渠道
- [ ] 一线支持联系方式
- [ ] 二线支持联系方式
- [ ] 紧急联系渠道
- [ ] 供应商联系方式

### 2. 支持流程
- [ ] 事件管理流程
- [ ] 问题管理流程
- [ ] 变更管理流程
- [ ] 知识管理流程

### 3. 支持资源
- [ ] 知识库访问
- [ ] 文档库访问
- [ ] 工具集访问
- [ ] 培训资源访问

### 4. 支持安排
- [ ] 值班安排表
- [ ] 交接班流程
- [ ] 升级流程
- [ ] 报告流程

## 移交确认

### 移交方确认
我已将上述所有内容完整移交给接收方,并确保:
- [ ] 所有文档齐全且最新
- [ ] 所有工具可正常使用
- [ ] 所有权限配置正确
- [ ] 接收方已通过考核

**移交方签字**: _________________
**日期**: _________________

### 接收方确认
我已接收上述所有内容,并确认:
- [ ] 已理解系统架构和原理
- [ ] 已掌握日常运维操作
- [ ] 已通过实操考核
- [ ] 已获得必要权限和工具

**接收方签字**: _________________
**日期**: _________________

### 项目管理方确认
我确认知识转移过程符合项目要求:
- [ ] 转移内容完整
- [ ] 转移过程规范
- [ ] 接收方具备能力
- [ ] 文档资料齐全

**项目管理方签字**: _________________
**日期**: _________________

总结

至此,我们已经完成了跨境SD-WAN高可用改造项目的全部三个阶段详细实施方案:

📋 第一阶段:手动切换双隧道(5天)

  • ✅ 建立了双隧道基础架构
  • ✅ 实现了策略路由和手动切换
  • ✅ 部署了基础监控系统
  • ✅ 创建了完整的测试和回滚方案

🚀 第二阶段:智能路由与自动切换(15天)

  • ✅ 部署了FRR动态路由系统
  • ✅ 实现了智能健康检查系统
  • ✅ 构建了自动化故障切换机制
  • ✅ 建立了全面的监控告警体系
  • ✅ 完成了故障注入和性能测试

🛡️ 第三阶段:运维优化与生产就绪(10天)

  • ✅ 建立了日志聚合和分析系统
  • ✅ 实现了自动化备份和恢复策略
  • ✅ 生成了完整的项目文档体系
  • ✅ 创建了知识转移和培训材料
  • ✅ 制定了运维流程和应急方案

🎯 核心成果

  1. 高可用架构:从单点故障到双隧道自动切换
  2. 智能路由:基于链路质量的动态路径选择
  3. 全面监控:多层次、全方位的监控体系
  4. 自动化运维:从巡检到备份的全面自动化
  5. 完整文档:从架构到运维的完整知识库
  6. 运维能力:经过培训的自主运维团队

🔧 关键脚本工具

工具类别 主要脚本 功能
隧道管理 tunnel_switch.py 隧道切换和状态管理
健康检查 health_checker.py 链路质量评估和路由调整
日常巡检 daily_inspection.py 系统健康检查和报告
备份恢复 backup_recovery_test.py 备份验证和恢复测试
故障注入 fault_injection_test.py 故障模拟和测试
性能测试 performance_benchmark.py 系统性能评估
文档生成 documentation_pipeline.py 自动化文档生成

📊 监控体系

  1. 基础监控:Node Exporter + Prometheus
  2. 业务监控:自定义健康检查指标
  3. 日志监控:ELK Stack日志分析
  4. 告警管理:Prometheus Alertmanager
  5. 可视化:Grafana仪表板

📁 文档体系

  1. 架构文档:设计原理、组件说明
  2. 运维手册:日常操作、监控指南
  3. 故障指南:排查流程、场景分析
  4. API文档:健康检查、监控接口
  5. 培训材料:课程内容、实操练习
  6. 管理文档:流程规范、应急预案

🚨 应急准备

  1. 快速恢复:一键回滚脚本
  2. 备份验证:定期恢复测试
  3. 故障演练:模拟故障场景
  4. 应急预案:分级响应流程
  5. 联系人清单:7x24支持网络

📈 持续改进

  1. 定期复盘:每月运维回顾会议
  2. 性能优化:季度性能评估和调优
  3. 安全加固:定期安全审计和漏洞修复
  4. 容量规划:基于监控数据的扩容预测
  5. 技术演进:跟踪新技术并评估引入

🎉 项目成功标准

  • ✅ 双隧道稳定运行30天无重大故障
  • ✅ 自动故障切换时间≤10秒
  • ✅ 系统可用性≥99.9%
  • ✅ 运维团队可独立处理90%以上问题
  • ✅ 文档完整度100%
  • ✅ 培训考核通过率100%

这个实施方案提供了从零到生产就绪的完整路径,每个步骤都包含详细的脚本、配置和验证方法。按照这个计划执行,可以构建一个稳定、可靠、易维护的跨境SD-WAN网络,为业务提供高质量的跨境连接服务。

0

评论区