跨境SD-WAN高可用改造项目
分阶段详细实施方案
第一阶段:手动切换双隧道实施方案
目标周期:5个工作日
完成标准:双隧道建立、策略路由配置、手动切换验证
第1天:环境准备与LA2初始化
1.1 LA2节点环境初始化
脚本 1.1.1:LA2基础环境安装脚本
#!/bin/bash
# la2_initial_setup.sh
# Run on the LA2 node: base packages, WireGuard keypair, sysctl tuning,
# NAT rules, and Xray installation.
set -e
echo "=== LA2节点初始化开始 ==="
# 1. Update the system
apt update && apt upgrade -y
# 2. Install base packages.
#    Fix: 'unzip' was never installed but is needed to unpack the Xray release.
apt install -y wireguard wireguard-tools resolvconf net-tools iptables-persistent unzip
# 3. Generate the WireGuard keypair (umask 077 keeps the private key unreadable
#    by other users)
mkdir -p /etc/wireguard
cd /etc/wireguard
umask 077
wg genkey | tee privatekey | wg pubkey > publickey
# 4. Kernel parameters: enable forwarding and enlarge socket buffers
cat > /etc/sysctl.d/60-wireguard.conf << EOF
net.ipv4.ip_forward=1
net.ipv6.conf.all.forwarding=1
net.core.rmem_max=134217728
net.core.wmem_max=134217728
EOF
sysctl -p /etc/sysctl.d/60-wireguard.conf
# 5. NAT for traffic leaving via the public interface.
#    NOTE(review): assumes the WAN interface is eth0 — confirm on the host.
iptables -t nat -A POSTROUTING -o eth0 -j MASQUERADE
iptables-save > /etc/iptables/rules.v4
# 6. Install the Xray proxy.
#    Fix: the release asset is a ZIP archive. The original downloaded the zip
#    *as* /usr/local/xray/xray and then tried to unzip that same path onto
#    itself; download to a temp file and extract properly instead.
mkdir -p /usr/local/xray
wget -O /tmp/Xray-linux-64.zip https://github.com/XTLS/Xray-core/releases/latest/download/Xray-linux-64.zip
unzip -o /tmp/Xray-linux-64.zip -d /usr/local/xray/
rm -f /tmp/Xray-linux-64.zip
chmod +x /usr/local/xray/xray
echo "=== LA2节点初始化完成 ==="
echo "WireGuard公钥: $(cat /etc/wireguard/publickey)"
1.2 LA2 WireGuard配置
配置文件 1.2.1:LA2的WireGuard配置
# /etc/wireguard/wg0.conf
[Interface]
# LA2 tunnel addresses.
# Fix: HZ1's wg1 config treats LA2 as 10.100.2.2 (AllowedIPs 10.100.2.2/32,
# policy routes 'default via 10.100.2.2', health checks ping 10.100.2.2), so
# LA2 must own that /30 address in addition to its 10.8.2.0/24 address.
Address = 10.8.2.1/24, 10.100.2.2/30
# WireGuard listen port
ListenPort = 51820
# Private key generated in step 1 (la2_initial_setup.sh)
PrivateKey = <LA2_Private_Key>
# DNS servers applied while the tunnel is up (needs resolvconf)
DNS = 8.8.8.8, 1.1.1.1
# Forwarding + NAT installed when the interface comes up, removed on teardown
PostUp = iptables -A FORWARD -i %i -j ACCEPT; iptables -t nat -A POSTROUTING -o eth0 -j MASQUERADE
PostDown = iptables -D FORWARD -i %i -j ACCEPT; iptables -t nat -D POSTROUTING -o eth0 -j MASQUERADE

[Peer]
# HZ1's WireGuard public key (filled in on day 2)
PublicKey = <HZ1_WG2_Public_Key>
# Networks reachable through HZ1: Hangzhou internal net + the /30 transfer net
AllowedIPs = 10.10.0.0/16, 10.100.2.0/30
# HZ1 has no public IP, so no Endpoint; keepalive holds NAT mappings open
PersistentKeepalive = 25
第2天:HZ1侧双隧道配置
2.1 HZ1 WireGuard双接口配置
脚本 2.1.1:HZ1 WireGuard双隧道配置脚本
#!/bin/bash
# hz1_wireguard_setup.sh
# Run on the HZ1 node: configure both WireGuard tunnels (wg0 -> LA1, wg1 -> LA2).
set -e
echo "=== HZ1双隧道WireGuard配置开始 ==="
# 1. Key directory for the second tunnel
mkdir -p /etc/wireguard2
cd /etc/wireguard2
umask 077
# 2. Dedicated keypair for the second tunnel
wg genkey | tee privatekey2 | wg pubkey > publickey2
echo "WireGuard2公钥: $(cat publickey2)"
# 3. First WireGuard interface (to LA1)
cat > /etc/wireguard/wg0.conf << EOF
[Interface]
Address = 10.100.1.1/30
PrivateKey = <HZ1_WG1_Private_Key>
ListenPort = 51821
# MTU优化
MTU = 1380
[Peer]
# LA1的公钥
PublicKey = <LA1_Public_Key>
# 本地代理转发的Endpoint(VLESS代理本地端口)
Endpoint = 127.0.0.1:10001
AllowedIPs = 10.8.1.0/24, 10.100.1.2/32
PersistentKeepalive = 25
EOF
# 4. Second WireGuard interface (to LA2); $(cat privatekey2) expands in this
#    unquoted heredoc on purpose.
cat > /etc/wireguard2/wg1.conf << EOF
[Interface]
Address = 10.100.2.1/30
PrivateKey = $(cat privatekey2)
ListenPort = 51822
MTU = 1380
[Peer]
# LA2的公钥(需要填入)
PublicKey = <LA2_Public_Key>
Endpoint = 127.0.0.1:10002
AllowedIPs = 10.8.2.0/24, 10.100.2.2/32
PersistentKeepalive = 25
EOF
# 5. Enable wg0.
#    Fix: the original ran 'wg-quick up wg0' unconditionally, which always
#    aborts under 'set -e' while wg0.conf still contains the placeholder key.
systemctl enable wg-quick@wg0
if grep -q '<HZ1_WG1_Private_Key>' /etc/wireguard/wg0.conf; then
  echo "WARNING: wg0.conf still contains a placeholder private key; fill it in, then run: wg-quick up wg0"
else
  wg-quick up wg0
fi
# 6. Dedicated unit for the second tunnel.
#    Fix: the unit path and dependency names were mangled email-style
#    ('[email protected]'); restore '[email protected]'.
cat > /etc/systemd/system/[email protected] << 'EOF'
[Unit]
Description=WireGuard via wg-quick(8) for wg1
After=network-online.target nss-lookup.target
Wants=network-online.target nss-lookup.target
PartOf=wg-quick.target
Documentation=man:wg-quick(8)
Documentation=man:wg(8)
Documentation=https://www.wireguard.com/
Documentation=https://www.wireguard.com/quickstart/
[Service]
Type=oneshot
RemainAfterExit=yes
ExecStart=/usr/bin/wg-quick up wg1
ExecStop=/usr/bin/wg-quick down wg1
Environment=WG_ENDPOINT_RESOLUTION_RETRIES=infinity
[Install]
WantedBy=multi-user.target
EOF
# 7. Link the config where wg-quick expects it.
#    Fix: 'ln -s' fails on a re-run under 'set -e'; use -f to be idempotent.
ln -sf /etc/wireguard2/wg1.conf /etc/wireguard/wg1.conf
# 8. Start the second tunnel (skip start while the peer key is a placeholder)
systemctl daemon-reload
systemctl enable wg-quick@wg1
if grep -q '<LA2_Public_Key>' /etc/wireguard2/wg1.conf; then
  echo "WARNING: wg1.conf still contains the LA2 public-key placeholder; fill it in, then run: systemctl start wg-quick@wg1"
else
  systemctl start wg-quick@wg1
fi
echo "=== HZ1双隧道WireGuard配置完成 ==="
2.2 双代理配置(VLESS+REALITY)
配置文件 2.2.1:HZ1双代理配置
// /usr/local/xray/config_tunnel1.json - LA1隧道代理
// Client-side Xray config: dokodemo-door accepts HZ1's local WireGuard UDP
// (wg0 Endpoint = 127.0.0.1:10001) and relays it over VLESS+REALITY to LA1.
{
  "inbounds": [
    {
      "port": 10001,
      "protocol": "dokodemo-door",
      "settings": {
        // Destination the relayed traffic is sent to on the far side.
        // NOTE(review): 10.100.1.2:51820 assumes LA1's WireGuard listens on
        // 51820 and owns 10.100.1.2 — confirm against LA1's wg config.
        "address": "10.100.1.2",
        "port": 51820,
        "network": "tcp,udp"
      },
      "tag": "tunnel1_inbound"
    }
  ],
  "outbounds": [
    {
      "protocol": "vless",
      "settings": {
        "vnext": [
          {
            "address": "LA1_PUBLIC_IP",
            "port": 443,
            "users": [
              {
                "id": "UUID_FOR_LA1",
                "encryption": "none",
                "flow": "xtls-rprx-vision"
              }
            ]
          }
        ]
      },
      "streamSettings": {
        "network": "tcp",
        "security": "reality",
        // Fix: this is a CLIENT outbound. The original used the server-side
        // inbound fields (dest/serverNames/privateKey/shortIds). A REALITY
        // client must supply serverName, fingerprint, the server's PUBLIC key
        // and a single shortId — and must never hold the server's private key.
        "realitySettings": {
          "serverName": "cloudflare.com",
          "fingerprint": "chrome",
          "publicKey": "REALITY_PUBLIC_KEY_LA1",
          "shortId": "abcdef1234567890"
        }
      },
      "tag": "tunnel1_outbound"
    }
  ],
  "routing": {
    "rules": [
      {
        "type": "field",
        "inboundTag": ["tunnel1_inbound"],
        "outboundTag": "tunnel1_outbound"
      }
    ]
  }
}
// /usr/local/xray/config_tunnel2.json - LA2隧道代理
// Client-side Xray config: dokodemo-door accepts HZ1's local WireGuard UDP
// (wg1 Endpoint = 127.0.0.1:10002) and relays it over VLESS+REALITY to LA2.
{
  "inbounds": [
    {
      "port": 10002,
      "protocol": "dokodemo-door",
      "settings": {
        // NOTE(review): 10.100.2.2:51820 assumes LA2's WireGuard listens on
        // 51820 and owns 10.100.2.2 — confirm against LA2's wg config.
        "address": "10.100.2.2",
        "port": 51820,
        "network": "tcp,udp"
      },
      "tag": "tunnel2_inbound"
    }
  ],
  "outbounds": [
    {
      "protocol": "vless",
      "settings": {
        "vnext": [
          {
            "address": "LA2_PUBLIC_IP",
            "port": 443,
            "users": [
              {
                "id": "UUID_FOR_LA2",
                "encryption": "none",
                "flow": "xtls-rprx-vision"
              }
            ]
          }
        ]
      },
      "streamSettings": {
        "network": "tcp",
        "security": "reality",
        // Fix: client-side REALITY settings (the original mistakenly used the
        // server-side dest/serverNames/privateKey/shortIds fields). The client
        // uses the server's PUBLIC key, never its private key.
        "realitySettings": {
          "serverName": "www.microsoft.com",
          "fingerprint": "chrome",
          "publicKey": "REALITY_PUBLIC_KEY_LA2",
          "shortId": "fedcba0987654321"
        }
      },
      "tag": "tunnel2_outbound"
    }
  ],
  "routing": {
    "rules": [
      {
        "type": "field",
        "inboundTag": ["tunnel2_inbound"],
        "outboundTag": "tunnel2_outbound"
      }
    ]
  }
}
第3天:策略路由配置
3.1 多路由表配置
脚本 3.1.1:HZ1策略路由配置脚本
#!/bin/bash
# hz1_policy_routing.sh
# Run on HZ1: per-tunnel routing tables, policy rules, connection-mark
# handling, and a boot-time service that re-applies everything.
set -e
echo "=== HZ1策略路由配置开始 ==="
# 1. Network interfaces
WAN_IF="eth0" # public uplink (documentation only; not referenced below)
WG1_IF="wg0"  # tunnel to LA1
WG2_IF="wg1"  # tunnel to LA2
# 2. Custom routing tables.
#    Fix: the original appended blindly, duplicating entries on every re-run.
grep -q "^200 la1_table" /etc/iproute2/rt_tables || echo "200 la1_table" >> /etc/iproute2/rt_tables
grep -q "^201 la2_table" /etc/iproute2/rt_tables || echo "201 la2_table" >> /etc/iproute2/rt_tables
grep -q "^202 default_table" /etc/iproute2/rt_tables || echo "202 default_table" >> /etc/iproute2/rt_tables
# 3. Default route per table.
#    Fix: 'ip route add default' fails under 'set -e' whenever a default route
#    already exists (e.g. the normal eth0 default); 'replace' is idempotent.
ip route replace default via 10.100.1.2 dev "$WG1_IF" table la1_table
ip route replace default via 10.100.2.2 dev "$WG2_IF" table la2_table
# Main table: primary path via LA1
ip route replace default via 10.100.1.2 dev "$WG1_IF"
# 4. Policy rules. Priorities below 1000 are system territory; we use 2000+.
#    Delete first so re-running the script never stacks duplicate rules.
for pref in 2000 2001 2002 2003 2004; do
  ip rule del pref "$pref" 2>/dev/null || true
done
# Rule 1: this source network goes out via LA2
ip rule add from 10.10.100.0/24 table la2_table priority 2000
# Rule 2: management network via LA2 (backup path)
ip rule add from 10.10.1.0/24 table la2_table priority 2001
# Rule 3: fwmark-steered traffic (marks set by hz1_traffic_marking.sh)
ip rule add fwmark 100 table la1_table priority 2002
ip rule add fwmark 200 table la2_table priority 2003
# Rule 4: main business traffic defaults to LA1
ip rule add from 10.10.0.0/16 table la1_table priority 2004
# 5. Connection tracking so reply packets stay on the same path.
#    Fix: restore-mark alone does nothing — the packet marks must first be
#    saved into conntrack, so also save non-zero marks in POSTROUTING.
iptables -t mangle -A PREROUTING -m conntrack --ctstate ESTABLISHED,RELATED -j CONNMARK --restore-mark
iptables -t mangle -A OUTPUT -m conntrack --ctstate ESTABLISHED,RELATED -j CONNMARK --restore-mark
iptables -t mangle -A POSTROUTING -m mark ! --mark 0 -j CONNMARK --save-mark
# 6. Boot-time service.
#    Fix: the unit's After= entries were mangled email-style
#    ('[email protected]'); restore the wg-quick instance unit names.
cat > /etc/systemd/system/policy-routing.service << 'EOF'
[Unit]
Description=Policy Routing Configuration
After=network-online.target [email protected] [email protected]
Wants=network-online.target
[Service]
Type=oneshot
RemainAfterExit=yes
ExecStart=/usr/local/bin/apply-policy-routing.sh
ExecStop=/usr/local/bin/cleanup-policy-routing.sh
[Install]
WantedBy=multi-user.target
EOF
# 7. Apply / cleanup scripts.
#    Fix: the original apply script only restored the routes; the ip *rules*
#    are not persistent either and must be re-added after a reboot.
cat > /usr/local/bin/apply-policy-routing.sh << 'EOF'
#!/bin/bash
# Re-apply policy routing after boot (tables, routes and rules).
ip route flush cache
grep -q "200 la1_table" /etc/iproute2/rt_tables || echo "200 la1_table" >> /etc/iproute2/rt_tables
grep -q "201 la2_table" /etc/iproute2/rt_tables || echo "201 la2_table" >> /etc/iproute2/rt_tables
ip route add default via 10.100.1.2 dev wg0 table la1_table 2>/dev/null || true
ip route add default via 10.100.2.2 dev wg1 table la2_table 2>/dev/null || true
for pref in 2000 2001 2002 2003 2004; do
  ip rule del pref "$pref" 2>/dev/null || true
done
ip rule add from 10.10.100.0/24 table la2_table priority 2000
ip rule add from 10.10.1.0/24 table la2_table priority 2001
ip rule add fwmark 100 table la1_table priority 2002
ip rule add fwmark 200 table la2_table priority 2003
ip rule add from 10.10.0.0/16 table la1_table priority 2004
EOF
cat > /usr/local/bin/cleanup-policy-routing.sh << 'EOF'
#!/bin/bash
# Remove the custom policy rules (priorities 2000-2004).
for i in $(ip rule show | grep -E "2000:|2001:|2002:|2003:|2004:" | awk -F: '{print $1}'); do
  ip rule del pref "$i" 2>/dev/null || true
done
EOF
chmod +x /usr/local/bin/apply-policy-routing.sh /usr/local/bin/cleanup-policy-routing.sh
# 8. Enable and start the service
systemctl daemon-reload
systemctl enable policy-routing
systemctl start policy-routing
echo "=== HZ1策略路由配置完成 ==="
3.2 流量标记与分流
脚本 3.2.1:基于应用的流量标记
#!/bin/bash
# hz1_traffic_marking.sh
# Mark traffic by destination port so the fwmark policy rules
# (fwmark 100 -> la1_table, fwmark 200 -> la2_table) can steer it.
# 1. Application port lists
HIGH_PRIO_PORTS="22,53,80,443,3389,5900"  # high priority: prefer the LA1 link
LOW_PRIO_PORTS="21,25,110,143,993,995"    # low priority / backup: may use LA2
# 2. Use a dedicated chain instead of flushing the whole mangle table.
#    Fix: the original 'iptables -t mangle -F' also destroyed the CONNMARK
#    restore-mark rules installed by hz1_policy_routing.sh.
iptables -t mangle -N TRAFFIC_MARK 2>/dev/null || true
iptables -t mangle -F TRAFFIC_MARK
iptables -t mangle -C PREROUTING -j TRAFFIC_MARK 2>/dev/null || iptables -t mangle -A PREROUTING -j TRAFFIC_MARK
iptables -t mangle -C OUTPUT -j TRAFFIC_MARK 2>/dev/null || iptables -t mangle -A OUTPUT -j TRAFFIC_MARK
# 3. High-priority traffic -> mark 100 (LA1).
#    multiport takes the comma-separated list directly; no per-port loop needed.
iptables -t mangle -A TRAFFIC_MARK -p tcp -m multiport --dports "$HIGH_PRIO_PORTS" -j MARK --set-mark 100
iptables -t mangle -A TRAFFIC_MARK -p udp -m multiport --dports "$HIGH_PRIO_PORTS" -j MARK --set-mark 100
# 4. Low-priority traffic -> mark 200 (LA2)
iptables -t mangle -A TRAFFIC_MARK -p tcp -m multiport --dports "$LOW_PRIO_PORTS" -j MARK --set-mark 200
iptables -t mangle -A TRAFFIC_MARK -p udp -m multiport --dports "$LOW_PRIO_PORTS" -j MARK --set-mark 200
# 5. Persist the rules
iptables-save > /etc/iptables/rules.v4
第4天:手动切换脚本与监控
4.1 手动切换脚本
脚本 4.1.1:隧道手动切换脚本
#!/usr/bin/env python3
# /usr/local/bin/tunnel_switch.py
import subprocess
import sys
import time
import json
from datetime import datetime
class TunnelManager:
    """Moves the default route between the two WireGuard tunnels
    (wg0 -> LA1, wg1 -> LA2) and reports their health via shell commands."""

    def __init__(self):
        # Active tunnel as seen in the current routing table.
        self.current_tunnel = self.detect_current_tunnel()
        # NOTE(review): set *after* detect_current_tunnel(); safe only because
        # that method never calls self.log().
        self.log_file = "/var/log/tunnel_switch.log"

    def log(self, message):
        """Print a timestamped message and append it to the log file."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        log_message = f"[{timestamp}] {message}"
        print(log_message)
        with open(self.log_file, "a") as f:
            f.write(log_message + "\n")

    def detect_current_tunnel(self):
        """Return 'la1'/'la2' from the default route's device, else 'unknown'.

        awk's $5 is the device in 'default via <gw> dev <ifname>' output;
        if several default routes exist the multi-line result compares
        unequal to 'wg0'/'wg1' and falls to 'unknown'.
        """
        try:
            result = subprocess.run(
                "ip route show default | awk '/default/ {print $5}'",
                shell=True,
                capture_output=True,
                text=True
            )
            iface = result.stdout.strip()
            if iface == "wg0":
                return "la1"
            elif iface == "wg1":
                return "la2"
            else:
                return "unknown"
        except:  # any failure degrades to 'unknown'
            return "unknown"

    def check_tunnel_health(self, tunnel):
        """Return True when the tunnel interface is UP and ping loss to its
        far-end /30 address is below 20%."""
        if tunnel == "la1":
            target_ip = "10.100.1.2"
            iface = "wg0"
        else:
            # Anything other than 'la1' is treated as la2.
            target_ip = "10.100.2.2"
            iface = "wg1"
        # Interface must report 'state UP'.
        try:
            subprocess.run(f"ip link show {iface} | grep -q 'state UP'",
                           shell=True, check=True)
        except:
            self.log(f"隧道 {tunnel} 接口 {iface} 未启动")
            return False
        # Reachability probe: 3 pings, 1s timeout each, bound to the device.
        try:
            result = subprocess.run(
                f"ping -c 3 -W 1 -I {iface} {target_ip}",
                shell=True,
                capture_output=True,
                text=True
            )
            if result.returncode == 0:
                # Extract the packet-loss percentage from the ping summary.
                lines = result.stdout.split('\n')
                for line in lines:
                    if "packet loss" in line:
                        loss = float(line.split('%')[0].split()[-1])
                        if loss < 20:  # below 20% loss counts as healthy
                            return True
            return False
        except:
            return False

    def switch_to_tunnel(self, target_tunnel):
        """Move the default route (and the priority-2004 source rule) onto
        target_tunnel ('la1' or 'la2'); returns True on verified success."""
        current = self.detect_current_tunnel()
        if current == target_tunnel:
            self.log(f"已经是 {target_tunnel} 隧道,无需切换")
            return True
        self.log(f"开始切换: {current} -> {target_tunnel}")
        # Refuse to switch onto an unhealthy target.
        if not self.check_tunnel_health(target_tunnel):
            self.log(f"目标隧道 {target_tunnel} 不健康,切换中止")
            return False
        try:
            if target_tunnel == "la1":
                # Default route via LA1
                subprocess.run("ip route replace default via 10.100.1.2 dev wg0",
                               shell=True, check=True)
                # Re-point the priority-2004 source rule at la1_table
                subprocess.run("ip rule del pref 2004 2>/dev/null || true", shell=True)
                subprocess.run("ip rule add from 10.10.0.0/16 table la1_table priority 2004",
                               shell=True, check=True)
            else:
                # Default route via LA2
                subprocess.run("ip route replace default via 10.100.2.2 dev wg1",
                               shell=True, check=True)
                # Re-point the priority-2004 source rule at la2_table
                subprocess.run("ip rule del pref 2004 2>/dev/null || true", shell=True)
                subprocess.run("ip rule add from 10.10.0.0/16 table la2_table priority 2004",
                               shell=True, check=True)
            # Drop cached routes so the new default takes effect immediately.
            subprocess.run("ip route flush cache", shell=True)
            # Verify the routing table actually changed.
            time.sleep(2)
            new_tunnel = self.detect_current_tunnel()
            if new_tunnel == target_tunnel:
                self.log(f"切换成功: {current} -> {target_tunnel}")
                # Optional notification hook
                self.send_notification(f"隧道切换: {current} -> {target_tunnel}")
                return True
            else:
                self.log(f"切换验证失败,当前隧道: {new_tunnel}")
                return False
        except subprocess.CalledProcessError as e:
            self.log(f"切换失败: {str(e)}")
            return False

    def send_notification(self, message):
        """Notification hook (extend for email/DingTalk/Slack etc.); no-op."""
        pass

    def show_status(self):
        """Print and return a dict with the active tunnel, both tunnels'
        health, and a timestamp."""
        status = {
            "current_tunnel": self.detect_current_tunnel(),
            "la1_health": self.check_tunnel_health("la1"),
            "la2_health": self.check_tunnel_health("la2"),
            "timestamp": datetime.now().isoformat()
        }
        print(json.dumps(status, indent=2))
        return status
def main():
    """Command-line entry point: status / switch <la1|la2> / auto."""
    argv = sys.argv
    if len(argv) < 2:
        print("使用方法:")
        print(" tunnel_switch.py status # 显示状态")
        print(" tunnel_switch.py switch la1 # 切换到LA1")
        print(" tunnel_switch.py switch la2 # 切换到LA2")
        print(" tunnel_switch.py auto # 自动选择最佳隧道")
        sys.exit(1)

    manager = TunnelManager()
    command = argv[1]

    if command == "status":
        manager.show_status()
        return

    if command == "switch" and len(argv) == 3:
        target = argv[2]
        if target not in ("la1", "la2"):
            print("无效的隧道名称,使用 la1 或 la2")
            sys.exit(1)
        sys.exit(0 if manager.switch_to_tunnel(target) else 1)

    if command == "auto":
        # Pick the best tunnel automatically; LA1 wins ties.
        healthy = {name: manager.check_tunnel_health(name) for name in ("la1", "la2")}
        if healthy["la1"]:
            manager.switch_to_tunnel("la1")
        elif healthy["la2"]:
            manager.switch_to_tunnel("la2")
        else:
            print("所有隧道都不健康")
            sys.exit(1)
        return

    print("无效命令")
    sys.exit(1)
if __name__ == "__main__":
    main()
脚本 4.1.2:切换脚本的systemd服务
# /etc/systemd/system/tunnel-manager.service
# Runs the tunnel health daemon (tunnel_manager_daemon.py), which polls the
# active tunnel every 60 seconds and fails over to the other tunnel when the
# current one is unhealthy.
[Unit]
Description=Tunnel Manager Service
After=network-online.target
Wants=network-online.target
[Service]
# Long-running foreground process; restarted 10s after a crash.
Type=simple
ExecStart=/usr/local/bin/tunnel_manager_daemon.py
Restart=on-failure
RestartSec=10
[Install]
WantedBy=multi-user.target
#!/usr/bin/env python3
# /usr/local/bin/tunnel_manager_daemon.py
import time
from tunnel_switch import TunnelManager
def daemon_main():
    """Health-monitor loop: every 60s, fail over when the active tunnel is unhealthy."""
    manager = TunnelManager()
    check_interval = 60  # seconds between health checks
    while True:
        try:
            active = manager.detect_current_tunnel()
            if not manager.check_tunnel_health(active):
                manager.log(f"当前隧道 {active} 不健康,尝试切换到备用隧道")
                # la1 fails over to la2; anything else (la2/unknown) goes to la1.
                backup = "la2" if active == "la1" else "la1"
                manager.switch_to_tunnel(backup)
            time.sleep(check_interval)
        except Exception as e:
            manager.log(f"守护进程异常: {str(e)}")
            time.sleep(10)
if __name__ == "__main__":
    daemon_main()
4.2 基础监控配置
脚本 4.2.1:Prometheus Node Exporter配置
#!/bin/bash
# setup_monitoring.sh
# Run on HZ1, LA1 and LA2: install Prometheus node_exporter plus a textfile
# collector that publishes custom tunnel metrics.
# 1. Install Node Exporter
wget https://github.com/prometheus/node_exporter/releases/download/v1.6.1/node_exporter-1.6.1.linux-amd64.tar.gz
tar xvf node_exporter-*.tar.gz
mv node_exporter-*/node_exporter /usr/local/bin/
# Directory the textfile collector reads *.prom files from.
# Fix: the original never created it and never pointed node_exporter at it,
# so the custom metrics were silently never scraped.
mkdir -p /var/lib/node_exporter
# 2. systemd unit. Quoted heredoc so the backslash line-continuations are
#    written literally (systemd supports them in unit files).
#    NOTE(review): --collector.systemd may need D-Bus access that User=nobody
#    lacks — verify on the target hosts.
cat > /etc/systemd/system/node_exporter.service << 'EOF'
[Unit]
Description=Node Exporter
After=network.target
[Service]
Type=simple
User=nobody
ExecStart=/usr/local/bin/node_exporter \
  --collector.systemd \
  --collector.tcpstat \
  --collector.netstat \
  --collector.netdev \
  --collector.conntrack \
  --collector.textfile.directory=/var/lib/node_exporter \
  --web.listen-address=:9100
[Install]
WantedBy=multi-user.target
EOF
# 3. Start the exporter
systemctl daemon-reload
systemctl enable node_exporter
systemctl start node_exporter
# 4. Custom metrics collector script
cat > /usr/local/bin/custom_metrics.sh << 'EOF'
#!/bin/bash
# Collect custom tunnel metrics in Prometheus textfile format.
# WireGuard tunnel state (1 = interface reported by wg, 0 = absent)
wg_la1_status=$(wg show wg0 2>/dev/null | grep -c "interface: wg0" || echo "0")
wg_la2_status=$(wg show wg1 2>/dev/null | grep -c "interface: wg1" || echo "0")
# Tunnel latency in ms. Fix: '|| echo 0' never fired because awk exits 0 even
# when it prints nothing; use a default expansion so the metric line is never
# emitted with an empty value (which Prometheus would reject).
la1_latency=$(ping -c 1 -W 1 10.100.1.2 2>/dev/null | grep "time=" | awk -F'time=' '{print $2}' | awk '{print $1}')
la1_latency=${la1_latency:-0}
la2_latency=$(ping -c 1 -W 1 10.100.2.2 2>/dev/null | grep "time=" | awk -F'time=' '{print $2}' | awk '{print $1}')
la2_latency=${la2_latency:-0}
# Emit Prometheus-format metrics
cat << METRICS
# HELP wg_tunnel_status WireGuard隧道状态 (1=up, 0=down)
# TYPE wg_tunnel_status gauge
wg_tunnel_status{tunnel="la1"} ${wg_la1_status}
wg_tunnel_status{tunnel="la2"} ${wg_la2_status}
# HELP tunnel_latency_ms 隧道延迟(毫秒)
# TYPE tunnel_latency_ms gauge
tunnel_latency_ms{tunnel="la1"} ${la1_latency}
tunnel_latency_ms{tunnel="la2"} ${la2_latency}
# HELP tunnel_last_check 最后检查时间戳
# TYPE tunnel_last_check gauge
tunnel_last_check $(date +%s)
METRICS
EOF
chmod +x /usr/local/bin/custom_metrics.sh
# 5. Timer-driven collection into the textfile directory.
#    Fix: systemd does NOT perform shell redirection in ExecStart= — the
#    original's '> /var/lib/...' would have been passed as literal arguments.
#    Wrap the command in 'sh -c' so the redirection actually happens.
cat > /etc/systemd/system/custom-metrics.service << 'EOF'
[Unit]
Description=Custom Metrics Collector
After=network-online.target
[Service]
Type=oneshot
ExecStart=/bin/sh -c '/usr/local/bin/custom_metrics.sh > /var/lib/node_exporter/custom_metrics.prom'
EOF
cat > /etc/systemd/system/custom-metrics.timer << 'EOF'
[Unit]
Description=Run custom metrics every 10 seconds
[Timer]
OnBootSec=1min
OnUnitActiveSec=10s
[Install]
WantedBy=timers.target
EOF
systemctl daemon-reload
systemctl enable custom-metrics.timer
systemctl start custom-metrics.timer
第5天:测试与验证
5.1 测试脚本
脚本 5.1.1:双隧道功能测试脚本
#!/usr/bin/env python3
# test_tunnels.py
import subprocess
import time
import json
import sys
class TunnelTester:
    """Runs an ordered suite of shell-command smoke tests against the
    dual-tunnel setup and writes JSON + text reports under /tmp."""

    def __init__(self):
        # NOTE(review): test_cases is never used; run_all_tests keeps its own
        # local results list instead.
        self.test_cases = []

    def run_test(self, name, command, expected_returncode=0):
        """Run one shell command (30s timeout) and return a result dict
        with keys name/success/elapsed/returncode."""
        print(f"\n{'='*50}")
        print(f"测试: {name}")
        print(f"命令: {command}")
        start_time = time.time()
        try:
            result = subprocess.run(
                command,
                shell=True,
                capture_output=True,
                text=True,
                timeout=30
            )
            elapsed = time.time() - start_time
            # Success means the exit code matched the expectation (default 0).
            success = (result.returncode == expected_returncode)
            status = "✅ 通过" if success else "❌ 失败"
            print(f"结果: {status} ({elapsed:.2f}秒)")
            print(f"返回值: {result.returncode}")
            if result.stdout:
                print(f"输出: {result.stdout[:500]}...")
            if result.stderr and not success:
                print(f"错误: {result.stderr[:500]}...")
            return {
                "name": name,
                "success": success,
                "elapsed": elapsed,
                "returncode": result.returncode
            }
        except subprocess.TimeoutExpired:
            elapsed = time.time() - start_time
            print(f"结果: ⏱️ 超时 ({elapsed:.2f}秒)")
            return {
                "name": name,
                "success": False,
                "elapsed": elapsed,
                "returncode": -1
            }
        except Exception as e:
            print(f"结果: ❌ 异常: {str(e)}")
            return {
                "name": name,
                "success": False,
                "elapsed": 0,
                "returncode": -1
            }

    def run_all_tests(self):
        """Execute the whole suite in order; print a summary and return True
        only when every test passed."""
        print("开始跨境双隧道测试套件")
        print("="*50)
        # (name, command) pairs, run sequentially with 1s spacing.
        tests = [
            # 1. Basic interface checks
            ("检查WireGuard接口", "ip link show wg0 && ip link show wg1"),
            ("检查WireGuard状态", "wg show"),
            ("检查路由表", "ip route show table la1_table && ip route show table la2_table"),
            # 2. Tunnel reachability
            ("测试LA1隧道连通性", "ping -c 3 -W 1 -I wg0 10.100.1.2"),
            ("测试LA2隧道连通性", "ping -c 3 -W 1 -I wg1 10.100.2.2"),
            # 3. Proxy layer
            ("测试代理服务状态", "systemctl status xray --no-pager"),
            ("测试代理端口监听", "netstat -tlnp | grep '1000[12]'"),
            # 4. Policy routing
            ("测试策略路由规则", "ip rule show"),
            ("测试默认路由", "ip route show default"),
            # 5. Cross-border access
            ("测试通过LA1访问美国服务", "curl --interface wg0 -s --connect-timeout 5 http://ifconfig.me"),
            ("测试通过LA2访问美国服务", "curl --interface wg1 -s --connect-timeout 5 http://ifconfig.me"),
            # 6. Manual switch-over
            ("测试切换到LA2", "/usr/local/bin/tunnel_switch.py switch la2"),
            ("验证LA2为默认路由", "ip route show default | grep wg1"),
            ("测试切换后访问", "curl --interface wg1 -s --connect-timeout 5 http://ifconfig.me"),
            ("切换回LA1", "/usr/local/bin/tunnel_switch.py switch la1"),
            # 7. Monitoring
            ("测试监控指标", "/usr/local/bin/custom_metrics.sh | head -5"),
            ("测试Node Exporter", "curl -s http://localhost:9100/metrics | grep -i 'node_' | head -3"),
        ]
        results = []
        for test_name, test_command in tests:
            result = self.run_test(test_name, test_command)
            results.append(result)
            time.sleep(1)  # spacing between tests
        # Persist JSON and text reports.
        self.generate_report(results)
        # Print the summary and list failures.
        total = len(results)
        passed = sum(1 for r in results if r["success"])
        failed = total - passed
        print(f"\n{'='*50}")
        print(f"测试完成: {passed}/{total} 通过 ({passed/total*100:.1f}%)")
        if failed > 0:
            print("\n失败的测试:")
            for r in results:
                if not r["success"]:
                    print(f" - {r['name']}")
        return passed == total

    def generate_report(self, results):
        """Write a JSON report and a plain-text report (timestamped names)
        into /tmp."""
        report = {
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "hostname": subprocess.getoutput("hostname"),
            "results": results,
            "summary": {
                "total": len(results),
                "passed": sum(1 for r in results if r["success"]),
                "failed": sum(1 for r in results if not r["success"])
            }
        }
        # JSON report
        report_file = f"/tmp/tunnel_test_report_{time.strftime('%Y%m%d_%H%M%S')}.json"
        with open(report_file, "w") as f:
            json.dump(report, f, indent=2)
        print(f"\n测试报告已保存: {report_file}")
        # Plain-text report
        text_report = f"/tmp/tunnel_test_report_{time.strftime('%Y%m%d_%H%M%S')}.txt"
        with open(text_report, "w") as f:
            f.write("跨境双隧道测试报告\n")
            f.write("="*50 + "\n")
            f.write(f"时间: {report['timestamp']}\n")
            f.write(f"主机: {report['hostname']}\n")
            f.write(f"总计: {report['summary']['total']} 项测试\n")
            f.write(f"通过: {report['summary']['passed']}\n")
            f.write(f"失败: {report['summary']['failed']}\n\n")
            for r in results:
                status = "✓" if r["success"] else "✗"
                f.write(f"{status} {r['name']} ({r['elapsed']:.2f}s)\n")
        print(f"文本报告: {text_report}")
def main():
    """Run the full tunnel test suite; exit 0 only if every test passed."""
    all_passed = TunnelTester().run_all_tests()
    sys.exit(0 if all_passed else 1)
if __name__ == "__main__":
    main()
5.2 回滚脚本
脚本 5.2.1:一键回滚到单隧道模式
#!/bin/bash
# rollback_to_single_tunnel.sh
# Emergency rollback to LA1-only single-tunnel mode.
set -e
echo "=== 开始回滚到单隧道模式 ==="
# 1. Stop and disable LA2-related services
systemctl stop wg-quick@wg1 2>/dev/null || true
systemctl disable wg-quick@wg1 2>/dev/null || true
systemctl stop tunnel-manager 2>/dev/null || true
systemctl disable tunnel-manager 2>/dev/null || true
# 2. Stop the LA2 proxy
systemctl stop xray-tunnel2 2>/dev/null || true
systemctl disable xray-tunnel2 2>/dev/null || true
# 3. Routing tables and rules: drop the custom tables...
sed -i '/la2_table/d' /etc/iproute2/rt_tables
sed -i '/default_table/d' /etc/iproute2/rt_tables
# ...and every 200x-priority rule that references la2
for pref in $(ip rule show | grep -E "200[0-9]:.*la2" | awk -F: '{print $1}'); do
  ip rule del pref "$pref" 2>/dev/null || true
done
# Remove routes over the LA2 interface
ip route del 10.8.2.0/24 dev wg1 2>/dev/null || true
ip route del 10.100.2.0/30 dev wg1 2>/dev/null || true
# 4. Default route back onto LA1
ip route replace default via 10.100.1.2 dev wg0
# 5. Clear traffic-mark rules
iptables -t mangle -F
iptables-save > /etc/iptables/rules.v4
# 6. Flush the route cache
ip route flush cache
# 7. Verification.
#    Fix: the original ran these checks under 'set -e', so a failed ping or
#    curl killed the rollback script before the completion summary printed.
#    From here on, failures must not abort.
set +e
echo -e "\n=== 回滚验证 ==="
echo "当前默认路由:"
ip route show default
echo -e "\nWireGuard接口状态:"
wg show
echo -e "\n接口状态:"
ip link show wg0
ip link show wg1 2>/dev/null || echo "wg1接口已不存在"
# 8. Connectivity checks (informational only)
echo -e "\n测试LA1隧道连通性:"
ping -c 2 -W 1 -I wg0 10.100.1.2
echo -e "\n测试跨境访问:"
curl --interface wg0 -s --connect-timeout 3 http://ifconfig.me/ip
echo -e "\n=== 回滚完成 ==="
echo "系统已回滚到单隧道(LA1)模式"
echo "如果需要重新启用双隧道,请运行: systemctl enable wg-quick@wg1 && systemctl start wg-quick@wg1"
第二阶段:智能路由与自动切换实施方案
目标周期:15个工作日
完成标准:FRR部署、健康检查系统、自动切换验证
第6-7天:FRR部署与OSPF配置
6.1 FRR安装与配置
脚本 6.1.1:FRR安装配置脚本
#!/bin/bash
# frr_installation.sh
# Run on the HZ1 node: install FRR, write zebra/ospfd/staticd/vtysh configs,
# start the daemons, and install a status helper script.
set -e
echo "=== FRR安装配置开始 ==="
# 1. Install FRR
apt update
apt install -y frr frr-pythontools
# 2. Enable the FRR daemons.
#    NOTE(review): bgpd is enabled although nothing below configures BGP, and
#    newer FRR releases have no 'zebra=' line in /etc/frr/daemons (zebra always
#    runs) — confirm against the installed FRR version.
sed -i 's/^bgpd=no/bgpd=yes/' /etc/frr/daemons
sed -i 's/^ospfd=no/ospfd=yes/' /etc/frr/daemons
sed -i 's/^zebra=no/zebra=yes/' /etc/frr/daemons
sed -i 's/^staticd=no/staticd=yes/' /etc/frr/daemons
# 3. zebra (base routing daemon) configuration
cat > /etc/frr/zebra.conf << 'EOF'
! Zebra configuration
hostname HZ1-router
password frr
enable password frr
! 日志配置
log file /var/log/frr/zebra.log
log syslog informational
EOF
# 4. OSPF configuration (router-id, redistribution, area-0 networks, timers)
cat > /etc/frr/ospfd.conf << 'EOF'
! OSPF configuration
hostname HZ1-ospfd
password frr
enable password frr
! 日志配置
log file /var/log/frr/ospfd.log
log syslog informational
! OSPF进程配置
router ospf
! 路由器ID
ospf router-id 10.10.1.1
! 重分发直连和静态路由
redistribute connected
redistribute static
! 网络宣告
network 10.10.0.0/16 area 0
network 10.100.1.0/30 area 0
network 10.100.2.0/30 area 0
! 被动接口配置(除了隧道接口)
passive-interface default
no passive-interface wg0
no passive-interface wg1
! OSPF调优参数
auto-cost reference-bandwidth 1000
timers throttle spf 10 100 5000
timers throttle lsa all 10 100 5000
EOF
# 5. Static-route daemon configuration
cat > /etc/frr/staticd.conf << 'EOF'
! Static routing configuration
hostname HZ1-staticd
password frr
enable password frr
! 日志配置
log file /var/log/frr/staticd.log
log syslog informational
EOF
# 6. VTY access configuration
cat > /etc/frr/vtysh.conf << 'EOF'
! VTY configuration
hostname HZ1
username frr nopassword
!
service integrated-vtysh-config
!
line vty
exec-timeout 30 0
!
end
EOF
# 7. File ownership and permissions (configs readable by the frr group only)
chown -R frr:frr /etc/frr
chmod 640 /etc/frr/*.conf
# 8. Start FRR
systemctl enable frr
systemctl restart frr
# 9. Show FRR status and hint at vtysh verification commands
echo -e "\n=== 验证FRR状态 ==="
systemctl status frr --no-pager
echo -e "\n=== 进入FRR控制台验证 ==="
echo "使用以下命令进入FRR控制台:"
echo " vtysh"
echo ""
echo "在vtysh中可用命令:"
echo " show running-config"
echo " show ip ospf interface"
echo " show ip ospf neighbor"
echo " show ip route ospf"
# 10. Helper script for checking FRR/OSPF state at a glance
cat > /usr/local/bin/frr-status.sh << 'EOF'
#!/bin/bash
# FRR状态检查脚本
echo "=== FRR服务状态 ==="
systemctl status frr --no-pager | grep -A 3 "Active:"
echo -e "\n=== OSPF接口状态 ==="
vtysh -c "show ip ospf interface brief"
echo -e "\n=== OSPF邻居 ==="
vtysh -c "show ip ospf neighbor"
echo -e "\n=== OSPF路由表 ==="
vtysh -c "show ip route ospf" | head -20
echo -e "\n=== 全部路由表 ==="
vtysh -c "show ip route" | grep -E "(O|C|S)" | head -30
EOF
chmod +x /usr/local/bin/frr-status.sh
echo "=== FRR安装配置完成 ==="
6.2 OSPF接口调优
脚本 6.2.1:OSPF接口优化配置
#!/bin/bash
# ospf_interface_tuning.sh
# Run on HZ1: per-interface OSPF tuning — wg0 (LA1) cost 10 as primary,
# wg1 (LA2) cost 50 as backup, point-to-point network type, fast hellos,
# mtu-ignore for the tunnels. The quoted heredoc is fed verbatim to vtysh.
vtysh << 'EOF'
configure terminal
! 配置wg0接口(连接LA1)
interface wg0
! 初始cost设置为10(高质量链路)
ip ospf cost 10
! OSPF网络类型设置为点对点(适合隧道)
ip ospf network point-to-point
! OSPF定时器调优
ip ospf dead-interval minimal hello-multiplier 4
! 启用MTU忽略(防止MTU不匹配问题)
ip ospf mtu-ignore
! 描述
description "Tunnel to LA1 (Primary)"
exit
! 配置wg1接口(连接LA2)
interface wg1
! 初始cost设置为50(备用链路)
ip ospf cost 50
! OSPF网络类型设置为点对点
ip ospf network point-to-point
! OSPF定时器调优
ip ospf dead-interval minimal hello-multiplier 4
! 启用MTU忽略
ip ospf mtu-ignore
! 描述
description "Tunnel to LA2 (Backup)"
exit
! 配置杭州内网接口
interface eth1
! 如果是广播网络,使用默认设置
ip ospf cost 1
! 描述
description "Hangzhou Internal Network"
exit
! 保存配置
write memory
EOF
echo "OSPF接口调优完成"
第8-10天:健康检查系统开发
8.1 健康检查核心模块
脚本 8.1.1:综合健康检查系统
#!/usr/bin/env python3
# /usr/local/bin/health_checker.py
import subprocess
import time
import json
import statistics
import threading
from datetime import datetime
from dataclasses import dataclass
from typing import Dict, List, Optional
import logging
# Logging: INFO level, to both /var/log/health_checker.log and stdout.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/log/health_checker.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
@dataclass
class TunnelConfig:
    """Static description of one WireGuard tunnel used by the health checker."""
    name: str                # tunnel id, e.g. "la1"
    interface: str           # local WireGuard device, e.g. "wg0"
    gateway_ip: str          # far-end /30 address of the tunnel
    test_targets: List[str]  # probe targets (external resolvers + gateway)
    initial_cost: int = 100  # starting OSPF cost for the interface
    min_cost: int = 10       # NOTE(review): not referenced in the visible code
    max_cost: int = 65535    # NOTE(review): not referenced in the visible code
@dataclass
class HealthCheckResult:
    """One health-check measurement for a tunnel at a point in time."""
    timestamp: datetime           # when the check ran
    tunnel_name: str              # tunnel id, e.g. "la1"
    latency_ms: float             # average ping RTT
    jitter_ms: float              # ping mdev
    packet_loss_percent: float    # ping packet loss
    tcp_connect_time_ms: float    # TCP connect probe duration
    http_response_time_ms: float  # HTTP probe duration
    overall_score: float          # weighted 0-100 quality score
    recommended_cost: int         # OSPF cost derived from the score
class TunnelHealthChecker:
    """Probes both tunnels periodically and adjusts their OSPF costs via vtysh."""

    def __init__(self):
        # Static per-tunnel configuration (interfaces, gateways, probe targets).
        self.tunnels = {
            "la1": TunnelConfig(
                name="la1",
                interface="wg0",
                gateway_ip="10.100.1.2",
                test_targets=["8.8.8.8", "1.1.1.1", "10.100.1.2"],
                initial_cost=10
            ),
            "la2": TunnelConfig(
                name="la2",
                interface="wg1",
                gateway_ip="10.100.2.2",
                test_targets=["8.8.4.4", "9.9.9.9", "10.100.2.2"],
                initial_cost=50
            )
        }
        # Rolling per-tunnel history of check results (trimmed to 100 entries).
        self.history: Dict[str, List[HealthCheckResult]] = {
            "la1": [],
            "la2": []
        }
        # Last OSPF cost applied per tunnel.
        self.current_costs = {
            "la1": 10,
            "la2": 50
        }
        # Serializes check cycles across threads.
        self.lock = threading.Lock()
def ping_test(self, target_ip: str, interface: str, count: int = 10) -> Dict:
"""执行ping测试"""
try:
cmd = f"ping -c {count} -i 0.2 -W 1 -I {interface} {target_ip}"
result = subprocess.run(
cmd,
shell=True,
capture_output=True,
text=True,
timeout=count * 2
)
if result.returncode == 0:
# 解析输出
lines = result.stdout.split('\n')
stats_line = None
rtt_line = None
for line in lines:
if "packet loss" in line:
stats_line = line
if "rtt min/avg/max/mdev" in line:
rtt_line = line
if stats_line and rtt_line:
# 提取丢包率
loss_str = stats_line.split('%')[0].split()[-1]
packet_loss = float(loss_str)
# 提取RTT统计
rtt_parts = rtt_line.split('=')[1].strip().split('/')
rtt_min = float(rtt_parts[0])
rtt_avg = float(rtt_parts[1])
rtt_max = float(rtt_parts[2])
rtt_mdev = float(rtt_parts[3]) # 抖动
return {
"success": True,
"packet_loss": packet_loss,
"latency_min": rtt_min,
"latency_avg": rtt_avg,
"latency_max": rtt_max,
"jitter": rtt_mdev,
"packets_sent": count,
"packets_received": count * (1 - packet_loss / 100)
}
return {"success": False, "error": "解析失败"}
except subprocess.TimeoutExpired:
return {"success": False, "error": "超时"}
except Exception as e:
return {"success": False, "error": str(e)}
def tcp_connect_test(self, target_ip: str, port: int, interface: str, timeout: int = 3) -> float:
"""TCP连接测试"""
try:
# 使用timeout命令和nc进行TCP连接测试
start_time = time.time()
cmd = f"timeout {timeout} nc -z -v -w {timeout} {target_ip} {port} 2>&1"
result = subprocess.run(
cmd,
shell=True,
capture_output=True,
text=True
)
elapsed = (time.time() - start_time) * 1000 # 转换为毫秒
if result.returncode == 0:
return elapsed
else:
return timeout * 1000 # 返回超时值
except:
return timeout * 1000
def http_test(self, url: str, interface: str, timeout: int = 5) -> float:
"""HTTP响应测试"""
try:
start_time = time.time()
cmd = f"curl --interface {interface} -s -o /dev/null -w '%{{http_code}} %{{time_total}}' --max-time {timeout} {url}"
result = subprocess.run(
cmd,
shell=True,
capture_output=True,
text=True
)
elapsed = (time.time() - start_time) * 1000
if result.returncode == 0:
parts = result.stdout.strip().split()
if len(parts) == 2 and parts[0] == "200":
return float(parts[1]) * 1000 # 转换为毫秒
return timeout * 1000
except:
return timeout * 1000
def calculate_score(self, test_results: Dict) -> float:
"""计算综合评分(0-100)"""
weights = {
"latency": 0.3, # 延迟权重
"jitter": 0.2, # 抖动权重
"packet_loss": 0.3, # 丢包权重
"tcp_connect": 0.1, # TCP连接权重
"http_response": 0.1 # HTTP响应权重
}
# 归一化各项指标(0-1,1为最佳)
def normalize_latency(latency_ms):
# 延迟低于50ms得1分,超过300ms得0分
return max(0, min(1, 1 - (latency_ms - 50) / 250))
def normalize_jitter(jitter_ms):
# 抖动低于10ms得1分,超过100ms得0分
return max(0, min(1, 1 - jitter_ms / 100))
def normalize_packet_loss(loss_percent):
# 丢包0%得1分,超过20%得0分
return max(0, min(1, 1 - loss_percent / 20))
def normalize_tcp_connect(connect_time_ms):
# 连接时间低于100ms得1分,超过1000ms得0分
return max(0, min(1, 1 - (connect_time_ms - 100) / 900))
def normalize_http_response(response_time_ms):
# 响应时间低于200ms得1分,超过2000ms得0分
return max(0, min(1, 1 - (response_time_ms - 200) / 1800))
# 计算各项得分
latency_score = normalize_latency(test_results.get("latency_avg", 300))
jitter_score = normalize_jitter(test_results.get("jitter", 100))
packet_loss_score = normalize_packet_loss(test_results.get("packet_loss", 100))
tcp_score = normalize_tcp_connect(test_results.get("tcp_connect_time", 1000))
http_score = normalize_http_response(test_results.get("http_response_time", 2000))
# 加权平均
total_score = (
latency_score * weights["latency"] +
jitter_score * weights["jitter"] +
packet_loss_score * weights["packet_loss"] +
tcp_score * weights["tcp_connect"] +
http_score * weights["http_response"]
) * 100 # 转换为0-100分
return round(total_score, 2)
def score_to_cost(self, score: float) -> int:
"""将评分转换为OSPF cost值"""
if score >= 90:
return 10 # 优秀
elif score >= 80:
return 20 # 良好
elif score >= 70:
return 50 # 一般
elif score >= 60:
return 100 # 较差
elif score >= 50:
return 200 # 差
elif score >= 30:
return 500 # 很差
else:
return 65535 # 不可用
    def check_tunnel_health(self, tunnel_config: TunnelConfig) -> HealthCheckResult:
        """Run all probes for one tunnel and return a HealthCheckResult.

        Pessimistic defaults stand in for any probe that fails, so a dead
        tunnel scores poorly instead of raising.
        """
        logger.info(f"开始检查隧道: {tunnel_config.name}")
        # Worst-case defaults; overwritten by successful probes below.
        test_results = {
            "latency_avg": 300,
            "jitter": 100,
            "packet_loss": 100,
            "tcp_connect_time": 3000,
            "http_response_time": 5000
        }
        # 1. Ping the first two targets only (the external resolvers; the
        #    third entry in test_targets is the tunnel gateway, skipped here).
        ping_results = []
        for target in tunnel_config.test_targets[:2]:
            result = self.ping_test(target, tunnel_config.interface, count=5)
            if result["success"]:
                ping_results.append(result)
            time.sleep(0.5)  # spacing between probes
        if ping_results:
            # Average latency/jitter/loss over the successful probes.
            avg_latency = statistics.mean([r["latency_avg"] for r in ping_results])
            avg_jitter = statistics.mean([r["jitter"] for r in ping_results])
            avg_loss = statistics.mean([r["packet_loss"] for r in ping_results])
            test_results.update({
                "latency_avg": avg_latency,
                "jitter": avg_jitter,
                "packet_loss": avg_loss
            })
        # 2. TCP connect probe (DNS port on 8.8.8.8)
        tcp_time = self.tcp_connect_test("8.8.8.8", 53, tunnel_config.interface)
        test_results["tcp_connect_time"] = tcp_time
        # 3. HTTP probe
        http_time = self.http_test("http://ifconfig.me", tunnel_config.interface)
        test_results["http_response_time"] = http_time
        # 4. Aggregate score and the OSPF cost it implies
        overall_score = self.calculate_score(test_results)
        recommended_cost = self.score_to_cost(overall_score)
        # 5. Build the immutable result record
        result = HealthCheckResult(
            timestamp=datetime.now(),
            tunnel_name=tunnel_config.name,
            latency_ms=test_results["latency_avg"],
            jitter_ms=test_results["jitter"],
            packet_loss_percent=test_results["packet_loss"],
            tcp_connect_time_ms=test_results["tcp_connect_time"],
            http_response_time_ms=test_results["http_response_time"],
            overall_score=overall_score,
            recommended_cost=recommended_cost
        )
        logger.info(f"隧道 {tunnel_config.name} 检查完成: "
                    f"评分={overall_score}, 推荐cost={recommended_cost}")
        return result
def update_ospf_cost(self, tunnel_name: str, new_cost: int) -> bool:
    """Apply a new OSPF cost to the tunnel's FRR interface via vtysh.

    Skips no-op updates, changes smaller than the anti-flap threshold,
    and redundant transitions into the failed state.

    Args:
        tunnel_name: Key into self.tunnels.
        new_cost: Desired OSPF cost (10..65535).

    Returns:
        True if the cost is now (or already was) in effect,
        False if the update was skipped or vtysh failed.
    """
    if tunnel_name not in self.tunnels:
        logger.error(f"未知隧道: {tunnel_name}")
        return False
    current_cost = self.current_costs.get(tunnel_name)
    if current_cost == new_cost:
        logger.debug(f"隧道 {tunnel_name} cost未变化: {new_cost}")
        return True
    # Anti-flap: ignore small adjustments. Explicit None check so a
    # recorded cost of 0 would not accidentally bypass the threshold.
    cost_change_threshold = 50
    if current_cost is not None and abs(current_cost - new_cost) < cost_change_threshold:
        logger.debug(f"隧道 {tunnel_name} cost变化过小: {current_cost} -> {new_cost}")
        return False
    # Already effectively failed: do not bounce on the 65535 boundary.
    if new_cost == 65535 and current_cost and current_cost > 500:
        logger.debug(f"隧道 {tunnel_name} 已在故障状态,不更新cost")
        return False
    try:
        interface = self.tunnels[tunnel_name].interface
        # Argument list instead of a shell string: no word splitting,
        # no injection risk through interface names.
        cmd = [
            "vtysh",
            "-c", "configure terminal",
            "-c", f"interface {interface}",
            "-c", f"ip ospf cost {new_cost}",
            "-c", "end",
        ]
        logger.info(f"更新隧道 {tunnel_name} cost: {current_cost} -> {new_cost}")
        subprocess.run(cmd, check=True)
        # Mirror the applied value in memory.
        self.current_costs[tunnel_name] = new_cost
    except subprocess.CalledProcessError as e:
        logger.error(f"更新OSPF cost失败: {str(e)}")
        return False
    except Exception as e:
        logger.error(f"更新OSPF cost异常: {str(e)}")
        return False
    # Audit trail is best-effort: once vtysh succeeded the change IS in
    # effect, so a logging failure must not be reported as a failure.
    try:
        change_log = {
            "timestamp": datetime.now().isoformat(),
            "tunnel": tunnel_name,
            "old_cost": current_cost,
            "new_cost": new_cost,
            "interface": interface
        }
        with open("/var/log/ospf_cost_changes.log", "a") as f:
            f.write(json.dumps(change_log) + "\n")
    except OSError as e:
        logger.warning(f"写入cost变更日志失败: {str(e)}")
    return True
def run_single_check(self):
    """Run one full health-check pass over every configured tunnel.

    For each tunnel: probe it, append the result to the (capped)
    history, push the recommended OSPF cost, and persist the result.
    A failure on one tunnel never aborts the others.
    """
    with self.lock:
        logger.info("开始完整健康检查周期")
        for tunnel_name, config in self.tunnels.items():
            try:
                check = self.check_tunnel_health(config)
                self.history[tunnel_name].append(check)
                # Keep only the 100 newest records per tunnel.
                if len(self.history[tunnel_name]) > 100:
                    self.history[tunnel_name] = self.history[tunnel_name][-100:]
                self.update_ospf_cost(tunnel_name, check.recommended_cost)
                self.save_check_result(check)
            except Exception as e:
                logger.error(f"检查隧道 {tunnel_name} 时出错: {str(e)}")
        logger.info("健康检查周期完成")
def save_check_result(self, result: HealthCheckResult):
    """Persist one check result for monitoring consumers.

    Appends a JSON line to the per-tunnel results file and rewrites the
    per-tunnel Prometheus textfile-collector metrics atomically (write
    to a temp file, then rename), so node_exporter never reads a
    partially written file.
    """
    import os  # local import: keeps the method self-contained

    result_dict = {
        "timestamp": result.timestamp.isoformat(),
        "tunnel": result.tunnel_name,
        "latency_ms": result.latency_ms,
        "jitter_ms": result.jitter_ms,
        "packet_loss_percent": result.packet_loss_percent,
        "tcp_connect_time_ms": result.tcp_connect_time_ms,
        "http_response_time_ms": result.http_response_time_ms,
        "overall_score": result.overall_score,
        "recommended_cost": result.recommended_cost,
        "current_cost": self.current_costs.get(result.tunnel_name)
    }
    # Append one JSON line per check (simple, grep-able history).
    result_file = f"/var/lib/health_checker/results_{result.tunnel_name}.json"
    with open(result_file, "a") as f:
        f.write(json.dumps(result_dict) + "\n")
    # Build the exposition payload from explicit lines so no source
    # indentation can leak into the metrics file.
    label = f'tunnel="{result.tunnel_name}"'
    metrics_lines = [
        "# HELP tunnel_health_score 隧道健康评分 (0-100)",
        "# TYPE tunnel_health_score gauge",
        f"tunnel_health_score{{{label}}} {result.overall_score}",
        "# HELP tunnel_latency_ms 隧道延迟(毫秒)",
        "# TYPE tunnel_latency_ms gauge",
        f"tunnel_latency_ms{{{label}}} {result.latency_ms}",
        "# HELP tunnel_jitter_ms 隧道抖动(毫秒)",
        "# TYPE tunnel_jitter_ms gauge",
        f"tunnel_jitter_ms{{{label}}} {result.jitter_ms}",
        "# HELP tunnel_packet_loss_percent 隧道丢包率(百分比)",
        "# TYPE tunnel_packet_loss_percent gauge",
        f"tunnel_packet_loss_percent{{{label}}} {result.packet_loss_percent}",
        "# HELP tunnel_ospf_cost 隧道OSPF Cost值",
        "# TYPE tunnel_ospf_cost gauge",
        f"tunnel_ospf_cost{{{label}}} {self.current_costs.get(result.tunnel_name, 0)}",
    ]
    metrics_file = f"/var/lib/node_exporter/tunnel_metrics_{result.tunnel_name}.prom"
    tmp_file = metrics_file + ".tmp"
    with open(tmp_file, "w") as f:
        # Exposition format requires a trailing newline.
        f.write("\n".join(metrics_lines) + "\n")
    os.replace(tmp_file, metrics_file)  # atomic on POSIX
def run_continuously(self, interval_seconds: int = 30):
    """Loop health checks forever, sleeping between passes.

    Ctrl-C (during a check or the sleep) stops the loop; any other
    error is logged and the loop resumes after a short back-off.

    Args:
        interval_seconds: Delay between successive check passes.
    """
    logger.info(f"启动健康检查守护进程,间隔: {interval_seconds}秒")
    while True:
        try:
            # Sleep inside the try so an interrupt during the pause
            # also terminates the daemon cleanly.
            self.run_single_check()
            time.sleep(interval_seconds)
        except KeyboardInterrupt:
            logger.info("收到中断信号,停止健康检查")
            break
        except Exception as e:
            logger.error(f"健康检查循环出错: {str(e)}")
            time.sleep(10)  # back off before retrying
def main():
    """CLI entry point for the tunnel health checker daemon."""
    import argparse
    import os
    parser = argparse.ArgumentParser(description="隧道健康检查系统")
    parser.add_argument("--interval", type=int, default=30,
                        help="检查间隔(秒)")
    parser.add_argument("--once", action="store_true",
                        help="只执行一次检查")
    parser.add_argument("--debug", action="store_true",
                        help="启用调试模式")
    args = parser.parse_args()
    if args.debug:
        logger.setLevel(logging.DEBUG)
    # Create state directories in-process instead of shelling out to
    # "mkdir -p" (no subprocess, no shell, works the same).
    for state_dir in ("/var/lib/health_checker", "/var/lib/node_exporter"):
        os.makedirs(state_dir, exist_ok=True)
    checker = TunnelHealthChecker()
    if args.once:
        checker.run_single_check()
    else:
        checker.run_continuously(args.interval)
if __name__ == "__main__":
    main()
8.2 健康检查服务化
脚本 8.2.1:健康检查systemd服务
# /etc/systemd/system/health-checker.service
[Unit]
Description=Tunnel Health Checker
After=network-online.target frr.service
Wants=network-online.target
Requires=frr.service
[Service]
Type=simple
User=root
ExecStart=/usr/local/bin/health_checker.py --interval 30
Restart=on-failure
RestartSec=10
StandardOutput=journal
StandardError=journal
# 安全配置
PrivateTmp=true
NoNewPrivileges=true
ProtectSystem=strict
ReadWritePaths=/var/lib/health_checker /var/lib/node_exporter /var/log
[Install]
WantedBy=multi-user.target
脚本 8.2.2:健康检查API服务
#!/usr/bin/env python3
# /usr/local/bin/health_check_api.py
from flask import Flask, jsonify, request
import json
from datetime import datetime
import threading
from health_checker import TunnelHealthChecker
# Module-level singletons: one Flask app, one shared checker instance,
# and a lock serializing checker access across request handler threads.
app = Flask(__name__)
checker = TunnelHealthChecker()
lock = threading.Lock()
@app.route('/api/v1/health/status', methods=['GET'])
def get_health_status():
    """Return the most recent health snapshot for every tunnel.

    Tunnels with no recorded history yet are omitted from the response.
    """
    with lock:
        status = {}
        for tunnel_name in checker.tunnels:
            records = checker.history.get(tunnel_name)
            if not records:
                continue  # nothing recorded for this tunnel yet
            newest = records[-1]
            status[tunnel_name] = {
                "score": newest.overall_score,
                "latency_ms": newest.latency_ms,
                "packet_loss_percent": newest.packet_loss_percent,
                "current_cost": checker.current_costs.get(tunnel_name),
                "last_check": newest.timestamp.isoformat()
            }
        return jsonify({
            "timestamp": datetime.now().isoformat(),
            "status": status
        })
@app.route('/api/v1/health/check', methods=['POST'])
def trigger_health_check():
    """Run a health-check pass immediately and report the outcome.

    Blocks until the pass completes, then returns the freshest record
    for each tunnel that has history.
    """
    with lock:
        checker.run_single_check()
        # Collect the newest record per tunnel after the fresh pass.
        result = {}
        for tunnel_name in checker.tunnels:
            records = checker.history.get(tunnel_name)
            if not records:
                continue
            newest = records[-1]
            result[tunnel_name] = {
                "score": newest.overall_score,
                "recommended_cost": newest.recommended_cost,
                "current_cost": checker.current_costs.get(tunnel_name)
            }
        return jsonify({
            "message": "Health check completed",
            "result": result,
            "timestamp": datetime.now().isoformat()
        })
@app.route('/api/v1/health/history/<tunnel_name>', methods=['GET'])
def get_health_history(tunnel_name):
    """Return up to `limit` most recent check records for one tunnel.

    Query args:
        limit: Number of newest records to return (default 100).
    """
    limit = request.args.get('limit', default=100, type=int)
    # Clamp: a zero/negative limit would make the negative-index slice
    # below return the wrong window of history.
    limit = max(1, limit)
    if tunnel_name not in checker.history:
        return jsonify({"error": "Tunnel not found"}), 404
    history = checker.history[tunnel_name][-limit:]
    return jsonify({
        "tunnel": tunnel_name,
        "history": [
            {
                "timestamp": h.timestamp.isoformat(),
                "score": h.overall_score,
                "latency_ms": h.latency_ms,
                "packet_loss_percent": h.packet_loss_percent,
                "recommended_cost": h.recommended_cost
            }
            for h in history
        ]
    })
@app.route('/api/v1/health/metrics', methods=['GET'])
def get_prometheus_metrics():
    """Expose the latest per-tunnel stats in Prometheus text format."""
    metrics = []
    for tunnel_name in checker.tunnels:
        if checker.history.get(tunnel_name):
            latest = checker.history[tunnel_name][-1]
            current_cost = checker.current_costs.get(tunnel_name, 0)
            metrics.extend([
                f'tunnel_health_score{{tunnel="{tunnel_name}"}} {latest.overall_score}',
                f'tunnel_latency_ms{{tunnel="{tunnel_name}"}} {latest.latency_ms}',
                f'tunnel_packet_loss_percent{{tunnel="{tunnel_name}"}} {latest.packet_loss_percent}',
                f'tunnel_ospf_cost{{tunnel="{tunnel_name}"}} {current_cost}',
                f'tunnel_last_check{{tunnel="{tunnel_name}"}} {int(latest.timestamp.timestamp())}'
            ])
    # The Prometheus exposition format requires the payload to end with
    # a line feed; without it the last sample can be rejected.
    return '\n'.join(metrics) + '\n', 200, {'Content-Type': 'text/plain'}
if __name__ == '__main__':
    # NOTE(review): binds on all interfaces with no authentication —
    # restrict with a firewall or bind to an internal address in prod.
    app.run(host='0.0.0.0', port=8080, debug=False)
第11-12天:监控告警系统增强
11.1 Prometheus配置
配置文件 11.1.1:Prometheus监控配置
# /etc/prometheus/prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
alerting:
alertmanagers:
- static_configs:
- targets: []
  # - alertmanager:9093
rule_files:
- "/etc/prometheus/alerts.yml"
scrape_configs:
# Node Exporter (HZ1, LA1, LA2)
- job_name: 'node_exporter'
static_configs:
- targets:
- 'hz1:9100'
- 'la1:9100'
- 'la2:9100'
metrics_path: /metrics
scrape_interval: 30s
# 隧道健康检查API
- job_name: 'tunnel_health'
static_configs:
- targets: ['hz1:8080']
metrics_path: /api/v1/health/metrics
scrape_interval: 30s
# FRR监控
- job_name: 'frr_exporter'
static_configs:
- targets: ['hz1:9344'] # frr_exporter端口
scrape_interval: 30s
# 黑盒监控(外部探测)
- job_name: 'blackbox_tunnel'
metrics_path: /probe
params:
module: [tcp_connect]
static_configs:
- targets:
- '10.100.1.2:51820' # LA1 WireGuard端口
- '10.100.2.2:51820' # LA2 WireGuard端口
- '8.8.8.8:53' # 通过隧道访问
relabel_configs:
- source_labels: [__address__]
target_label: __param_target
- source_labels: [__param_target]
target_label: instance
- target_label: __address__
replacement: localhost:9115 # Blackbox Exporter地址
11.2 告警规则配置
配置文件 11.2.1:告警规则
# /etc/prometheus/alerts.yml
groups:
- name: tunnel_alerts
rules:
# 隧道完全中断告警
- alert: TunnelDown
expr: tunnel_health_score < 30
for: 1m
labels:
severity: critical
component: tunnel
annotations:
summary: "隧道 {{ $labels.tunnel }} 完全中断"
description: "隧道 {{ $labels.tunnel }} 健康评分为 {{ $value }},低于30分,可能完全中断"
# 隧道质量劣化告警
- alert: TunnelDegraded
expr: tunnel_health_score < 60
for: 2m
labels:
severity: warning
component: tunnel
annotations:
summary: "隧道 {{ $labels.tunnel }} 质量劣化"
description: "隧道 {{ $labels.tunnel }} 健康评分为 {{ $value }},低于60分"
# 高延迟告警
- alert: HighLatency
expr: tunnel_latency_ms > 200
for: 3m
labels:
severity: warning
component: tunnel
annotations:
summary: "隧道 {{ $labels.tunnel }} 延迟过高"
description: "隧道 {{ $labels.tunnel }} 延迟为 {{ $value }}ms,超过200ms阈值"
# 高丢包告警
- alert: HighPacketLoss
expr: tunnel_packet_loss_percent > 10
for: 2m
labels:
severity: warning
component: tunnel
annotations:
summary: "隧道 {{ $labels.tunnel }} 丢包过高"
description: "隧道 {{ $labels.tunnel }} 丢包率为 {{ $value }}%,超过10%阈值"
# OSPF路由切换告警
- alert: OSPFCostChanged
expr: |
abs(
delta(tunnel_ospf_cost[5m])
) > 50
for: 0m
labels:
severity: info
component: routing
annotations:
summary: "隧道 {{ $labels.tunnel }} OSPF Cost发生变化"
description: "隧道 {{ $labels.tunnel }} OSPF Cost在5分钟内变化 {{ $value }}"
# 所有隧道同时故障(灾难性)
- alert: AllTunnelsDown
expr: |
count(tunnel_health_score < 30) == 2
for: 30s
labels:
severity: critical
component: infrastructure
annotations:
summary: "所有跨境隧道中断"
description: "LA1和LA2隧道同时故障,跨境网络完全中断"
# 节点资源告警
- alert: HighCPUUsage
expr: |
100 - (avg by(instance) (rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) > 80
for: 5m
labels:
severity: warning
component: system
annotations:
summary: "{{ $labels.instance }} CPU使用率过高"
description: "{{ $labels.instance }} CPU使用率为 {{ $value }}%"
# 内存使用告警
- alert: HighMemoryUsage
expr: |
(node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / node_memory_MemTotal_bytes * 100 > 85
for: 5m
labels:
severity: warning
component: system
annotations:
summary: "{{ $labels.instance }} 内存使用率过高"
description: "{{ $labels.instance }} 内存使用率为 {{ $value }}%"
11.3 Grafana仪表板配置
脚本 11.3.1:Grafana仪表板导入脚本
#!/bin/bash
# setup_grafana_dashboard.sh
# Create the tunnel-monitoring dashboard JSON and import it through the
# Grafana HTTP API.
#
# Required env: GRAFANA_API_KEY  (never hardcode secrets in the script)
# Optional env: GRAFANA_URL      (default http://localhost:3000)
set -euo pipefail

# Write the dashboard JSON (quoted heredoc: nothing is expanded).
cat > /etc/grafana/dashboards/tunnel_monitoring.json << 'EOF'
{
  "dashboard": {
    "title": "跨境隧道监控",
    "tags": ["tunnel", "network", "monitoring"],
    "timezone": "browser",
    "panels": [
      {
        "title": "隧道健康评分",
        "type": "graph",
        "targets": [
          {
            "expr": "tunnel_health_score",
            "legendFormat": "{{tunnel}}"
          }
        ],
        "gridPos": {"h": 8, "w": 12, "x": 0, "y": 0}
      },
      {
        "title": "隧道延迟(ms)",
        "type": "graph",
        "targets": [
          {
            "expr": "tunnel_latency_ms",
            "legendFormat": "{{tunnel}}"
          }
        ],
        "gridPos": {"h": 8, "w": 12, "x": 12, "y": 0}
      },
      {
        "title": "隧道丢包率(%)",
        "type": "graph",
        "targets": [
          {
            "expr": "tunnel_packet_loss_percent",
            "legendFormat": "{{tunnel}}"
          }
        ],
        "gridPos": {"h": 8, "w": 12, "x": 0, "y": 8}
      },
      {
        "title": "OSPF Cost值",
        "type": "graph",
        "targets": [
          {
            "expr": "tunnel_ospf_cost",
            "legendFormat": "{{tunnel}}"
          }
        ],
        "gridPos": {"h": 8, "w": 12, "x": 12, "y": 8}
      },
      {
        "title": "隧道流量(入)",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(node_network_receive_bytes_total{device=~\"wg.*\"}[5m]) * 8",
            "legendFormat": "{{device}}"
          }
        ],
        "gridPos": {"h": 8, "w": 12, "x": 0, "y": 16}
      },
      {
        "title": "隧道流量(出)",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(node_network_transmit_bytes_total{device=~\"wg.*\"}[5m]) * 8",
            "legendFormat": "{{device}}"
          }
        ],
        "gridPos": {"h": 8, "w": 12, "x": 12, "y": 16}
      },
      {
        "title": "当前状态面板",
        "type": "stat",
        "targets": [
          {
            "expr": "tunnel_health_score",
            "instant": true,
            "legendFormat": "{{tunnel}}"
          }
        ],
        "gridPos": {"h": 8, "w": 24, "x": 0, "y": 24}
      }
    ],
    "schemaVersion": 27,
    "version": 1
  },
  "overwrite": true
}
EOF

# Import the dashboard via the Grafana API. The API key comes from the
# environment; abort with a clear message when it is missing.
GRAFANA_URL="${GRAFANA_URL:-http://localhost:3000}"
: "${GRAFANA_API_KEY:?GRAFANA_API_KEY must be set in the environment}"

# --fail makes curl exit non-zero on HTTP errors so set -e catches them.
curl --fail --silent --show-error -X POST \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer ${GRAFANA_API_KEY}" \
  -d @/etc/grafana/dashboards/tunnel_monitoring.json \
  "${GRAFANA_URL}/api/dashboards/db"
第13-14天:故障切换模拟测试
13.1 故障注入测试工具
脚本 13.1.1:故障注入测试框架
#!/usr/bin/env python3
# fault_injection_test.py
import subprocess
import time
import json
import random
from datetime import datetime
import threading
from enum import Enum
class FaultType(Enum):
    """Kinds of network faults the injector can simulate."""
    TUNNEL_DOWN = "tunnel_down"          # tunnel completely down
    HIGH_LATENCY = "high_latency"        # added latency
    PACKET_LOSS = "packet_loss"          # heavy packet loss
    BANDWIDTH_LIMIT = "bandwidth_limit"  # throttled bandwidth
    INTERFACE_DOWN = "interface_down"    # interface administratively down
    ROUTE_FLAP = "route_flap"            # oscillating routes
class FaultInjector:
    """Injects simulated network faults against the la1/la2 tunnels and
    recovers them, using systemctl, tc, ip and vtysh on this node.

    Active faults are tracked in self.active_faults; transient faults
    schedule their own recovery with threading.Timer.
    """

    def __init__(self):
        # fault_id -> metadata of every fault currently in effect
        self.active_faults = {}
        # built-in scenarios (see load_scenarios)
        self.test_scenarios = self.load_scenarios()

    def load_scenarios(self):
        """Return the built-in test scenarios.

        Each step is {"type": FaultType, "target": "la1"|"la2",
        "duration": seconds}. Steps with duration 0 are intended as
        recovery markers — but see the NOTE in inject_fault.
        """
        return [
            {
                "name": "主隧道故障切换",
                "description": "模拟LA1完全故障,验证自动切换到LA2",
                "steps": [
                    {"type": FaultType.TUNNEL_DOWN, "target": "la1", "duration": 300},
                    {"type": FaultType.TUNNEL_DOWN, "target": "la1", "duration": 0}  # intended as the recovery step
                ]
            },
            {
                "name": "主隧道质量劣化",
                "description": "模拟LA1高延迟高丢包,验证部分切换",
                "steps": [
                    {"type": FaultType.HIGH_LATENCY, "target": "la1", "duration": 180},
                    {"type": FaultType.PACKET_LOSS, "target": "la1", "duration": 180},
                    {"type": FaultType.HIGH_LATENCY, "target": "la1", "duration": 0},
                    {"type": FaultType.PACKET_LOSS, "target": "la1", "duration": 0}
                ]
            },
            {
                "name": "双隧道交替故障",
                "description": "模拟双隧道交替故障,验证稳定性",
                "steps": [
                    {"type": FaultType.TUNNEL_DOWN, "target": "la1", "duration": 60},
                    {"type": FaultType.TUNNEL_DOWN, "target": "la1", "duration": 0},
                    {"type": FaultType.TUNNEL_DOWN, "target": "la2", "duration": 60},
                    {"type": FaultType.TUNNEL_DOWN, "target": "la2", "duration": 0}
                ]
            },
            {
                "name": "路由震荡测试",
                "description": "模拟路由频繁切换,验证防震荡机制",
                "steps": [
                    {"type": FaultType.ROUTE_FLAP, "target": "la1", "duration": 120}
                ]
            }
        ]

    def inject_fault(self, fault_type: FaultType, target: str, duration: int, parameters: dict = None):
        """Inject one fault and register it in active_faults.

        Args:
            fault_type: Kind of fault to simulate.
            target: "la1" (wg0) or "la2" (wg1).
            duration: Seconds until automatic recovery; <= 0 schedules
                no recovery timer.
            parameters: Optional tuning (latency_ms, loss_percent,
                bandwidth_mbps, interval_seconds). Numeric values must
                already be numbers.

        Returns:
            The generated fault id string.

        NOTE(review): injection always runs regardless of `duration` —
        a duration-0 step re-injects the fault and schedules no
        recovery, so scenario steps meant to "restore" service actually
        break it again. Confirm the intended semantics.
        """
        fault_id = f"{fault_type.value}_{target}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        if fault_type == FaultType.TUNNEL_DOWN:
            self.inject_tunnel_down(target, duration)
        elif fault_type == FaultType.HIGH_LATENCY:
            latency = parameters.get("latency_ms", 300) if parameters else 300
            self.inject_high_latency(target, latency, duration)
        elif fault_type == FaultType.PACKET_LOSS:
            loss = parameters.get("loss_percent", 30) if parameters else 30
            self.inject_packet_loss(target, loss, duration)
        elif fault_type == FaultType.BANDWIDTH_LIMIT:
            bandwidth = parameters.get("bandwidth_mbps", 10) if parameters else 10
            self.inject_bandwidth_limit(target, bandwidth, duration)
        elif fault_type == FaultType.INTERFACE_DOWN:
            self.inject_interface_down(target, duration)
        elif fault_type == FaultType.ROUTE_FLAP:
            interval = parameters.get("interval_seconds", 10) if parameters else 10
            self.inject_route_flap(target, interval, duration)
        self.active_faults[fault_id] = {
            "type": fault_type.value,
            "target": target,
            "start_time": datetime.now(),
            "duration": duration,
            "parameters": parameters or {}
        }
        print(f"注入故障: {fault_id}")
        # Transient faults recover themselves via a timer.
        if duration > 0:
            threading.Timer(duration, self.recover_fault, args=[fault_id]).start()
        return fault_id

    def inject_tunnel_down(self, target: str, duration: int):
        """Stop the target's wg-quick service.

        `duration` is unused here; recovery is driven by the caller's
        timer (see inject_fault).
        """
        if target == "la1":
            # stop the WireGuard service backing wg0
            subprocess.run(f"systemctl stop wg-quick@wg0", shell=True)
        elif target == "la2":
            subprocess.run(f"systemctl stop wg-quick@wg1", shell=True)

    def inject_high_latency(self, target: str, latency_ms: int, duration: int):
        """Add netem delay on the tunnel interface with tc.

        NOTE(review): `gateway` is assigned but never used, and
        `tc qdisc add` fails if a root qdisc already exists — confirm
        whether `tc qdisc replace` was intended.
        """
        if target == "la1":
            iface = "wg0"
            gateway = "10.100.1.2"
        else:
            iface = "wg1"
            gateway = "10.100.2.2"
        # Only add latency above the assumed baseline of the path.
        base_latency = 50  # baseline latency (ms)
        add_latency = max(0, latency_ms - base_latency)
        cmd = f"""
        tc qdisc add dev {iface} root netem delay {add_latency}ms 10ms distribution normal
        """
        subprocess.run(cmd, shell=True)

    def inject_packet_loss(self, target: str, loss_percent: int, duration: int):
        """Add netem packet loss on the tunnel interface with tc."""
        if target == "la1":
            iface = "wg0"
        else:
            iface = "wg1"
        cmd = f"""
        tc qdisc add dev {iface} root netem loss {loss_percent}%
        """
        subprocess.run(cmd, shell=True)

    def inject_bandwidth_limit(self, target: str, bandwidth_mbps: int, duration: int):
        """Cap the interface bandwidth with an HTB qdisc."""
        if target == "la1":
            iface = "wg0"
        else:
            iface = "wg1"
        rate = bandwidth_mbps * 1000  # convert to kbit
        cmd = f"""
        tc qdisc add dev {iface} root handle 1: htb default 10
        tc class add dev {iface} parent 1: classid 1:10 htb rate {rate}kbit ceil {rate}kbit
        """
        subprocess.run(cmd, shell=True)

    def inject_interface_down(self, target: str, duration: int):
        """Administratively down the tunnel interface."""
        if target == "la1":
            iface = "wg0"
        else:
            iface = "wg1"
        subprocess.run(f"ip link set {iface} down", shell=True)

    def inject_route_flap(self, target: str, interval_seconds: int, duration: int):
        """Oscillate the interface's OSPF cost in a background thread.

        Alternates between a "good" and the failed cost every
        interval_seconds/2 until `duration` elapses; the daemon thread
        simply expires, no explicit recovery is needed.
        """
        def flap_cycle():
            if target == "la1":
                iface = "wg0"
                cost_good = 10
                cost_bad = 65535
            else:
                iface = "wg1"
                cost_good = 50
                cost_bad = 65535
            start_time = time.time()
            while time.time() - start_time < duration:
                # flip to the bad cost
                subprocess.run(f"vtysh -c 'configure terminal' -c 'interface {iface}' -c 'ip ospf cost {cost_bad}' -c 'end'",
                               shell=True)
                time.sleep(interval_seconds / 2)
                # flip back to the good cost
                subprocess.run(f"vtysh -c 'configure terminal' -c 'interface {iface}' -c 'ip ospf cost {cost_good}' -c 'end'",
                               shell=True)
                time.sleep(interval_seconds / 2)
        threading.Thread(target=flap_cycle, daemon=True).start()

    def recover_fault(self, fault_id: str):
        """Undo a previously injected fault and drop it from the registry.

        NOTE(review): ROUTE_FLAP has no recovery branch here — its
        background thread expires on its own when `duration` elapses.
        """
        if fault_id not in self.active_faults:
            return
        fault = self.active_faults[fault_id]
        fault_type = FaultType(fault["type"])
        target = fault["target"]
        print(f"恢复故障: {fault_id}")
        if fault_type == FaultType.TUNNEL_DOWN:
            if target == "la1":
                subprocess.run(f"systemctl start wg-quick@wg0", shell=True)
            elif target == "la2":
                subprocess.run(f"systemctl start wg-quick@wg1", shell=True)
        elif fault_type in [FaultType.HIGH_LATENCY, FaultType.PACKET_LOSS, FaultType.BANDWIDTH_LIMIT]:
            if target == "la1":
                iface = "wg0"
            else:
                iface = "wg1"
            # remove any tc qdisc we added (ignore errors if none exists)
            subprocess.run(f"tc qdisc del dev {iface} root 2>/dev/null || true", shell=True)
        elif fault_type == FaultType.INTERFACE_DOWN:
            if target == "la1":
                iface = "wg0"
            else:
                iface = "wg1"
            subprocess.run(f"ip link set {iface} up", shell=True)
        # drop from the active-fault registry
        del self.active_faults[fault_id]

    def recover_all_faults(self):
        """Recover every fault currently registered."""
        for fault_id in list(self.active_faults.keys()):
            self.recover_fault(fault_id)

    def run_scenario(self, scenario_name: str):
        """Execute one named scenario step by step and write a report.

        Returns:
            The report dict, or False when the scenario is unknown.

        NOTE(review): pre_state is captured *after* the fault is
        injected, so it does not reflect the true pre-fault state.
        """
        scenario = next((s for s in self.test_scenarios if s["name"] == scenario_name), None)
        if not scenario:
            print(f"场景不存在: {scenario_name}")
            return False
        print(f"开始运行场景: {scenario['name']}")
        print(f"描述: {scenario['description']}")
        results = []
        for i, step in enumerate(scenario["steps"]):
            print(f"\n步骤 {i+1}: {step['type'].value} -> {step['target']} ({step['duration']}秒)")
            # inject the fault for this step
            fault_id = self.inject_fault(
                step["type"],
                step["target"],
                step["duration"]
            )
            # snapshot state (see NOTE above re: timing)
            pre_state = self.get_current_state()
            # allow the fault to take effect
            wait_time = min(step["duration"], 10) if step["duration"] > 0 else 5
            print(f"等待 {wait_time} 秒让故障生效...")
            time.sleep(wait_time)
            post_state = self.get_current_state()
            # verify failover where it is expected to occur
            if step["type"] in [FaultType.TUNNEL_DOWN, FaultType.HIGH_LATENCY, FaultType.PACKET_LOSS]:
                verification = self.verify_failover(step["target"])
            else:
                verification = {"verified": True, "message": "无需验证"}
            results.append({
                "step": i+1,
                "fault_type": step["type"].value,
                "target": step["target"],
                "duration": step["duration"],
                "pre_state": pre_state,
                "post_state": post_state,
                "verification": verification
            })
            # keep the fault active for its remaining duration (except
            # after the final step)
            if i < len(scenario["steps"]) - 1 and step["duration"] > wait_time:
                remaining = step["duration"] - wait_time
                print(f"等待故障持续 {remaining} 秒...")
                time.sleep(remaining)
        # write the JSON report
        report = self.generate_report(scenario, results)
        print(f"\n场景完成: {scenario['name']}")
        print(f"报告已保存: {report['report_file']}")
        return report

    def get_current_state(self):
        """Snapshot routes, WireGuard status and per-tunnel OSPF cost.

        The OSPF cost is scraped from vtysh's human-readable output —
        fragile if the FRR output format changes.
        """
        state = {
            "timestamp": datetime.now().isoformat(),
            "routes": subprocess.getoutput("ip route show default"),
            "wg_status": subprocess.getoutput("wg show"),
            "ospf_cost": {},
            "tunnel_health": {}
        }
        # scrape the OSPF cost for each tunnel interface
        for target in ["la1", "la2"]:
            iface = "wg0" if target == "la1" else "wg1"
            cmd = f"vtysh -c 'show interface {iface}' | grep 'cost'"
            output = subprocess.getoutput(cmd)
            if "cost" in output:
                cost = output.split("cost")[1].split(",")[0].strip()
                state["ospf_cost"][target] = cost
        return state

    def verify_failover(self, faulty_target: str):
        """Check that the default route moved off the faulty tunnel.

        Inspects which wg interface the current default route uses.
        """
        default_route = subprocess.getoutput("ip route show default")
        if faulty_target == "la1":
            # with LA1 down, the default route should point at LA2 (wg1)
            if "wg1" in default_route:
                return {"verified": True, "message": "成功切换到LA2"}
            else:
                return {"verified": False, "message": "未切换到LA2"}
        else:
            # with LA2 down, the default route should point at LA1 (wg0)
            if "wg0" in default_route:
                return {"verified": True, "message": "成功切换到LA1"}
            else:
                return {"verified": False, "message": "未切换到LA1"}

    def generate_report(self, scenario, results):
        """Write a JSON test report and return it (plus 'report_file').

        NOTE(review): scenario["steps"] contains FaultType enum members,
        which json.dump cannot serialize — this will raise TypeError at
        runtime; consider json.dump(..., default=str). The report file
        name also embeds the scenario name verbatim (spaces/CJK).
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        report_file = f"/var/log/fault_test_{scenario['name']}_{timestamp}.json"
        report = {
            "scenario": scenario,
            "results": results,
            "summary": {
                "total_steps": len(results),
                "passed_steps": sum(1 for r in results if r["verification"]["verified"]),
                "failed_steps": sum(1 for r in results if not r["verification"]["verified"])
            },
            "timestamp": datetime.now().isoformat(),
            "hostname": subprocess.getoutput("hostname")
        }
        with open(report_file, "w") as f:
            json.dump(report, f, indent=2)
        report["report_file"] = report_file
        return report
def main():
    """CLI entry point for the fault-injection test tool."""
    import argparse
    parser = argparse.ArgumentParser(description="故障注入测试工具")
    parser.add_argument("--scenario", help="运行指定场景")
    parser.add_argument("--list-scenarios", action="store_true", help="列出所有场景")
    parser.add_argument("--inject", help="注入指定故障,格式: type,target,duration[,param1=value1,...]")
    parser.add_argument("--recover", help="恢复指定故障ID")
    parser.add_argument("--recover-all", action="store_true", help="恢复所有故障")
    parser.add_argument("--status", action="store_true", help="显示当前状态")
    args = parser.parse_args()
    injector = FaultInjector()
    if args.list_scenarios:
        print("可用测试场景:")
        for scenario in injector.test_scenarios:
            print(f" - {scenario['name']}: {scenario['description']}")
    elif args.scenario:
        injector.run_scenario(args.scenario)
    elif args.inject:
        # Parse "type,target,duration[,k=v,...]" from the command line.
        parts = args.inject.split(",")
        if len(parts) < 3:
            print("格式错误,需要: type,target,duration[,param1=value1,...]")
            return
        fault_type = FaultType(parts[0])
        target = parts[1]
        duration = int(parts[2])
        params = {}
        if len(parts) > 3:
            for param in parts[3:]:
                if "=" in param:
                    key, value = param.split("=", 1)
                    # Coerce numeric parameters: the injectors do
                    # arithmetic on latency_ms/loss_percent/etc., so a
                    # string value would raise TypeError downstream.
                    try:
                        params[key] = int(value)
                    except ValueError:
                        params[key] = value
        injector.inject_fault(fault_type, target, duration, params)
    elif args.recover:
        injector.recover_fault(args.recover)
    elif args.recover_all:
        injector.recover_all_faults()
    elif args.status:
        state = injector.get_current_state()
        print(json.dumps(state, indent=2))
    else:
        parser.print_help()
if __name__ == "__main__":
    main()
第15天:性能基准测试与文档
15.1 性能基准测试脚本
脚本 15.1.1:综合性能测试
#!/usr/bin/env python3
# performance_benchmark.py
import subprocess
import time
import json
import statistics
from datetime import datetime
import concurrent.futures
class PerformanceBenchmark:
def __init__(self):
self.results = {}
self.test_targets = [
{"name": "google_dns", "host": "8.8.8.8", "port": 53},
{"name": "cloudflare_dns", "host": "1.1.1.1", "port": 53},
{"name": "google_http", "host": "www.google.com", "port": 80},
{"name": "cloudflare_http", "host": "www.cloudflare.com", "port": 80}
]
def run_iperf_test(self, tunnel, duration=10):
"""运行iperf带宽测试"""
if tunnel == "la1":
server_ip = "10.100.1.2"
iface = "wg0"
else:
server_ip = "10.100.2.2"
iface = "wg1"
# 需要在LA1和LA2上启动iperf3服务器
cmd = f"iperf3 -c {server_ip} -i 1 -t {duration} -b 100M --bind-dev {iface} -J"
try:
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=duration+5)
if result.returncode == 0:
data = json.loads(result.stdout)
return data.get("end", {}).get("sum_received", {})
return None
except:
return None
def run_latency_test(self, tunnel, count=100):
"""运行延迟测试"""
if tunnel == "la1":
target = "10.100.1.2"
iface = "wg0"
else:
target = "10.100.2.2"
iface = "wg1"
latencies = []
for _ in range(count):
start = time.time()
try:
subprocess.run(
f"ping -c 1 -W 1 -I {iface} {target}",
shell=True,
capture_output=True,
timeout=2
)
latencies.append((time.time() - start) * 1000) # 转换为毫秒
except:
latencies.append(1000) # 超时
if latencies:
return {
"min": min(latencies),
"max": max(latencies),
"avg": statistics.mean(latencies),
"std": statistics.stdev(latencies) if len(latencies) > 1 else 0,
"loss": latencies.count(1000) / len(latencies) * 100
}
return None
def run_throughput_test(self, tunnel, test_size_mb=100):
"""运行吞吐量测试(使用scp/wget模拟)"""
if tunnel == "la1":
iface = "wg0"
test_url = "http://10.100.1.2/testfile" # 需要在LA1上提供测试文件
else:
iface = "wg1"
test_url = "http://10.100.2.2/testfile" # 需要在LA2上提供测试文件
# 使用wget下载测试文件
start = time.time()
try:
cmd = f"wget -O /dev/null --bind-address={iface} {test_url} 2>&1 | tail -2"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=300)
elapsed = time.time() - start
# 解析wget输出获取速度
lines = result.stdout.split('\n')
for line in lines:
if "MB/s" in line or "KB/s" in line:
return {"elapsed": elapsed, "output": line.strip()}
return {"elapsed": elapsed, "output": "未知"}
except:
return None
def run_concurrent_test(self, tunnel, concurrent_connections=10, duration=30):
"""运行并发连接测试"""
results = []
def single_connection_test(conn_id):
if tunnel == "la1":
target = "10.100.1.2"
iface = "wg0"
else:
target = "10.100.2.2"
iface = "wg1"
successes = 0
failures = 0
start_time = time.time()
while time.time() - start_time < duration:
try:
cmd = f"timeout 2 curl --interface {iface} -s http://{target}/ > /dev/null"
subprocess.run(cmd, shell=True, check=True)
successes += 1
except:
failures += 1
time.sleep(0.1)
return {"successes": successes, "failures": failures}
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_connections) as executor:
futures = [executor.submit(single_connection_test, i) for i in range(concurrent_connections)]
for future in concurrent.futures.as_completed(futures):
results.append(future.result())
total_successes = sum(r["successes"] for r in results)
total_failures = sum(r["failures"] for r in results)
return {
"total_connections": concurrent_connections,
"success_rate": total_successes / (total_successes + total_failures) * 100 if (total_successes + total_failures) > 0 else 0,
"requests_per_second": total_successes / duration
}
def run_comprehensive_test(self, tunnel):
"""运行全面性能测试"""
print(f"开始测试隧道: {tunnel}")
test_results = {
"tunnel": tunnel,
"timestamp": datetime.now().isoformat(),
"tests": {}
}
# 1. 延迟测试
print(" 执行延迟测试...")
latency_result = self.run_latency_test(tunnel, 50)
test_results["tests"]["latency"] = latency_result
# 2. 带宽测试
print(" 执行带宽测试...")
bandwidth_result = self.run_iperf_test(tunnel, 15)
test_results["tests"]["bandwidth"] = bandwidth_result
# 3. 吞吐量测试
print(" 执行吞吐量测试...")
throughput_result = self.run_throughput_test(tunnel, 50)
test_results["tests"]["throughput"] = throughput_result
# 4. 并发测试
print(" 执行并发测试...")
concurrent_result = self.run_concurrent_test(tunnel, 20, 60)
test_results["tests"]["concurrent"] = concurrent_result
# 5. 路由收敛测试
print(" 执行路由收敛测试...")
convergence_result = self.test_route_convergence(tunnel)
test_results["tests"]["convergence"] = convergence_result
return test_results
def test_route_convergence(self, tunnel_to_fail):
"""测试路由收敛时间"""
if tunnel_to_fail == "la1":
iface = "wg0"
other_tunnel = "la2"
else:
iface = "wg1"
other_tunnel = "la1"
# 记录当前路由
original_route = subprocess.getoutput("ip route show default")
print(f" 原始路由: {original_route}")
# 注入故障
print(f" 注入故障到 {tunnel_to_fail}...")
subprocess.run(f"ip link set {iface} down", shell=True)
# 开始计时
start_time = time.time()
# 监控路由变化
convergence_time = None
for i in range(30): # 最多等待30秒
time.sleep(1)
current_route = subprocess.getoutput("ip route show default")
if iface not in current_route:
convergence_time = time.time() - start_time
print(f" 路由收敛完成,耗时: {convergence_time:.2f}秒")
break
# 恢复故障
subprocess.run(f"ip link set {iface} up", shell=True)
# 等待恢复
time.sleep(5)
# 验证恢复
final_route = subprocess.getoutput("ip route show default")
return {
"convergence_time_seconds": convergence_time,
"original_route": original_route,
"final_route": final_route,
"success": convergence_time is not None and convergence_time < 10
}
def compare_tunnels(self):
"""比较两个隧道的性能"""
print("开始双隧道性能对比测试")
print("="*60)
results = {}
# 分别测试两个隧道
for tunnel in ["la1", "la2"]:
print(f"\n测试隧道: {tunnel}")
results[tunnel] = self.run_comprehensive_test(tunnel)
# 生成对比报告
comparison = self.generate_comparison_report(results)
# 保存结果
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
report_file = f"/var/log/performance_comparison_{timestamp}.json"
with open(report_file, "w") as f:
json.dump({
"results": results,
"comparison": comparison,
"timestamp": datetime.now().isoformat()
}, f, indent=2)
print(f"\n对比报告已保存: {report_file}")
# 打印简要对比
print("\n" + "="*60)
print("性能对比摘要:")
print(f"{'指标':<20} {'LA1':<15} {'LA2':<15} {'优胜者':<10}")
print("-"*60)
metrics = comparison.get("metrics", {})
for metric, values in metrics.items():
la1_val = values.get("la1", 0)
la2_val = values.get("la2", 0)
if "latency" in metric:
winner = "LA1" if la1_val < la2_val else "LA2" if la2_val < la1_val else "平局"
elif "loss" in metric or "error" in metric:
winner = "LA1" if la1_val < la2_val else "LA2" if la2_val < la1_val else "平局"
else: # 带宽、吞吐量等
winner = "LA1" if la1_val > la2_val else "LA2" if la2_val > la1_val else "平局"
print(f"{metric:<20} {la1_val:<15.2f} {la2_val:<15.2f} {winner:<10}")
return results
def generate_comparison_report(self, results):
"""生成对比报告"""
comparison = {
"metrics": {},
"recommendation": ""
}
# 提取关键指标
for tunnel in ["la1", "la2"]:
tunnel_results = results.get(tunnel, {}).get("tests", {})
# 延迟
latency = tunnel_results.get("latency", {})
comparison["metrics"][f"{tunnel}_avg_latency_ms"] = latency.get("avg", 0)
comparison["metrics"][f"{tunnel}_packet_loss_percent"] = latency.get("loss", 100)
# 带宽
bandwidth = tunnel_results.get("bandwidth", {})
if bandwidth:
comparison["metrics"][f"{tunnel}_bandwidth_mbps"] = bandwidth.get("bits_per_second", 0) / 1e6
# 并发性能
concurrent = tunnel_results.get("concurrent", {})
comparison["metrics"][f"{tunnel}_success_rate_percent"] = concurrent.get("success_rate", 0)
comparison["metrics"][f"{tunnel}_requests_per_second"] = concurrent.get("requests_per_second", 0)
# 收敛时间
convergence = tunnel_results.get("convergence", {})
comparison["metrics"][f"{tunnel}_convergence_time_seconds"] = convergence.get("convergence_time_seconds", 30)
# 计算综合评分
la1_score = self.calculate_overall_score(comparison, "la1")
la2_score = self.calculate_overall_score(comparison, "la2")
comparison["overall_scores"] = {
"la1": la1_score,
"la2": la2_score
}
# 给出推荐
if la1_score > la2_score + 10:
comparison["recommendation"] = "推荐使用LA1作为主隧道,LA2作为备用"
elif la2_score > la1_score + 10:
comparison["recommendation"] = "推荐使用LA2作为主隧道,LA1作为备用"
else:
comparison["recommendation"] = "两个隧道性能相当,建议负载均衡"
return comparison
def calculate_overall_score(self, comparison, tunnel):
    """Score a tunnel on a 0-100 scale from the flattened metrics.

    Weighting: latency 30, packet loss 25, bandwidth 25, success rate 20.
    Missing metrics default to worst-case values and earn zero points.
    """
    metrics = comparison["metrics"]
    score = 0

    # Latency (lower is better): first bracket that matches wins.
    latency = metrics.get(f"{tunnel}_avg_latency_ms", 300)
    for upper_bound, points in ((100, 30), (200, 20), (300, 10)):
        if latency < upper_bound:
            score += points
            break

    # Packet loss (lower is better).
    loss = metrics.get(f"{tunnel}_packet_loss_percent", 100)
    for upper_bound, points in ((1, 25), (5, 20), (10, 10)):
        if loss < upper_bound:
            score += points
            break

    # Bandwidth (higher is better).
    bandwidth = metrics.get(f"{tunnel}_bandwidth_mbps", 0)
    for lower_bound, points in ((500, 25), (200, 20), (100, 15), (50, 10)):
        if bandwidth > lower_bound:
            score += points
            break

    # Request success rate (higher is better).
    success_rate = metrics.get(f"{tunnel}_success_rate_percent", 0)
    for lower_bound, points in ((99, 20), (95, 15), (90, 10)):
        if success_rate > lower_bound:
            score += points
            break

    return score
def main():
    """CLI entry point for the performance benchmark tool.

    Without ``--tunnel`` both tunnels are compared; otherwise the chosen
    test (or all of them) runs against the selected tunnel. Results are
    printed as JSON and optionally written to ``--output``.
    """
    import argparse

    parser = argparse.ArgumentParser(description="性能基准测试工具")
    parser.add_argument("--tunnel", choices=["la1", "la2", "both"], default="both",
                        help="测试的隧道")
    parser.add_argument("--test", choices=["latency", "bandwidth", "concurrent", "all"],
                        default="all", help="测试类型")
    parser.add_argument("--duration", type=int, default=30,
                        help="测试持续时间(秒)")
    parser.add_argument("--output", help="输出文件路径")
    opts = parser.parse_args()

    benchmark = PerformanceBenchmark()
    if opts.tunnel == "both":
        results = benchmark.compare_tunnels()
    elif opts.test == "all":
        results = benchmark.run_comprehensive_test(opts.tunnel)
    elif opts.test == "latency":
        results = benchmark.run_latency_test(opts.tunnel, 50)
    elif opts.test == "bandwidth":
        results = benchmark.run_iperf_test(opts.tunnel, opts.duration)
    else:  # concurrent
        results = benchmark.run_concurrent_test(opts.tunnel, 20, opts.duration)

    print(json.dumps(results, indent=2))
    if opts.output:
        with open(opts.output, "w") as f:
            json.dump(results, f, indent=2)


if __name__ == "__main__":
    main()
第三阶段:运维优化与生产就绪实施方案
目标周期:10个工作日
完成标准:监控完善、文档完整、演练成功
第16-17天:监控体系完善
16.1 高级监控与可视化
脚本 16.1.1:高级监控仪表板配置
{
"dashboard": {
"title": "跨境SD-WAN高级监控",
"tags": ["production", "network", "sdwan"],
"panels": [
{
"title": "隧道健康状态矩阵",
"type": "table",
"targets": [{
"expr": "tunnel_health_score",
"instant": true,
"format": "table"
}],
"gridPos": {"h": 6, "w": 24, "x": 0, "y": 0}
},
{
"title": "链路质量热力图",
"type": "heatmap",
"targets": [{
"expr": "rate(node_network_receive_bytes_total{device=~\"wg.*\"}[5m]) * 8",
"legendFormat": "{{device}}"
}],
"gridPos": {"h": 8, "w": 12, "x": 0, "y": 6}
},
{
"title": "OSPF路由状态",
"type": "stat",
"targets": [{
"expr": "frr_ospf_neighbor_state",
"instant": true,
"legendFormat": "{{neighbor}}"
}],
"gridPos": {"h": 4, "w": 12, "x": 12, "y": 6}
}
]
}
}
16.2 自动化巡检脚本
脚本 16.2.1:每日自动化巡检
#!/usr/bin/env python3
# daily_inspection.py
import subprocess
import json
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
import socket
class DailyInspector:
    """Daily health inspection for the cross-border SD-WAN nodes.

    Each ``check_*`` method returns ``{"status", "message", "details"}``
    with status PASS / FAIL / WARNING; ``run_check`` records results and
    maintains a pass/fail summary in ``self.results``. Reports are written
    to /var/log as JSON, HTML and plain text, and an e-mail alert is sent
    when any check fails.
    """

    def __init__(self):
        self.hostname = socket.gethostname()
        self.inspection_time = datetime.now()
        # Aggregated outcome of the whole run; filled in by run_check().
        self.results = {
            "timestamp": self.inspection_time.isoformat(),
            "hostname": self.hostname,
            "checks": {},
            "summary": {"total": 0, "passed": 0, "failed": 0, "warnings": 0}
        }

    def run_check(self, name, check_func):
        """Run a single check and fold its outcome into the summary.

        Exceptions raised by ``check_func`` are converted into an ERROR
        result that counts as a failure, so one broken check cannot abort
        the whole inspection.
        """
        try:
            result = check_func()
            self.results["checks"][name] = result
            self.results["summary"]["total"] += 1
            if result["status"] == "PASS":
                self.results["summary"]["passed"] += 1
            elif result["status"] == "FAIL":
                self.results["summary"]["failed"] += 1
            else:
                self.results["summary"]["warnings"] += 1
            return result
        except Exception as e:
            error_result = {
                "status": "ERROR",
                "message": f"检查执行失败: {str(e)}",
                "details": None
            }
            self.results["checks"][name] = error_result
            self.results["summary"]["total"] += 1
            self.results["summary"]["failed"] += 1
            return error_result

    def check_system_health(self):
        """Check load average, memory and root-disk usage against thresholds."""
        checks = {}
        # 1. 系统负载 — 1/5/15 minute averages from /proc/loadavg.
        load = subprocess.getoutput("cat /proc/loadavg").split()[0:3]
        checks["system_load"] = {
            "value": load,
            "threshold": [2.0, 4.0, 8.0],  # 1min, 5min, 15min阈值
            "status": "PASS" if float(load[0]) < 2.0 else "WARNING"
        }
        # 2. 内存使用 — used/total percentage from free(1).
        mem_info = subprocess.getoutput("free -m").split('\n')[1].split()
        mem_total = int(mem_info[1])
        mem_used = int(mem_info[2])
        mem_usage = (mem_used / mem_total) * 100
        checks["memory_usage"] = {
            "value": f"{mem_usage:.1f}%",
            "threshold": 85,
            "status": "PASS" if mem_usage < 85 else "WARNING"
        }
        # 3. 磁盘使用 — use% of the root filesystem from df(1).
        disk_info = subprocess.getoutput("df -h /").split('\n')[1].split()
        disk_usage = disk_info[4].replace('%', '')
        checks["disk_usage"] = {
            "value": f"{disk_usage}%",
            "threshold": 90,
            "status": "PASS" if int(disk_usage) < 90 else "WARNING"
        }
        return {
            "status": "PASS" if all(c["status"] == "PASS" for c in checks.values()) else "WARNING",
            "message": "系统健康检查完成",
            "details": checks
        }

    def check_network_services(self):
        """Verify that every tunnel-related systemd service is active."""
        services = [
            "frr", "wg-quick@wg0", "wg-quick@wg1",
            "health-checker", "xray"
        ]
        results = {}
        failed_services = []
        for service in services:
            try:
                # `systemctl is-active` exits non-zero for inactive units.
                cmd = f"systemctl is-active {service}"
                status = subprocess.check_output(cmd, shell=True, text=True).strip()
                results[service] = {
                    "status": status,
                    "active": status == "active"
                }
                if status != "active":
                    failed_services.append(service)
            except subprocess.CalledProcessError:
                results[service] = {
                    "status": "inactive",
                    "active": False
                }
                failed_services.append(service)
        return {
            "status": "PASS" if not failed_services else "FAIL",
            "message": f"网络服务检查: {len(failed_services)}个服务异常" if failed_services else "所有网络服务正常",
            "details": results,
            "failed_services": failed_services
        }

    def check_tunnel_connectivity(self):
        """Ping each WireGuard gateway through its own interface.

        A tunnel is "up" only on 0% loss; the average RTT is parsed from
        ping's ``rtt min/avg/max/mdev`` summary line when present.
        """
        tunnels = [
            {"name": "la1", "interface": "wg0", "gateway": "10.100.1.2"},
            {"name": "la2", "interface": "wg1", "gateway": "10.100.2.2"}
        ]
        results = {}
        failed_tunnels = []
        for tunnel in tunnels:
            try:
                cmd = f"ping -c 3 -W 1 -I {tunnel['interface']} {tunnel['gateway']}"
                output = subprocess.check_output(cmd, shell=True, text=True)
                # BUGFIX: match ", 0% packet loss" — the bare substring
                # "0% packet loss" also matches "100% packet loss".
                if ", 0% packet loss" in output:
                    # BUGFIX: 'latency' was previously referenced without
                    # initialisation; ping output lacking the rtt summary
                    # line raised NameError.
                    latency = None
                    for line in output.split('\n'):
                        if "rtt min/avg/max/mdev" in line:
                            latency = line.split('/')[4]  # avg field
                            break
                    results[tunnel['name']] = {
                        "status": "up",
                        "latency_ms": float(latency) if latency is not None else None,
                        "packet_loss": 0
                    }
                else:
                    results[tunnel['name']] = {
                        "status": "degraded",
                        "latency_ms": None,
                        "packet_loss": 100
                    }
                    failed_tunnels.append(tunnel['name'])
            except subprocess.CalledProcessError:
                # ping exits non-zero when no replies were received at all.
                results[tunnel['name']] = {
                    "status": "down",
                    "latency_ms": None,
                    "packet_loss": 100
                }
                failed_tunnels.append(tunnel['name'])
        return {
            "status": "PASS" if not failed_tunnels else "FAIL",
            "message": f"隧道连通性: {len(failed_tunnels)}个隧道异常" if failed_tunnels else "所有隧道连通正常",
            "details": results,
            "failed_tunnels": failed_tunnels
        }

    def check_routing_status(self):
        """Inspect the default route, OSPF neighbours and route count via FRR."""
        try:
            # 检查默认路由
            default_route = subprocess.getoutput("ip route show default")
            # 检查OSPF邻居
            ospf_neighbors = subprocess.getoutput("vtysh -c 'show ip ospf neighbor'")
            # 检查路由表
            route_count = subprocess.getoutput("vtysh -c 'show ip route' | wc -l")
            return {
                "status": "PASS",
                "message": "路由状态正常",
                "details": {
                    "default_route": default_route,
                    # Neighbours in 'Full' adjacency state only.
                    "ospf_neighbor_count": len([l for l in ospf_neighbors.split('\n') if 'Full' in l]),
                    "total_routes": int(route_count)
                }
            }
        except Exception as e:
            # int() fails on vtysh error text, which also lands here.
            return {
                "status": "FAIL",
                "message": f"路由检查失败: {str(e)}",
                "details": None
            }

    def check_security_status(self):
        """Coarse security posture: listening ports, failed logins, fw rules."""
        checks = {}
        # 1. 检查开放的端口
        open_ports = subprocess.getoutput("ss -tlnp | grep LISTEN | wc -l")
        checks["open_ports"] = {
            "value": int(open_ports),
            "threshold": 20,
            "status": "PASS" if int(open_ports) < 20 else "WARNING"
        }
        # 2. 检查登录失败
        failed_logins = subprocess.getoutput("grep 'Failed password' /var/log/auth.log | wc -l")
        checks["failed_logins"] = {
            "value": int(failed_logins),
            "threshold": 10,
            "status": "PASS" if int(failed_logins) < 10 else "WARNING"
        }
        # 3. 检查防火墙规则
        iptables_rules = subprocess.getoutput("iptables -L -n | wc -l")
        checks["firewall_rules"] = {
            "value": int(iptables_rules),
            "status": "PASS" if int(iptables_rules) > 10 else "WARNING"  # 至少应该有10条规则
        }
        return {
            "status": "PASS" if all(c["status"] == "PASS" for c in checks.values()) else "WARNING",
            "message": "安全检查完成",
            "details": checks
        }

    def run_all_checks(self):
        """Run every check, write reports and alert on failures."""
        print(f"开始每日巡检: {self.hostname}")
        print("="*60)
        checks = [
            ("系统健康检查", self.check_system_health),
            ("网络服务检查", self.check_network_services),
            ("隧道连通性检查", self.check_tunnel_connectivity),
            ("路由状态检查", self.check_routing_status),
            ("安全检查", self.check_security_status)
        ]
        for name, func in checks:
            print(f"执行: {name}")
            self.run_check(name, func)
        # 生成报告
        self.generate_report()
        # 发送通知(仅当存在失败项)
        if self.results["summary"]["failed"] > 0:
            self.send_alert()
        return self.results

    def generate_report(self):
        """Write JSON, HTML and text reports under /var/log and print a summary."""
        timestamp = self.inspection_time.strftime("%Y%m%d")
        report_file = f"/var/log/daily_inspection_{timestamp}.json"
        # 保存详细报告
        with open(report_file, "w") as f:
            json.dump(self.results, f, indent=2)
        # 生成HTML报告
        html_report = self.generate_html_report()
        html_file = f"/var/log/daily_inspection_{timestamp}.html"
        with open(html_file, "w") as f:
            f.write(html_report)
        # 生成简要文本报告
        text_report = self.generate_text_report()
        text_file = f"/var/log/daily_inspection_{timestamp}.txt"
        with open(text_file, "w") as f:
            f.write(text_report)
        print(f"\n巡检报告:")
        print(f" JSON详细报告: {report_file}")
        print(f" HTML可视化报告: {html_file}")
        print(f" 文本简要报告: {text_file}")
        # 打印摘要
        print(f"\n巡检摘要:")
        print(f" 总计检查: {self.results['summary']['total']}")
        print(f" 通过: {self.results['summary']['passed']} ✅")
        print(f" 失败: {self.results['summary']['failed']} ❌")
        print(f" 警告: {self.results['summary']['warnings']} ⚠️")

    def generate_html_report(self):
        """Render self.results as a standalone HTML page."""
        html = f"""
<!DOCTYPE html>
<html>
<head>
<title>每日巡检报告 - {self.hostname}</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.summary {{ background: #f5f5f5; padding: 15px; border-radius: 5px; }}
.check {{ margin: 10px 0; padding: 10px; border-left: 4px solid #ccc; }}
.pass {{ border-color: #4CAF50; background: #e8f5e9; }}
.fail {{ border-color: #f44336; background: #ffebee; }}
.warning {{ border-color: #ff9800; background: #fff3e0; }}
.details {{ margin-top: 10px; padding: 10px; background: white; border-radius: 3px; }}
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
th {{ background-color: #f2f2f2; }}
</style>
</head>
<body>
<h1>每日巡检报告</h1>
<div class="summary">
<h2>巡检摘要</h2>
<p><strong>时间:</strong> {self.inspection_time}</p>
<p><strong>主机:</strong> {self.hostname}</p>
<p><strong>状态:</strong>
<span style="color: {'green' if self.results['summary']['failed'] == 0 else 'red'}">
{('正常' if self.results['summary']['failed'] == 0 else '异常')}
</span>
</p>
<p><strong>检查统计:</strong>
总计: {self.results['summary']['total']} |
通过: {self.results['summary']['passed']} |
失败: {self.results['summary']['failed']} |
警告: {self.results['summary']['warnings']}
</p>
</div>
<h2>详细检查结果</h2>
"""
        for check_name, check_result in self.results["checks"].items():
            # NOTE(review): an ERROR status maps to CSS class "error", which
            # has no style rule above — it renders with the default border.
            status_class = check_result["status"].lower()
            html += f"""
<div class="check {status_class}">
<h3>{check_name} - <span class="status">{check_result['status']}</span></h3>
<p>{check_result['message']}</p>
"""
            if check_result.get("details"):
                html += """
<div class="details">
<h4>详细信息:</h4>
"""
                if isinstance(check_result["details"], dict):
                    html += "<table>"
                    for key, value in check_result["details"].items():
                        if isinstance(value, dict):
                            value_str = json.dumps(value, indent=2)
                        else:
                            value_str = str(value)
                        html += f"<tr><th>{key}</th><td>{value_str}</td></tr>"
                    html += "</table>"
                else:
                    html += f"<pre>{check_result['details']}</pre>"
                html += "</div>"
            html += "</div>"
        html += """
</body>
</html>
"""
        return html

    def generate_text_report(self):
        """Render self.results as a short plain-text summary."""
        text = f"""
每日巡检报告
================
时间: {self.inspection_time}
主机: {self.hostname}
状态: {'正常' if self.results['summary']['failed'] == 0 else '异常'}
检查统计:
总计: {self.results['summary']['total']}
通过: {self.results['summary']['passed']} ✅
失败: {self.results['summary']['failed']} ❌
警告: {self.results['summary']['warnings']} ⚠️
详细结果:
"""
        for check_name, check_result in self.results["checks"].items():
            status_icon = "✅" if check_result["status"] == "PASS" else "❌" if check_result["status"] == "FAIL" else "⚠️"
            text += f"\n{status_icon} {check_name}: {check_result['message']}"
            if check_result.get("failed_services"):
                text += f"\n 失败服务: {', '.join(check_result['failed_services'])}"
            if check_result.get("failed_tunnels"):
                text += f"\n 失败隧道: {', '.join(check_result['failed_tunnels'])}"
        return text

    def send_alert(self):
        """Dispatch alert notifications (currently e-mail only).

        Other channels (钉钉, Slack, ...) can be plugged in here.
        """
        print("\n发现失败检查,发送告警通知...")
        try:
            self.send_email_alert()
            print("邮件告警发送成功")
        except Exception as e:
            # Best-effort: a mail failure must not abort the inspection run.
            print(f"邮件发送失败: {str(e)}")

    def send_email_alert(self):
        """Send the inspection report by e-mail (text + HTML alternative).

        SECURITY FIX: SMTP settings were hard-coded (including a password
        literal). They are now read from the environment, keeping the old
        placeholder values as defaults so existing deployments behave the
        same until configured.
        """
        import os
        smtp_server = os.environ.get("INSPECTION_SMTP_SERVER", "smtp.example.com")
        smtp_port = int(os.environ.get("INSPECTION_SMTP_PORT", "587"))
        smtp_user = os.environ.get("INSPECTION_SMTP_USER", "[email protected]")
        # Never commit a real password — inject it via the environment.
        smtp_password = os.environ.get("INSPECTION_SMTP_PASSWORD", "password")
        # 邮件内容
        subject = f"[告警] {self.hostname} 每日巡检发现异常"
        text_content = self.generate_text_report()
        html_content = self.generate_html_report()
        # 创建邮件(multipart/alternative: 文本 + HTML)
        msg = MIMEMultipart("alternative")
        msg["Subject"] = subject
        msg["From"] = smtp_user
        msg["To"] = os.environ.get("INSPECTION_ALERT_TO", "[email protected]")
        part1 = MIMEText(text_content, "plain")
        part2 = MIMEText(html_content, "html")
        msg.attach(part1)
        msg.attach(part2)
        # 发送邮件
        with smtplib.SMTP(smtp_server, smtp_port) as server:
            server.starttls()
            server.login(smtp_user, smtp_password)
            server.send_message(msg)
def main():
    """Run the full inspection; exit non-zero when any check failed."""
    inspector = DailyInspector()
    outcome = inspector.run_all_checks()
    # A non-zero exit status lets cron/systemd surface the failure.
    exit(1 if outcome["summary"]["failed"] > 0 else 0)


if __name__ == "__main__":
    main()
16.3 自动化巡检调度
配置文件 16.3.1:每日巡检的systemd定时器
# /etc/systemd/system/daily-inspection.service
[Unit]
Description=Daily Network Inspection
After=network-online.target
Wants=network-online.target
[Service]
Type=oneshot
User=root
ExecStart=/usr/local/bin/daily_inspection.py
StandardOutput=journal
# ……(service 单元其余内容及配套的 .timer 定时器配置从略)
第18-19天:日志聚合与备份策略(续)
18.3 日志分析工具配置
脚本 18.3.1:ELK Stack日志分析配置
#!/bin/bash
# setup_elk_logging.sh
# 在日志服务器上执行
#
# 安装并配置 Elasticsearch + Logstash + Kibana (7.x),接收 514 端口的
# syslog,并按程序名(wg-quick / frr / health-checker)分流到不同索引。
# NOTE(review): apt-key 在新版 Debian/Ubuntu 已弃用,建议改用
# /etc/apt/keyrings + signed-by 方式管理 GPG key。
set -euo pipefail

echo "=== 配置ELK Stack日志分析 ==="

# 1. 安装Elasticsearch
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
# 覆盖写入(原来用 tee -a 追加,脚本重复执行时会产生重复的源条目)
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-7.x.list
apt update
apt install -y elasticsearch

# 2. 配置Elasticsearch(单节点、关闭 xpack 安全特性,仅限内网使用)
cat > /etc/elasticsearch/elasticsearch.yml << 'EOF'
cluster.name: tunnel-logging
node.name: ${HOSTNAME}
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 0.0.0.0
http.port: 9200
discovery.type: single-node
xpack.security.enabled: false
EOF

# 3. 安装Logstash
apt install -y logstash

# 4. 配置Logstash管道
mkdir -p /etc/logstash/conf.d
cat > /etc/logstash/conf.d/tunnel-logs.conf << 'EOF'
input {
  tcp {
    port => 514
    type => "syslog"
  }
  udp {
    port => 514
    type => "syslog"
  }
}
filter {
  # 解析syslog格式
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
    }
    # 添加时间戳
    date {
      match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
    # 分类处理不同类型的日志
    if [syslog_program] =~ /wg-quick/ {
      mutate {
        add_field => { "[@metadata][index]" => "wireguard-logs" }
        add_tag => ["wireguard", "tunnel"]
      }
    }
    if [syslog_program] =~ /frr/ {
      mutate {
        add_field => { "[@metadata][index]" => "frr-logs" }
        add_tag => ["frr", "routing"]
      }
      # 解析FRR特定日志
      grok {
        match => { "syslog_message" => "%{DATA:frr_component}: %{GREEDYDATA:frr_message}" }
      }
    }
    if [syslog_program] =~ /health-checker/ {
      mutate {
        add_field => { "[@metadata][index]" => "health-check-logs" }
        add_tag => ["health-check", "monitoring"]
      }
      # 解析健康检查日志
      grok {
        match => { "syslog_message" => "隧道 %{DATA:tunnel} 检查完成: 评分=%{NUMBER:score}, 推荐cost=%{NUMBER:recommended_cost}" }
      }
    }
    # 添加地理位置信息
    if [syslog_hostname] =~ /hz/ {
      mutate {
        add_field => { "location" => "hangzhou" }
      }
    } elsif [syslog_hostname] =~ /la/ {
      mutate {
        add_field => { "location" => "los_angeles" }
      }
    }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[@metadata][index]}-%{+YYYY.MM.dd}"
  }
  # 同时输出到控制台用于调试
  stdout { codec => rubydebug }
}
EOF

# 5. 安装Kibana
apt install -y kibana

# 6. 配置Kibana
cat > /etc/kibana/kibana.yml << 'EOF'
server.port: 5601
server.host: "0.0.0.0"
server.name: "tunnel-kibana"
elasticsearch.hosts: ["http://localhost:9200"]
elasticsearch.requestTimeout: 30000
kibana.index: ".kibana"
logging.dest: /var/log/kibana/kibana.log
EOF

# 7. 启动服务(先等待Elasticsearch就绪,再启动依赖它的组件)
systemctl daemon-reload
systemctl enable elasticsearch logstash kibana
systemctl start elasticsearch
sleep 30 # 等待Elasticsearch启动
systemctl start logstash kibana

# 8. 创建索引模板
curl -X PUT "localhost:9200/_template/tunnel-logs" -H 'Content-Type: application/json' -d'
{
  "index_patterns": ["*-logs-*"],
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "refresh_interval": "5s"
  },
  "mappings": {
    "properties": {
      "@timestamp": {"type": "date"},
      "syslog_hostname": {"type": "keyword"},
      "syslog_program": {"type": "keyword"},
      "location": {"type": "keyword"},
      "tunnel": {"type": "keyword"},
      "score": {"type": "float"},
      "recommended_cost": {"type": "integer"},
      "message": {"type": "text"}
    }
  }
}
'

# 9. 创建Kibana仪表板
# BUGFIX: 原脚本此 here-doc 以一个孤立的单引号(而非 EOF)结束,
# 导致其后的所有行都被吞进 here-doc,脚本尾部从未执行。
# NOTE(review): 该JSON目前只写到 /tmp,并未通过 saved_objects API 导入Kibana——待确认是否需要后续导入步骤。
cat > /tmp/kibana-dashboard.json << 'EOF'
{
  "objects": [
    {
      "id": "tunnel-overview-dashboard",
      "type": "dashboard",
      "attributes": {
        "title": "隧道日志概览",
        "hits": 0,
        "description": "",
        "panelsJSON": "[{\"version\":\"7.14.0\",\"panelIndex\":\"1\",\"gridData\":{\"x\":0,\"y\":0,\"w\":24,\"h\":15,\"i\":\"1\"},\"panelRefName\":\"panel_1\"}]",
        "optionsJSON": "{\"darkTheme\":false}",
        "version": 1
      },
      "references": [
        {
          "name": "panel_1",
          "type": "visualization",
          "id": "tunnel-logs-overview"
        }
      ]
    }
  ]
}
EOF

echo "=== ELK Stack配置完成 ==="
echo "访问地址:"
echo " Kibana: http://$(hostname -I | awk '{print $1}'):5601"
echo " Elasticsearch: http://$(hostname -I | awk '{print $1}'):9200"
18.4 备份验证与恢复演练
脚本 18.4.1:备份恢复测试脚本
#!/usr/bin/env python3
# backup_recovery_test.py
import subprocess
import os
import shutil
import time
import json
from datetime import datetime, timedelta
class BackupRecoveryTester:
    """Validates configuration backup archives and rehearses restoration.

    ``run_comprehensive_test`` verifies the newest archive, dry-runs a
    restore into a scratch directory, probes point-in-time archives (1 and
    7 days old) and writes JSON + text reports under /var/log. No live
    service is ever touched — recovery steps are simulated.
    """

    def __init__(self):
        # Where the daily backup job stores its .tar.gz archives.
        self.backup_dir = "/backup/tunnel-config"
        # Scratch area used when unpacking an archive for inspection.
        self.test_dir = "/tmp/backup_test"
        self.results = []

    def cleanup_test_dir(self):
        """Recreate an empty scratch directory."""
        if os.path.exists(self.test_dir):
            shutil.rmtree(self.test_dir)
        os.makedirs(self.test_dir, exist_ok=True)

    def find_latest_backup(self):
        """Return the newest daily .tar.gz archive path, or None."""
        daily_dir = os.path.join(self.backup_dir, "daily")
        if not os.path.exists(daily_dir):
            return None
        backup_files = []
        for root, dirs, files in os.walk(daily_dir):
            for file in files:
                if file.endswith(".tar.gz"):
                    backup_files.append(os.path.join(root, file))
        if not backup_files:
            return None
        # Newest first, by mtime.
        backup_files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
        return backup_files[0]

    def verify_backup_integrity(self, backup_file):
        """Check an archive exists, is a readable tar.gz and contains the
        critical config files.

        BUGFIX: the original listed the archive via ``tar -tzf file |
        head -5`` with shell=True — the pipeline's exit status was head's,
        masking tar failures — and interpolated the path unquoted into the
        shell. List-argv subprocess calls fix both; the scratch directory
        is now a private mkdtemp that is always removed.
        """
        import tempfile
        print(f"验证备份文件完整性: {backup_file}")
        # 1. 检查文件存在
        if not os.path.exists(backup_file):
            return {"status": "FAIL", "message": "备份文件不存在"}
        # 2. 检查文件大小
        file_size = os.path.getsize(backup_file)
        if file_size < 1024:  # 小于1KB的备份文件可能有问题
            return {"status": "WARNING", "message": f"备份文件过小: {file_size}字节"}
        # 3. 验证tar.gz格式
        try:
            listing = subprocess.run(["tar", "-tzf", backup_file],
                                     capture_output=True, text=True)
            if listing.returncode != 0:
                return {"status": "FAIL", "message": "备份文件格式错误"}
        except Exception:
            return {"status": "FAIL", "message": "无法读取备份文件"}
        # 4. 提取并验证关键文件
        temp_dir = tempfile.mkdtemp(prefix="backup_verify_")
        try:
            subprocess.run(["tar", "-xzf", backup_file, "-C", temp_dir],
                           check=True)
            critical_files = [
                "etc/wireguard/wg0.conf",
                "etc/frr/frr.conf",
                "etc/iptables/rules.v4"
            ]
            missing_files = [f for f in critical_files
                             if not os.path.exists(os.path.join(temp_dir, f))]
            if missing_files:
                return {"status": "WARNING", "message": f"缺少关键文件: {', '.join(missing_files)}"}
            return {"status": "PASS", "message": "备份文件完整性验证通过", "size_mb": file_size/(1024*1024)}
        except Exception as e:
            return {"status": "FAIL", "message": f"备份验证过程中出错: {str(e)}"}
        finally:
            # Always remove the scratch dir (the original leaked it on error).
            shutil.rmtree(temp_dir, ignore_errors=True)

    def test_backup_restoration(self, backup_file):
        """Dry-run a restore: unpack, syntax-check configs, simulate the
        stop/copy/start sequence.

        BUGFIX: the syntax-check pipeline appended ``2>&1`` and then read
        ``result.stderr`` — always empty because of the redirect; the
        diagnostic now comes from stdout, and the file path is shell-quoted.
        """
        import shlex
        print(f"测试备份恢复: {backup_file}")
        self.cleanup_test_dir()
        try:
            # 1. 解压备份到测试目录(list-argv,路径不再经shell解析)
            subprocess.run(["tar", "-xzf", backup_file, "-C", self.test_dir],
                           check=True)
            # 2. 验证配置文件语法
            config_checks = [
                {
                    "name": "WireGuard配置",
                    "file": "etc/wireguard/wg0.conf",
                    "check_cmd": "wg-quick strip"
                },
                {
                    "name": "FRR配置",
                    "file": "etc/frr/frr.conf",
                    "check_cmd": "vtysh -C -f"
                },
                {
                    "name": "iptables规则",
                    "file": "etc/iptables/rules.v4",
                    "check_cmd": "iptables-restore -t"
                }
            ]
            check_results = []
            for check in config_checks:
                file_path = os.path.join(self.test_dir, check["file"])
                if os.path.exists(file_path):
                    cmd = f"cat {shlex.quote(file_path)} | {check['check_cmd']} 2>&1"
                    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
                    if result.returncode == 0:
                        check_results.append({
                            "name": check["name"],
                            "status": "PASS",
                            "message": "配置文件语法正确"
                        })
                    else:
                        check_results.append({
                            "name": check["name"],
                            "status": "WARNING",
                            "message": f"配置文件可能有语法问题: {result.stdout[:100]}"
                        })
                else:
                    check_results.append({
                        "name": check["name"],
                        "status": "WARNING",
                        "message": "配置文件不存在"
                    })
            # 3. 模拟恢复过程 — every step is marked simulated; no service
            #    is actually stopped or started.
            recovery_steps = []
            services = ["frr", "wg-quick@wg0", "wg-quick@wg1", "xray"]
            for service in services:
                recovery_steps.append({
                    "step": f"停止 {service}",
                    "simulated": True,
                    "status": "SUCCESS"
                })
            recovery_steps.append({
                "step": "复制配置文件到系统目录",
                "simulated": True,
                "status": "SUCCESS"
            })
            for service in services:
                recovery_steps.append({
                    "step": f"启动 {service}",
                    "simulated": True,
                    "status": "SUCCESS"
                })
            recovery_steps.append({
                "step": "验证隧道连通性",
                "simulated": True,
                "status": "SUCCESS"
            })
            return {
                "status": "PASS",
                "message": "备份恢复测试通过",
                "config_checks": check_results,
                "recovery_steps": recovery_steps
            }
        except Exception as e:
            return {
                "status": "FAIL",
                "message": f"备份恢复测试失败: {str(e)}"
            }

    def test_point_in_time_recovery(self, days_ago=1):
        """Verify and dry-run the restore of a backup from N days ago."""
        print(f"测试 {days_ago} 天前的备份恢复")
        # Backups are matched by a YYYYMMDD date token in the file name.
        target_date = datetime.now() - timedelta(days=days_ago)
        date_str = target_date.strftime("%Y%m%d")
        daily_dir = os.path.join(self.backup_dir, "daily")
        backup_files = []
        for root, dirs, files in os.walk(daily_dir):
            for file in files:
                if file.endswith(".tar.gz") and date_str in file:
                    backup_files.append(os.path.join(root, file))
        if not backup_files:
            return {"status": "FAIL", "message": f"未找到 {days_ago} 天前的备份"}
        # 使用最新的匹配备份
        backup_files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
        backup_file = backup_files[0]
        # 验证并测试恢复
        integrity_result = self.verify_backup_integrity(backup_file)
        if integrity_result["status"] != "PASS":
            return integrity_result
        recovery_result = self.test_backup_restoration(backup_file)
        return {
            "status": recovery_result["status"],
            "message": recovery_result["message"],
            "backup_file": backup_file,
            "backup_date": time.ctime(os.path.getmtime(backup_file))
        }

    def run_comprehensive_test(self):
        """Run every backup test case and write a consolidated report."""
        print("开始全面的备份恢复测试")
        print("="*60)
        # Resolve the newest archive once instead of re-scanning per case.
        latest = self.find_latest_backup()
        test_cases = [
            ("验证最新备份", self.verify_backup_integrity, [latest]),
            ("测试最新备份恢复", self.test_backup_restoration, [latest]),
            ("测试1天前备份恢复", self.test_point_in_time_recovery, [1]),
            ("测试7天前备份恢复", self.test_point_in_time_recovery, [7])
        ]
        for test_name, test_func, test_args in test_cases:
            print(f"\n执行测试: {test_name}")
            if test_args[0] is None:
                result = {"status": "FAIL", "message": "未找到备份文件"}
            else:
                result = test_func(*test_args)
            self.results.append({
                "test": test_name,
                "timestamp": datetime.now().isoformat(),
                "result": result
            })
            status_icon = "✅" if result["status"] == "PASS" else "⚠️" if result["status"] == "WARNING" else "❌"
            print(f" 结果: {status_icon} {result['status']} - {result['message']}")
        # 生成测试报告
        self.generate_test_report()
        return self.results

    def generate_test_report(self):
        """Write detailed JSON and brief text reports under /var/log."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        report_file = f"/var/log/backup_recovery_test_{timestamp}.json"
        report = {
            "timestamp": datetime.now().isoformat(),
            "hostname": subprocess.getoutput("hostname"),
            "backup_dir": self.backup_dir,
            "results": self.results,
            "summary": {
                "total_tests": len(self.results),
                "passed": sum(1 for r in self.results if r["result"]["status"] == "PASS"),
                "warnings": sum(1 for r in self.results if r["result"]["status"] == "WARNING"),
                "failed": sum(1 for r in self.results if r["result"]["status"] == "FAIL")
            }
        }
        with open(report_file, "w") as f:
            json.dump(report, f, indent=2)
        # 生成简要报告
        summary_file = f"/var/log/backup_recovery_summary_{timestamp}.txt"
        with open(summary_file, "w") as f:
            f.write("备份恢复测试报告\n")
            f.write("="*60 + "\n")
            f.write(f"测试时间: {report['timestamp']}\n")
            f.write(f"主机: {report['hostname']}\n")
            f.write(f"备份目录: {report['backup_dir']}\n")
            f.write(f"测试总数: {report['summary']['total_tests']}\n")
            f.write(f"通过: {report['summary']['passed']}\n")
            f.write(f"警告: {report['summary']['warnings']}\n")
            f.write(f"失败: {report['summary']['failed']}\n\n")
            f.write("详细结果:\n")
            for result in report["results"]:
                status = result["result"]["status"]
                icon = "✓" if status == "PASS" else "!" if status == "WARNING" else "✗"
                f.write(f"{icon} {result['test']}: {result['result']['message']}\n")
        print(f"\n测试报告:")
        print(f" 详细报告: {report_file}")
        print(f" 简要报告: {summary_file}")
        # 总结
        print(f"\n测试摘要:")
        print(f" 总计: {report['summary']['total_tests']} 项测试")
        print(f" 通过: {report['summary']['passed']} ✅")
        print(f" 警告: {report['summary']['warnings']} ⚠️")
        print(f" 失败: {report['summary']['failed']} ❌")
def main():
    """CLI for backup verification: integrity / recovery / point-in-time / full.

    Exit codes for the full run: 0 = all pass, 1 = any failure,
    2 = warnings only.
    """
    import argparse

    parser = argparse.ArgumentParser(description="备份恢复测试工具")
    parser.add_argument("--test", choices=["integrity", "recovery", "point-in-time", "full"],
                        default="full", help="测试类型")
    parser.add_argument("--backup-file", help="指定备份文件路径")
    parser.add_argument("--days-ago", type=int, default=1, help="恢复几天前的备份")
    parser.add_argument("--output", help="输出报告路径")
    opts = parser.parse_args()

    tester = BackupRecoveryTester()

    if opts.test == "integrity":
        target = opts.backup_file or tester.find_latest_backup()
        print(json.dumps(tester.verify_backup_integrity(target), indent=2))
    elif opts.test == "recovery":
        target = opts.backup_file or tester.find_latest_backup()
        print(json.dumps(tester.test_backup_restoration(target), indent=2))
    elif opts.test == "point-in-time":
        print(json.dumps(tester.test_point_in_time_recovery(opts.days_ago), indent=2))
    else:  # full
        outcomes = tester.run_comprehensive_test()
        if opts.output:
            with open(opts.output, "w") as f:
                json.dump(outcomes, f, indent=2)
        if any(o["result"]["status"] == "FAIL" for o in outcomes):
            exit(1)
        elif any(o["result"]["status"] == "WARNING" for o in outcomes):
            exit(2)
        else:
            exit(0)


if __name__ == "__main__":
    main()
第20天:文档与知识转移(续)
20.4 自动化文档生成与发布
脚本 20.4.1:自动化文档流水线
#!/usr/bin/env python3
# documentation_pipeline.py
import os
import json
import yaml
import markdown
from datetime import datetime
import subprocess
import shutil
from pathlib import Path
class DocumentationPipeline:
def __init__(self):
    """Set project paths, stamp this run, and ensure the docs tree exists."""
    # One timestamp per pipeline run, reused in every generated document.
    self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    self.project_root = "/opt/tunnel-project"
    self.docs_dir = os.path.join(self.project_root, "docs")
    self.build_dir = os.path.join(self.project_root, "docs-build")
    # Build the directory skeleton up front so later steps can write freely.
    self.create_directory_structure()
def create_directory_structure(self):
    """Create the documentation source and build directory trees (idempotent)."""
    doc_sections = ["architecture", "operations", "api", "troubleshooting", "training"]
    build_formats = ["html", "pdf", "json"]

    wanted = [self.docs_dir, self.build_dir]
    wanted += [os.path.join(self.docs_dir, name) for name in doc_sections]
    wanted += [os.path.join(self.build_dir, name) for name in build_formats]

    for path in wanted:
        os.makedirs(path, exist_ok=True)
def gather_system_documentation(self):
    """Collect all live configuration into one dict and dump it as JSON.

    The dict is also written to ``<build_dir>/json/system_docs.json`` so
    downstream renderers can consume a stable snapshot.
    """
    print("收集系统文档...")
    docs = {
        "system_info": self.get_system_info(),
        "network_config": self.get_network_config(),
        "service_config": self.get_service_config(),
        "monitoring_config": self.get_monitoring_config(),
        "backup_config": self.get_backup_config()
    }
    snapshot_path = os.path.join(self.build_dir, "json", "system_docs.json")
    with open(snapshot_path, "w") as fh:
        json.dump(docs, fh, indent=2, ensure_ascii=False)
    return docs
def get_system_info(self):
    """Snapshot host facts: OS, hardware and network interfaces.

    Values come from shelling out to standard tools, so every field holds
    raw command output as a string (not parsed data).
    """
    run = subprocess.getoutput
    info = {
        "timestamp": self.timestamp,
        "hostname": run("hostname"),
        "os": {
            "distribution": run("lsb_release -d | cut -f2"),
            "kernel": run("uname -r"),
            "architecture": run("uname -m")
        },
        "hardware": {
            "cpu": run("lscpu | grep 'Model name' | cut -d: -f2").strip(),
            "memory_gb": run("free -g | awk '/^Mem:/ {print $2}'"),
            "disk_gb": run("df -h / | awk 'NR==2 {print $2}'")
        },
        "network": {
            "interfaces": [],
            # NOTE(review): routing_tables is never populated here —
            # presumably filled by a later step; confirm before relying on it.
            "routing_tables": []
        }
    }
    # Record per-interface `ip addr` output, skipping loopback.
    for iface in run("ip -o link show | awk -F': ' '{print $2}'").split('\n'):
        if iface and iface != "lo":
            info["network"]["interfaces"].append({
                "name": iface,
                "info": run(f"ip addr show {iface}")
            })
    return info
def get_network_config(self):
    """Read WireGuard, FRR and iptables config files that exist on disk.

    Returns ``{"wireguard": {...}, "routing": {...}, "firewall": {...}}``
    keyed by basename; missing files are silently skipped.
    """
    sources = {
        "wireguard": ["/etc/wireguard/wg0.conf", "/etc/wireguard2/wg1.conf"],
        "routing": ["/etc/frr/frr.conf", "/etc/frr/daemons"],
        "firewall": ["/etc/iptables/rules.v4", "/etc/iptables/rules.v6"],
    }
    config = {section: {} for section in sources}
    for section, paths in sources.items():
        for path in paths:
            if os.path.exists(path):
                with open(path, "r") as fh:
                    config[section][os.path.basename(path)] = fh.read()
    return config
def get_service_config(self):
    """Collect systemd units, cron jobs and (truncated) custom scripts.

    Returns a dict with keys ``systemd_services``, ``cron_jobs`` and
    ``custom_scripts``; only files present on disk are included.
    """
    services = {
        "systemd_services": [],
        "cron_jobs": [],
        "custom_scripts": []
    }
    # Systemd units belonging to the tunnel stack.
    tunnel_services = [
        "frr.service",
        "[email protected]",
        "[email protected]",
        "health-checker.service",
        "xray.service",
        "daily-inspection.service"
    ]
    for service in tunnel_services:
        service_file = f"/etc/systemd/system/{service}"
        if os.path.exists(service_file):
            with open(service_file, "r") as f:
                services["systemd_services"].append({
                    "name": service,
                    "content": f.read()
                })
    # Cron作业
    cron_file = "/etc/cron.d/tunnel-backup"
    if os.path.exists(cron_file):
        with open(cron_file, "r") as f:
            services["cron_jobs"] = f.read().split('\n')
    # Custom operational scripts, capped at 100 lines each.
    scripts_dir = "/usr/local/bin"
    tunnel_scripts = ["tunnel_switch.py", "health_checker.py", "daily_inspection.py"]
    for script in tunnel_scripts:
        script_file = os.path.join(scripts_dir, script)
        if os.path.exists(script_file):
            with open(script_file, "r") as f:
                lines = f.readlines()
            # BUGFIX: readlines() keeps each '\n', so the original
            # '\n'.join(...) doubled every line break; join with '' instead,
            # and only append the truncation marker when lines were dropped.
            content = ''.join(lines[:100])
            if len(lines) > 100:
                content += "\n... (truncated)"
            services["custom_scripts"].append({
                "name": script,
                "content": content
            })
    return services
def get_monitoring_config(self):
    """Collect monitoring configuration files that exist on disk.

    Only Prometheus files are gathered today; the ``grafana`` and
    ``alerting`` sections are empty placeholders kept for schema stability.
    """
    monitoring = {"prometheus": {}, "grafana": {}, "alerting": {}}
    for path in ("/etc/prometheus/prometheus.yml", "/etc/prometheus/alerts.yml"):
        if os.path.exists(path):
            with open(path, "r") as fh:
                monitoring["prometheus"][os.path.basename(path)] = fh.read()
    return monitoring
def get_backup_config(self):
    """List backup scripts (full text) and existing backup archives.

    Archives are discovered by walking /backup/tunnel-config for .tar.gz
    files; each entry records path, size in MB and mtime (ISO format).
    """
    backup = {"scripts": [], "directories": []}

    script_paths = ("/usr/local/bin/backup_tunnel_config.sh",
                    "/usr/local/bin/rotate_backups.sh")
    for path in script_paths:
        if os.path.exists(path):
            with open(path, "r") as fh:
                backup["scripts"].append({
                    "name": os.path.basename(path),
                    "content": fh.read()
                })

    archive_root = "/backup/tunnel-config"
    if os.path.exists(archive_root):
        for root, _dirs, files in os.walk(archive_root):
            for name in files:
                if not name.endswith(".tar.gz"):
                    continue
                full_path = os.path.join(root, name)
                backup["directories"].append({
                    "path": full_path,
                    "size_mb": os.path.getsize(full_path) / (1024 * 1024),
                    "modified": datetime.fromtimestamp(os.path.getmtime(full_path)).isoformat()
                })
    return backup
def generate_markdown_docs(self, system_docs):
    """生成Markdown文档 — render every document section and write it
    under self.docs_dir.

    Args:
        system_docs: collected system documentation dict, passed through
            to the renderers that need live system data.
    """
    print("生成Markdown文档...")
    # (subdir, filename, rendered content); None subdir = docs root.
    # Data-driven to avoid five copy-pasted open/write blocks.
    outputs = [
        ("architecture", "overview.md", self.generate_architecture_doc(system_docs)),
        ("operations", "manual.md", self.generate_operations_doc(system_docs)),
        ("troubleshooting", "guide.md", self.generate_troubleshooting_doc()),
        ("training", "materials.md", self.generate_training_materials()),
        (None, "README.md", self.generate_readme()),
    ]
    for subdir, filename, content in outputs:
        target_dir = self.docs_dir if subdir is None else os.path.join(self.docs_dir, subdir)
        # Robustness: don't fail if the directory tree was not pre-created.
        os.makedirs(target_dir, exist_ok=True)
        with open(os.path.join(target_dir, filename), "w") as f:
            f.write(content)
def generate_architecture_doc(self, system_docs):
"""生成架构文档"""
# Renders the architecture overview as one large Markdown f-string.
# NOTE(review): the embedded {subprocess.getoutput(...)} placeholders run
# `wg`, `vtysh` and the xray binary at render time — presumably those
# tools are installed on the host generating the docs; TODO confirm, and
# consider tolerating a missing binary instead of embedding its error text.
doc = f"""# 跨境SD-WAN架构文档
> 文档生成时间: {self.timestamp}
> 主机: {system_docs['system_info']['hostname']}
## 1. 系统架构概览
### 1.1 物理拓扑
杭州侧 美国侧
────────────────────────────────────────────────────
杭州业务VPS (10.10.0.0/16)
|
|
HZ1 (10.10.1.1) ── FRR OSPF Area 0
| ├── WireGuard Tunnel1 (10.100.1.0/30) ────── LA1 (10.8.1.0/24)
| └── WireGuard Tunnel2 (10.100.2.0/30) ────── LA2 (10.8.2.0/24)
|
杭州内网OSPF Client
### 1.2 逻辑架构
#### 数据平面
- **传输层**: VLESS+REALITY over TCP
- **隧道层**: WireGuard over UDP
- **路由层**: OSPF动态路由
#### 控制平面
- **健康检查**: 定期评估隧道质量
- **路由决策**: 基于健康评分调整OSPF cost
- **故障切换**: 自动路径切换
#### 管理平面
- **监控**: Prometheus + Grafana
- **日志**: ELK Stack
- **备份**: 自动化配置备份
## 2. 网络配置
### 2.1 IP地址规划
| 网络段 | 用途 | 备注 |
|--------|------|------|
| 10.10.0.0/16 | 杭州内网 | 业务VPS网段 |
| 10.100.1.0/30 | HZ1 ↔ LA1隧道 | 点对点链路 |
| 10.100.2.0/30 | HZ1 ↔ LA2隧道 | 点对点链路 |
| 10.8.1.0/24 | LA1侧网络 | 美国业务网络A |
| 10.8.2.0/24 | LA2侧网络 | 美国业务网络B |
### 2.2 路由策略
- **主路径**: LA1 (初始cost=10)
- **备用路径**: LA2 (初始cost=50)
- **切换条件**: 基于健康评分动态调整cost
## 3. 系统组件
### 3.1 核心组件
| 组件 | 版本 | 功能 |
|------|------|------|
| WireGuard | {subprocess.getoutput('wg --version')} | 加密隧道 |
| FRR | {subprocess.getoutput('vtysh --version')} | 动态路由 |
| Xray | {subprocess.getoutput('/usr/local/xray/xray --version')} | 代理传输 |
### 3.2 辅助组件
| 组件 | 功能 |
|------|------|
| Prometheus | 监控指标收集 |
| Grafana | 监控可视化 |
| ELK Stack | 日志分析 |
| 自定义健康检查 | 链路质量评估 |
## 4. 性能指标
### 4.1 设计目标
| 指标 | 目标值 |
|------|--------|
| 故障切换时间 | ≤10秒 |
| 路由收敛时间 | ≤5秒 |
| 系统可用性 | ≥99.9% |
| 隧道延迟 | ≤200ms |
| 隧道丢包率 | ≤5% |
## 5. 安全设计
### 5.1 加密方案
- 传输加密: TLS 1.3 (REALITY)
- 隧道加密: WireGuard (ChaCha20-Poly1305)
- 身份验证: 公钥加密
### 5.2 访问控制
- 防火墙规则: iptables
- 服务白名单: 仅开放必要端口
- 日志审计: 所有操作记录
---
*文档自动生成,请勿手动修改*
"""
return doc
def generate_operations_doc(self, system_docs):
"""生成运维手册"""
doc = f"""# 跨境SD-WAN运维手册
> 文档生成时间: {self.timestamp}
## 1. 日常运维
### 1.1 监控检查
#### 1.1.1 Grafana仪表板
- 访问地址: http://{system_docs['system_info']['hostname']}:3000
- 主要仪表板:
- 隧道健康状态
- 系统资源使用
- 网络流量监控
- 告警事件统计
#### 1.1.2 命令行监控
```bash
# 查看隧道状态
wg show
# 查看路由状态
vtysh -c "show ip ospf neighbor"
vtysh -c "show ip route ospf"
# 查看健康检查状态
systemctl status health-checker
journalctl -u health-checker -f
# 查看系统状态
/usr/local/bin/daily_inspection.py
1.2 备份管理
1.2.1 备份位置
- 每日备份: /backup/tunnel-config/daily/
- 每周备份: /backup/tunnel-config/weekly/
- 每月备份: /backup/tunnel-config/monthly/
1.2.2 备份验证
# 验证最新备份
/usr/local/bin/backup_recovery_test.py --test integrity
# 测试备份恢复
/usr/local/bin/backup_recovery_test.py --test recovery
1.2.3 手动备份
/usr/local/bin/backup_tunnel_config.sh
1.3 日志查看
1.3.1 实时日志
# WireGuard日志
journalctl -u wg-quick@wg0 -f
journalctl -u wg-quick@wg1 -f
# FRR日志
journalctl -u frr -f
# 健康检查日志
tail -f /var/log/health_checker.log
# 隧道切换日志
tail -f /var/log/tunnel_switch.log
1.3.2 日志分析
- Kibana地址: http://{system_docs['system_info']['hostname']}:5601
- 主要索引:
- wireguard-logs-*
- frr-logs-*
- health-check-logs-*
2. 配置管理
2.1 配置文件位置
| 配置文件 | 路径 | 描述 |
|---|---|---|
| WireGuard (LA1) | /etc/wireguard/wg0.conf |
LA1隧道配置 |
| WireGuard (LA2) | /etc/wireguard2/wg1.conf |
LA2隧道配置 |
| FRR | /etc/frr/frr.conf |
路由配置 |
| Xray | /usr/local/xray/config_*.json |
代理配置 |
| 健康检查 | /usr/local/bin/health_checker.py |
健康检查脚本 |
| 系统服务 | /etc/systemd/system/*.service |
Systemd服务文件 |
2.2 配置变更流程
-
备份当前配置
/usr/local/bin/backup_tunnel_config.sh -
修改配置文件
# 使用版本控制或编辑工具 cp /path/to/new/config /etc/wireguard/wg0.conf -
验证配置语法
# WireGuard配置验证 wg-quick strip /etc/wireguard/wg0.conf # FRR配置验证 vtysh -C -f /etc/frr/frr.conf -
应用配置
# 重启服务 systemctl restart wg-quick@wg0 systemctl restart frr -
验证变更
# 检查服务状态 systemctl status wg-quick@wg0 systemctl status frr # 验证功能 /usr/local/bin/tunnel_switch.py status
3. 故障处理
3.1 常见故障处理
3.1.1 隧道中断
症状:
- 隧道健康评分为0
- 监控告警触发
- 业务访问失败
处理步骤:
- 检查隧道状态:
wg show - 检查代理服务:
systemctl status xray - 检查网络连通性:
ping -I wg0 10.100.1.2 - 查看日志:
journalctl -u wg-quick@wg0 --since "10 minutes ago" - 手动切换到备用隧道:
/usr/local/bin/tunnel_switch.py switch la2
3.1.2 路由异常
症状:
- OSPF邻居状态异常
- 路由表缺失
- 流量路径错误
处理步骤:
- 检查FRR状态:
systemctl status frr - 查看OSPF邻居:
vtysh -c "show ip ospf neighbor" - 查看路由表:
vtysh -c "show ip route" - 重启FRR:
systemctl restart frr - 检查防火墙规则:
iptables -L -n
3.1.3 健康检查异常
症状:
- 健康检查服务停止
- 评分不再更新
- 自动切换失效
处理步骤:
- 检查服务状态:
systemctl status health-checker - 查看日志:
journalctl -u health-checker - 手动运行检查:
/usr/local/bin/health_checker.py --once - 检查依赖服务:
systemctl status prometheus node_exporter
3.2 紧急恢复
3.2.1 回滚到单隧道模式
/usr/local/bin/rollback_to_single_tunnel.sh
3.2.2 从备份恢复
# 1. 停止所有服务
systemctl stop wg-quick@wg0 wg-quick@wg1 frr xray
# 2. 恢复最新备份
BACKUP_FILE=$(ls -t /backup/tunnel-config/daily/config_*.tar.gz | head -1)
tar -xzf $BACKUP_FILE -C /
# 3. 重启服务
systemctl start frr wg-quick@wg0 wg-quick@wg1 xray
# 4. 验证恢复
/usr/local/bin/tunnel_switch.py status
4. 性能优化
4.1 监控指标调优
4.1.1 健康检查参数
# 位置: /usr/local/bin/health_checker.py
检查间隔: 30秒
评分权重:
延迟: 30%
抖动: 20%
丢包: 30%
TCP连接: 10%
HTTP响应: 10%
切换阈值:
优质: ≥80分 (cost=10)
可用: ≥60分 (cost=50)
劣质: <60分 (cost≥100)
故障: <30分 (cost=65535)
4.1.2 OSPF参数调优
# 进入FRR配置
vtysh
configure terminal
# 调整OSPF定时器
router ospf
timers throttle spf 10 100 5000
timers throttle lsa all 10 100 5000
4.2 系统参数优化
4.2.1 内核参数
# /etc/sysctl.d/60-tunnel-optimization.conf
net.core.rmem_max = 134217728
net.core.wmem_max = 134217728
net.ipv4.tcp_rmem = 4096 87380 134217728
net.ipv4.tcp_wmem = 4096 65536 134217728
net.ipv4.tcp_congestion_control = bbr
4.2.2 WireGuard优化
# /etc/wireguard/wg0.conf
[Interface]
# 调整MTU
MTU = 1380
# 启用多队列(如果支持)
Queue = 2
5. 扩展与升级
5.1 增加新隧道
5.1.1 准备新节点
- 部署新的美国VPS
- 配置网络和防火墙
- 安装必要软件
5.1.2 配置新隧道
- 在HZ1上生成新的WireGuard密钥对
- 配置新的隧道接口
- 更新健康检查配置
- 更新OSPF配置
5.1.3 验证新隧道
- 测试隧道连通性
- 验证路由学习
- 测试故障切换
5.2 系统升级
5.2.1 软件升级
# 系统更新
apt update && apt upgrade -y
# WireGuard升级
apt install --only-upgrade wireguard wireguard-tools
# FRR升级
apt install --only-upgrade frr frr-pythontools
5.2.2 配置迁移
- 备份当前配置
- 比较新旧配置差异
- 逐步应用新配置
- 验证功能正常
6. 联系信息
6.1 支持团队
| 角色 | 联系方式 | 值班时间 |
|---|---|---|
| 一线支持 | +86-138-XXXX-XXXX | 7x24 |
| 二线支持 | +86-139-XXXX-XXXX | 工作时间 |
| 架构师 | +86-136-XXXX-XXXX | 紧急情况 |
6.2 告警通知
- 邮件: [email protected]
- 钉钉: 网络运维群
- SMS: 关键告警短信通知
6.3 文档更新
- 文档仓库: https://github.com/example/tunnel-docs
- 更新流程: PR审核后合并
- 自动发布: 每次提交后自动生成
本手册为自动生成,定期更新,请确保使用最新版本
"""
return doc
def generate_troubleshooting_doc(self):
"""生成故障排查指南"""
doc = """# 故障排查指南
故障排查流程图
常见故障场景及解决方案
场景1: 隧道完全中断
症状
- 隧道健康评分为0
- `wg show` 显示隧道断开
- 业务完全无法访问
排查步骤
-
检查WireGuard状态
# 检查接口状态 ip link show wg0 # 检查WireGuard状态 wg show # 检查服务状态 systemctl status wg-quick@wg0 -
检查代理服务
# 检查Xray服务 systemctl status xray # 检查端口监听 netstat -tlnp | grep 10001 # 检查进程状态 ps aux | grep xray -
检查网络连通性
# 测试本地端口 telnet 127.0.0.1 10001 # 测试目标服务器 curl --connect-timeout 5 http://LA1_PUBLIC_IP:443 # 使用tcpdump抓包 tcpdump -i eth0 port 443 -n -
查看相关日志
# WireGuard日志 journalctl -u wg-quick@wg0 --since "1 hour ago" # Xray日志 journalctl -u xray --since "1 hour ago" # 系统日志 tail -f /var/log/syslog | grep -E "(wg|xray|wireguard)" -
紧急处理
# 手动切换到备用隧道 /usr/local/bin/tunnel_switch.py switch la2 # 重启问题服务 systemctl restart wg-quick@wg0 systemctl restart xray
根本原因分析
- 网络问题: 防火墙阻挡、网络中断
- 服务问题: 进程崩溃、配置错误
- 资源问题: 内存不足、连接数满
场景2: 隧道质量下降
症状
- 延迟增加(>200ms)
- 丢包率升高(>10%)
- 健康评分下降(<60分)
排查步骤
-
诊断网络质量
# 测试延迟和丢包 ping -c 100 -I wg0 10.100.1.2 # 使用mtr进行路径诊断 mtr -r -c 100 -I wg0 10.100.1.2 # 测试带宽 iperf3 -c 10.100.1.2 -I wg0 -t 30 -
检查系统资源
# CPU使用率 top -bn1 | grep "Cpu(s)" # 内存使用 free -h # 网络连接数 ss -s | grep "estab" # 磁盘I/O iostat -x 1 5 -
检查流量统计
# 接口流量 ifconfig wg0 # 详细流量统计 nethogs wg0 # 连接追踪 conntrack -L | grep wg0 | wc -l -
优化措施
# 调整内核参数 sysctl -w net.core.rmem_max=134217728 sysctl -w net.core.wmem_max=134217728 # 调整WireGuard参数 wg set wg0 peer <PUBLIC_KEY> persistent-keepalive 10 # 清理连接追踪表 conntrack -F
根本原因分析
- 网络拥塞: 高峰时段带宽不足
- 系统瓶颈: CPU/内存/磁盘限制
- 配置问题: 参数未优化
- 应用问题: 特定应用流量异常
场景3: 路由切换频繁
症状
- 路由表频繁变化
- 流量在隧道间震荡
- OSPF cost频繁调整
排查步骤
-
检查路由状态
# 查看路由变化历史 vtysh -c "show log" | grep OSPF # 监控路由表变化 watch -n 1 'vtysh -c "show ip route ospf"' # 检查OSPF邻居状态 vtysh -c "show ip ospf neighbor detail" -
检查健康检查
# 查看健康检查日志 tail -f /var/log/health_checker.log # 手动运行健康检查 /usr/local/bin/health_checker.py --once # 检查监控指标 curl -s http://localhost:8080/api/v1/health/metrics | grep health_score -
调整防震荡参数
# 修改健康检查脚本中的参数 # 位置: /usr/local/bin/health_checker.py # 增加切换迟滞 COST_CHANGE_THRESHOLD = 100 # 原50 # 增加检查间隔 CHECK_INTERVAL = 60 # 原30 # 增加历史记录长度 HISTORY_LENGTH = 20 # 原10 -
调整OSPF参数
vtysh configure terminal # 增加OSPF定时器 router ospf timers throttle spf 30 200 10000 timers throttle lsa all 30 200 10000 # 调整接口cost变化敏感度 interface wg0 ip ospf cost 10 ip ospf dead-interval 40
根本原因分析
- 网络不稳定: 链路质量波动
- 参数过敏感: 切换阈值过低
- 监控噪声: 健康检查误判
- 配置问题: OSPF定时器不合理
场景4: 监控告警失效
症状
- Prometheus指标缺失
- Grafana面板无数据
- 告警未触发或误报
排查步骤
-
检查监控服务
# 检查Prometheus systemctl status prometheus curl http://localhost:9090/-/healthy # 检查Node Exporter systemctl status node_exporter curl http://localhost:9100/metrics | head -5 # 检查健康检查API curl http://localhost:8080/api/v1/health/status -
检查指标收集
# 查看Prometheus目标状态 curl http://localhost:9090/api/v1/targets | jq . # 检查抓取配置 grep -A 5 -B 5 "tunnel" /etc/prometheus/prometheus.yml # 手动测试指标 curl http://localhost:9100/metrics | grep tunnel -
检查告警规则
# 验证告警规则语法 promtool check rules /etc/prometheus/alerts.yml # 查看活跃告警 curl http://localhost:9090/api/v1/alerts | jq . # 测试告警表达式 curl -s "http://localhost:9090/api/v1/query?query=tunnel_health_score<30" -
检查通知渠道
# 检查Alertmanager systemctl status alertmanager # 查看告警历史 grep "alert" /var/log/prometheus/alertmanager.log | tail -20 # 测试告警发送 curl -X POST http://localhost:9093/api/v1/alerts -d '[{ "labels": {"alertname": "TestAlert", "severity": "warning"} }]'
根本原因分析
- 服务故障: 监控组件停止运行
- 配置错误: 指标路径或端口错误
- 网络问题: 防火墙阻挡监控流量
- 资源问题: 监控服务器资源不足
高级诊断工具
1. 网络诊断工具包
# 安装诊断工具
apt install -y mtr iperf3 tcpdump netcat conntrack nethogs iftop
# 综合诊断脚本
/usr/local/bin/network_diagnosis.sh
2. 性能剖析工具
# 系统性能分析
perf record -g -p $(pidof xray) -- sleep 30
perf report
# 内存分析
valgrind --tool=massif /usr/local/xray/xray
3. 日志分析工具
# 实时日志分析
tail -f /var/log/*.log | grep -E "(ERROR|WARN|FAIL)"
# 日志统计
awk '/ERROR/ {count++} END {print count}' /var/log/tunnel_switch.log
# 时间序列分析
rgrep "隧道.*评分" /var/log/health_checker.log | awk '{print $1, $2, $NF}'
紧急恢复清单
1. 一级紧急(完全中断)
- 确认故障范围
- 切换到备用隧道
- 通知相关团队
- 开始根本原因分析
2. 二级紧急(性能下降)
- 监控性能指标
- 优化系统参数
- 调整流量策略
- 计划维护窗口
3. 三级紧急(监控告警)
- 验证告警真实性
- 检查监控系统
- 更新监控配置
- 记录处理过程
联系支持
内部支持
- 网络团队: #network-support
- 运维团队: #ops-support
- 架构团队: #arch-support
外部支持
- 云服务商: 400-XXX-XXXX
- 软件支持: [email protected]
- 安全团队: [email protected]
本指南应定期更新,反映最新的故障处理经验
"""
return doc
def generate_training_materials(self):
"""生成培训材料"""
doc = """# 跨境SD-WAN培训材料
培训目标
通过本培训,学员将能够:
- 理解跨境SD-WAN架构和工作原理
- 掌握日常运维操作和监控方法
- 独立处理常见故障和问题
- 执行配置变更和系统优化
- 参与应急演练和恢复操作
培训大纲
模块1: 架构理解(2小时)
- 跨境网络挑战与解决方案
- SD-WAN架构设计原理
- 组件功能介绍与交互
- 数据流与控制流分析
模块2: 日常运维(3小时)
- 监控系统使用
- 日志分析与排查
- 备份与恢复操作
- 性能监控与优化
模块3: 故障处理(4小时)
- 故障诊断方法论
- 常见故障场景演练
- 紧急恢复流程
- 根本原因分析
模块4: 高级操作(3小时)
- 配置变更管理
- 系统扩展与升级
- 安全加固操作
- 性能调优实践
实操练习
练习1: 环境熟悉
# 1. 登录系统
ssh admin@hz1
# 2. 查看系统状态
/usr/local/bin/daily_inspection.py
# 3. 访问监控面板
# 打开浏览器访问: http://hz1:3000
# 4. 查看当前配置
cat /etc/wireguard/wg0.conf | head -20
练习2: 日常巡检
# 1. 检查隧道状态
wg show
# 2. 检查路由状态
vtysh -c "show ip ospf neighbor"
vtysh -c "show ip route ospf"
# 3. 检查健康检查
systemctl status health-checker
tail -f /var/log/health_checker.log
# 4. 检查监控指标
curl -s http://localhost:8080/api/v1/health/status | jq .
练习3: 故障模拟
# 1. 模拟隧道中断
systemctl stop wg-quick@wg0
# 2. 观察故障切换
watch -n 1 'ip route show default'
# 3. 查看告警触发
# 检查Prometheus和Alertmanager
# 4. 恢复隧道
systemctl start wg-quick@wg0
练习4: 配置变更
# 1. 备份当前配置
/usr/local/bin/backup_tunnel_config.sh
# 2. 修改WireGuard配置
cp /etc/wireguard/wg0.conf /etc/wireguard/wg0.conf.bak
sed -i 's/PersistentKeepalive = 25/PersistentKeepalive = 10/' /etc/wireguard/wg0.conf
# 3. 验证配置语法
wg-quick strip /etc/wireguard/wg0.conf
# 4. 应用配置
wg-quick down wg0 && wg-quick up wg0
# 5. 验证变更
wg show wg0 | grep keepalive
考核标准
理论考核(40%)
- 架构理解: 10%
- 工作原理: 10%
- 监控体系: 10%
- 安全知识: 10%
实操考核(40%)
- 日常操作: 10%
- 故障处理: 15%
- 配置变更: 10%
- 应急恢复: 5%
综合考核(20%)
- 案例分析: 10%
- 方案设计: 10%
评分标准
- 优秀(≥90分): 独立承担运维工作
- 良好(≥80分): 在指导下完成工作
- 合格(≥70分): 需要持续培训
- 不合格(<70分): 重新培训
培训资源
在线资源
- 架构文档: https://docs.example.com/tunnel/architecture
- 运维手册: https://docs.example.com/tunnel/operations
- 故障指南: https://docs.example.com/tunnel/troubleshooting
- API文档: https://docs.example.com/tunnel/api
本地资源
- 配置参考:
/opt/tunnel-project/docs/ - 脚本工具:
/usr/local/bin/ - 日志文件:
/var/log/tunnel*.log - 备份文件:
/backup/tunnel-config/
培训环境
- 测试环境: test-hz1, test-la1, test-la2
- 模拟工具:
/usr/local/bin/fault_injection_test.py - 练习脚本:
/opt/training/exercises/
持续学习
每周学习
- 周一: 架构复习(30分钟)
- 周三: 案例分析(45分钟)
- 周五: 技术分享(60分钟)
每月演练
- 第一周: 故障处理演练
- 第三周: 应急恢复演练
每季度考核
- 理论考试(1小时)
- 实操考核(2小时)
- 综合评估(1小时)
联系方式
培训负责人
- 姓名: 张工
- 邮箱: [email protected]
- 电话: +86-138-XXXX-XXXX
技术支持
- 团队: 网络运维部
- 频道: #tunnel-training
- 会议: 每周三 14:00-15:00
反馈渠道
- 培训反馈: https://forms.example.com/training-feedback
- 问题报告: https://github.com/example/tunnel-docs/issues
- 建议提交: [email protected]
培训材料版本: 1.0,更新日期: 2024-01-01
"""
return doc
def generate_readme(self):
"""生成主README文档"""
doc = f"""# 跨境SD-WAN项目文档
项目文档自动生成系统 | 版本: 1.0 | 生成时间: {self.timestamp}
项目概述
跨境SD-WAN高可用改造项目旨在构建一个稳定、可靠、可扩展的跨境网络架构,通过双隧道冗余、智能路由选择和自动化运维,提升跨境业务的可用性和性能。
文档结构
docs/
├── architecture/ # 架构文档
│ ├── overview.md # 架构概览
│ ├── design.md # 设计文档
│ └── components.md # 组件说明
├── operations/ # 运维文档
│ ├── manual.md # 运维手册
│ ├── monitoring.md # 监控指南
│ └── backup.md # 备份策略
├── api/ # API文档
│ ├── health-api.md # 健康检查API
│ └── metrics-api.md # 监控指标API
├── troubleshooting/ # 故障排查
│ ├── guide.md # 排查指南
│ ├── scenarios.md # 场景分析
│ └── tools.md # 诊断工具
├── training/ # 培训材料
│ ├── materials.md # 培训手册
│ ├── exercises.md # 练习题目
│ └── assessment.md # 考核标准
└── README.md # 本文档
快速开始
1. 环境访问
# 生产环境
ssh admin@hz1-prod
# 测试环境
ssh admin@hz1-test
# 监控系统
# Grafana: http://hz1:3000
# Kibana: http://hz1:5601
# Prometheus: http://hz1:9090
2. 日常操作
# 查看系统状态
/usr/local/bin/daily_inspection.py
# 检查隧道健康
/usr/local/bin/tunnel_switch.py status
# 执行备份
/usr/local/bin/backup_tunnel_config.sh
3. 故障处理
# 查看故障指南
cat /opt/tunnel-project/docs/troubleshooting/guide.md
# 使用诊断工具
/usr/local/bin/network_diagnosis.sh
# 紧急恢复
/usr/local/bin/rollback_to_single_tunnel.sh
系统组件
| 组件 | 版本 | 功能 | 文档 |
|---|---|---|---|
| WireGuard | 1.0.0 | 加密隧道 | 文档 |
| FRR | 8.4 | 动态路由 | 文档 |
| Xray | 1.8.0 | 代理传输 | 文档 |
| Prometheus | 2.45.0 | 监控收集 | 文档 |
| Grafana | 9.5.0 | 监控可视化 | 文档 |
| ELK Stack | 7.17.0 | 日志分析 | 文档 |
监控指标
关键指标
- 隧道健康评分 (0-100): 综合评估隧道质量
- 隧道延迟 (ms): 端到端通信延迟
- 隧道丢包率 (%): 数据包丢失比例
- OSPF Cost值: 路由优先级指标
- 系统资源使用率: CPU、内存、磁盘、网络
告警规则
- 隧道完全中断: 健康评分 < 30
- 隧道质量劣化: 健康评分 < 60
- 高延迟: 延迟 > 200ms
- 高丢包: 丢包率 > 10%
变更管理
变更流程
- 创建变更请求
- 进行影响分析
- 制定回滚方案
- 在测试环境验证
- 审批通过后实施
- 验证变更效果
- 更新相关文档
变更窗口
- 常规变更: 每周四 02:00-04:00
- 紧急变更: 随时,需双人确认
- 重大变更: 需提前一周申请
备份策略
备份类型
- 配置备份: 每日自动备份
- 系统备份: 每周完整备份
- 日志备份: 每月归档备份
保留策略
- 每日备份: 保留30天
- 每周备份: 保留8周
- 每月备份: 保留12个月
恢复测试
# 测试备份恢复
/usr/local/bin/backup_recovery_test.py --test full
安全策略
访问控制
- 最小权限原则
- 多因素认证
- 操作审计日志
- 定期权限审查
网络安全
- 防火墙白名单
- 加密通信
- 入侵检测
- 漏洞扫描
数据安全
- 加密存储
- 定期备份
- 安全删除
- 合规检查
支持与联系
值班安排
- 一线支持: 7x24小时
- 二线支持: 工作日 09:00-18:00
- 三线支持: 紧急情况响应
联系方式
- 值班电话: +86-138-XXXX-XXXX
- 支持邮箱: [email protected]
- 紧急联系人: +86-139-XXXX-XXXX
文档反馈
- 问题报告: https://github.com/example/tunnel-docs/issues
- 改进建议: [email protected]
- 内容更新: 提交Pull Request
版本历史
| 版本 | 日期 | 说明 | 作者 |
|---|---|---|---|
| 1.0.0 | 2024-01-01 | 初始版本 | 自动化系统 |
| 1.0.1 | 2024-01-15 | 更新运维手册 | 张工 |
| 1.1.0 | 2024-02-01 | 增加故障指南 | 李工 |
许可证
本项目文档采用 CC BY-NC-SA 4.0 许可证。
免责声明
本文档仅供参考,实际操作请根据生产环境情况进行调整。对于因使用本文档而产生的任何损失,作者不承担任何责任。
文档自动生成,最后更新: {self.timestamp}
"""
return doc
def build_html_docs(self):
    """构建HTML文档 — write an mkdocs.yml and build the static site.

    Reconstructed from a paste where several statements (and the start of
    the next method) were fused onto single lines and string delimiters
    were replaced with typographic quotes.
    """
    print("构建HTML文档...")
    # Install mkdocs on demand so the pipeline works on a fresh host.
    try:
        import mkdocs  # noqa: F401 — presence check only
    except ImportError:
        subprocess.run("pip install mkdocs mkdocs-material", shell=True)
    # 创建mkdocs配置
    mkdocs_config = os.path.join(self.project_root, "mkdocs.yml")
    with open(mkdocs_config, "w") as f:
        f.write("""site_name: 跨境SD-WAN文档
theme:
  name: material
  features:
    - navigation.tabs
    - navigation.sections
    - toc.integrate
    - search.suggest
    - search.highlight
  palette:
    primary: blue
    accent: light blue
repo_name: example/tunnel-docs
repo_url: https://github.com/example/tunnel-docs
nav:
  - 首页: index.md
  - 架构设计:
      - 概览: architecture/overview.md
      - 设计原理: architecture/design.md
      - 组件说明: architecture/components.md
  - 运维手册:
      - 日常运维: operations/manual.md
      - 监控指南: operations/monitoring.md
      - 备份策略: operations/backup.md
  - 故障排查:
      - 排查指南: troubleshooting/guide.md
      - 场景分析: troubleshooting/scenarios.md
      - 诊断工具: troubleshooting/tools.md
  - API文档:
      - 健康检查API: api/health-api.md
      - 监控指标API: api/metrics-api.md
  - 培训材料:
      - 培训手册: training/materials.md
      - 练习题目: training/exercises.md
      - 考核标准: training/assessment.md
markdown_extensions:
  - admonition
  - codehilite
  - footnotes
  - meta
  - toc:
      permalink: true
  - pymdownx.arithmatex
  - pymdownx.betterem:
      smart_enable: all
  - pymdownx.caret
  - pymdownx.critic
  - pymdownx.details
  - pymdownx.emoji:
      emoji_index: !!python/name:material.extensions.emoji.twemoji
      emoji_generator: !!python/name:material.extensions.emoji.to_svg
  - pymdownx.highlight
  - pymdownx.inlinehilite
  - pymdownx.keys
  - pymdownx.magiclink
  - pymdownx.mark
  - pymdownx.smartsymbols
  - pymdownx.snippets
  - pymdownx.superfences
  - pymdownx.tabbed:
      alternate_style: true
  - pymdownx.tasklist:
      custom_checkbox: true
  - pymdownx.tilde
""")
    # 创建index.md — mirror the README so the site has a landing page.
    index_file = os.path.join(self.docs_dir, "index.md")
    with open(index_file, "w") as f:
        f.write(self.generate_readme())
    # 构建HTML
    build_cmd = f"cd {self.project_root} && mkdocs build --site-dir {self.build_dir}/html"
    subprocess.run(build_cmd, shell=True, check=True)
    print(f"HTML文档构建完成: {self.build_dir}/html")

def publish_docs(self):
    """发布文档 — package the build, rsync it to the web root and
    commit the sources to version control (examples, best-effort)."""
    print("发布文档...")
    # 1. 打包文档
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    docs_package = f"/tmp/tunnel-docs-{timestamp}.tar.gz"
    subprocess.run(f"tar -czf {docs_package} -C {self.build_dir} .", shell=True)
    # 2. 发布到Web服务器(示例)
    web_dir = "/var/www/html/tunnel-docs"
    subprocess.run(f"rsync -avz {self.build_dir}/html/ {web_dir}/", shell=True)
    # 3. 发布到版本控制(示例)
    git_cmds = [
        f"cd {self.project_root}",
        "git add docs/ mkdocs.yml",
        f"git commit -m '更新文档 {timestamp}'",
        "git push origin main",
    ]
    subprocess.run(" && ".join(git_cmds), shell=True)
    print("文档发布完成:")
    print(f"  本地访问: file://{self.build_dir}/html/index.html")
    print(f"  Web访问: http://{subprocess.getoutput('hostname')}/tunnel-docs/")
    print(f"  文档包: {docs_package}")

def run_pipeline(self):
    """运行完整文档流水线: collect → generate → build → publish."""
    print("启动文档生成流水线")
    print("=" * 60)
    # 1. 收集系统文档
    system_docs = self.gather_system_documentation()
    # 2. 生成Markdown文档
    self.generate_markdown_docs(system_docs)
    # 3. 构建HTML文档
    self.build_html_docs()
    # 4. 发布文档
    self.publish_docs()
    print("=" * 60)
    print("文档生成流水线完成")
def main():
    """CLI entry point: run a single pipeline step or the full pipeline."""
    import argparse
    cli = argparse.ArgumentParser(description="文档生成流水线")
    cli.add_argument("--step",
                     choices=["collect", "generate", "build", "publish", "all"],
                     default="all", help="执行步骤")
    cli.add_argument("--output-dir", help="输出目录")
    opts = cli.parse_args()

    pipeline = DocumentationPipeline()
    if opts.output_dir:
        # Redirect every output location before any step runs.
        pipeline.project_root = opts.output_dir
        pipeline.docs_dir = os.path.join(opts.output_dir, "docs")
        pipeline.build_dir = os.path.join(opts.output_dir, "docs-build")
        pipeline.create_directory_structure()

    chosen = opts.step
    if chosen == "collect":
        collected = pipeline.gather_system_documentation()
        print(json.dumps(collected, indent=2, ensure_ascii=False))
    elif chosen == "generate":
        collected = pipeline.gather_system_documentation()
        pipeline.generate_markdown_docs(collected)
    elif chosen == "build":
        pipeline.build_html_docs()
    elif chosen == "publish":
        pipeline.publish_docs()
    else:  # "all"
        pipeline.run_pipeline()
# Rendering stripped the dunder underscores and smart-quoted the string;
# restore the standard module entry-point guard.
if __name__ == "__main__":
    main()
### 20.5 知识转移与交接检查表
**文档 20.5.1:知识转移检查表**
```markdown
# 跨境SD-WAN项目知识转移检查表
## 项目信息
- 项目名称: 跨境SD-WAN高可用改造项目
- 移交方: 项目实施团队
- 接收方: 运维团队
- 移交日期: _______________
## 知识转移内容
### 1. 架构理解 ✅
- [ ] 理解整体架构设计
- [ ] 掌握各组件功能和作用
- [ ] 理解数据流和控制流
- [ ] 了解关键设计决策
### 2. 系统配置 ✅
- [ ] 掌握配置文件位置和格式
- [ ] 理解配置参数含义
- [ ] 掌握配置变更流程
- [ ] 了解配置验证方法
### 3. 日常运维 ✅
- [ ] 掌握日常巡检流程
- [ ] 熟悉监控系统使用
- [ ] 掌握日志分析方法
- [ ] 了解性能监控指标
### 4. 故障处理 ✅
- [ ] 掌握故障诊断流程
- [ ] 熟悉常见故障场景
- [ ] 掌握紧急恢复操作
- [ ] 了解根本原因分析方法
### 5. 备份恢复 ✅
- [ ] 掌握备份策略
- [ ] 熟悉备份验证方法
- [ ] 掌握恢复操作流程
- [ ] 了解灾难恢复计划
### 6. 变更管理 ✅
- [ ] 掌握变更控制流程
- [ ] 熟悉变更验证方法
- [ ] 掌握回滚操作流程
- [ ] 了解变更窗口安排
### 7. 安全管理 ✅
- [ ] 掌握访问控制策略
- [ ] 熟悉安全监控方法
- [ ] 掌握漏洞管理流程
- [ ] 了解合规要求
### 8. 文档使用 ✅
- [ ] 熟悉文档结构
- [ ] 掌握文档查找方法
- [ ] 了解文档更新流程
- [ ] 熟悉培训材料使用
## 实操考核
### 1. 日常操作考核
- [ ] 完成一次完整的日常巡检
- [ ] 正确使用监控系统查看状态
- [ ] 成功执行一次配置备份
- [ ] 正确分析系统日志
### 2. 故障处理考核
- [ ] 成功诊断并处理模拟故障
- [ ] 正确执行故障切换操作
- [ ] 完成故障报告编写
- [ ] 正确分析根本原因
### 3. 变更管理考核
- [ ] 完成一次配置变更操作
- [ ] 正确验证变更效果
- [ ] 成功执行回滚操作
- [ ] 完成变更文档更新
### 4. 应急恢复考核
- [ ] 成功执行备份恢复
- [ ] 完成系统重建操作
- [ ] 验证恢复后系统功能
- [ ] 完成恢复报告编写
## 文档交接
### 1. 技术文档
- [ ] 架构设计文档
- [ ] 运维手册
- [ ] 故障排查指南
- [ ] API文档
- [ ] 培训材料
### 2. 配置文档
- [ ] 系统配置清单
- [ ] 网络拓扑图
- [ ] 部署架构图
- [ ] 数据流图
### 3. 过程文档
- [ ] 变更记录
- [ ] 故障记录
- [ ] 维护记录
- [ ] 审计日志
### 4. 管理文档
- [ ] 服务级别协议(SLA)
- [ ] 运维流程文档
- [ ] 应急预案
- [ ] 联系人清单
## 工具交接
### 1. 监控工具
- [ ] Prometheus访问权限
- [ ] Grafana访问权限
- [ ] ELK Stack访问权限
- [ ] 告警配置权限
### 2. 管理工具
- [ ] 配置管理工具
- [ ] 备份管理工具
- [ ] 日志分析工具
- [ ] 性能测试工具
### 3. 脚本工具
- [ ] 日常巡检脚本
- [ ] 故障诊断脚本
- [ ] 备份恢复脚本
- [ ] 性能测试脚本
### 4. 访问权限
- [ ] 系统登录权限
- [ ] 数据库访问权限
- [ ] 网络设备权限
- [ ] 云平台权限
## 支持交接
### 1. 支持渠道
- [ ] 一线支持联系方式
- [ ] 二线支持联系方式
- [ ] 紧急联系渠道
- [ ] 供应商联系方式
### 2. 支持流程
- [ ] 事件管理流程
- [ ] 问题管理流程
- [ ] 变更管理流程
- [ ] 知识管理流程
### 3. 支持资源
- [ ] 知识库访问
- [ ] 文档库访问
- [ ] 工具集访问
- [ ] 培训资源访问
### 4. 支持安排
- [ ] 值班安排表
- [ ] 交接班流程
- [ ] 升级流程
- [ ] 报告流程
## 移交确认
### 移交方确认
我已将上述所有内容完整移交给接收方,并确保:
- [ ] 所有文档齐全且最新
- [ ] 所有工具可正常使用
- [ ] 所有权限配置正确
- [ ] 接收方已通过考核
**移交方签字**: _________________
**日期**: _________________
### 接收方确认
我已接收上述所有内容,并确认:
- [ ] 已理解系统架构和原理
- [ ] 已掌握日常运维操作
- [ ] 已通过实操考核
- [ ] 已获得必要权限和工具
**接收方签字**: _________________
**日期**: _________________
### 项目管理方确认
我确认知识转移过程符合项目要求:
- [ ] 转移内容完整
- [ ] 转移过程规范
- [ ] 接收方具备能力
- [ ] 文档资料齐全
**项目管理方签字**: _________________
**日期**: _________________
总结
至此,我们已经完成了跨境SD-WAN高可用改造项目的全部三个阶段详细实施方案:
📋 第一阶段:手动切换双隧道(5天)
- ✅ 建立了双隧道基础架构
- ✅ 实现了策略路由和手动切换
- ✅ 部署了基础监控系统
- ✅ 创建了完整的测试和回滚方案
🚀 第二阶段:智能路由与自动切换(15天)
- ✅ 部署了FRR动态路由系统
- ✅ 实现了智能健康检查系统
- ✅ 构建了自动化故障切换机制
- ✅ 建立了全面的监控告警体系
- ✅ 完成了故障注入和性能测试
🛡️ 第三阶段:运维优化与生产就绪(10天)
- ✅ 建立了日志聚合和分析系统
- ✅ 实现了自动化备份和恢复策略
- ✅ 生成了完整的项目文档体系
- ✅ 创建了知识转移和培训材料
- ✅ 制定了运维流程和应急方案
🎯 核心成果
- 高可用架构:从单点故障到双隧道自动切换
- 智能路由:基于链路质量的动态路径选择
- 全面监控:多层次、全方位的监控体系
- 自动化运维:从巡检到备份的全面自动化
- 完整文档:从架构到运维的完整知识库
- 运维能力:经过培训的自主运维团队
🔧 关键脚本工具
| 工具类别 | 主要脚本 | 功能 |
|---|---|---|
| 隧道管理 | tunnel_switch.py |
隧道切换和状态管理 |
| 健康检查 | health_checker.py |
链路质量评估和路由调整 |
| 日常巡检 | daily_inspection.py |
系统健康检查和报告 |
| 备份恢复 | backup_recovery_test.py |
备份验证和恢复测试 |
| 故障注入 | fault_injection_test.py |
故障模拟和测试 |
| 性能测试 | performance_benchmark.py |
系统性能评估 |
| 文档生成 | documentation_pipeline.py |
自动化文档生成 |
📊 监控体系
- 基础监控:Node Exporter + Prometheus
- 业务监控:自定义健康检查指标
- 日志监控:ELK Stack日志分析
- 告警管理:Prometheus Alertmanager
- 可视化:Grafana仪表板
📁 文档体系
- 架构文档:设计原理、组件说明
- 运维手册:日常操作、监控指南
- 故障指南:排查流程、场景分析
- API文档:健康检查、监控接口
- 培训材料:课程内容、实操练习
- 管理文档:流程规范、应急预案
🚨 应急准备
- 快速恢复:一键回滚脚本
- 备份验证:定期恢复测试
- 故障演练:模拟故障场景
- 应急预案:分级响应流程
- 联系人清单:7x24支持网络
📈 持续改进
- 定期复盘:每月运维回顾会议
- 性能优化:季度性能评估和调优
- 安全加固:定期安全审计和漏洞修复
- 容量规划:基于监控数据的扩容预测
- 技术演进:跟踪新技术并评估引入
🎉 项目成功标准
- ✅ 双隧道稳定运行30天无重大故障
- ✅ 自动故障切换时间≤10秒
- ✅ 系统可用性≥99.9%
- ✅ 运维团队可独立处理90%以上问题
- ✅ 文档完整度100%
- ✅ 培训考核通过率100%
这个实施方案提供了从零到生产就绪的完整路径,每个步骤都包含详细的脚本、配置和验证方法。按照这个计划执行,可以构建一个稳定、可靠、易维护的跨境SD-WAN网络,为业务提供高质量的跨境连接服务。
评论区