运维自动化脚本分享:提升效率的Shell与Python实战

在IT运维的日常工作中,重复性劳动往往占据了大量时间。从服务器巡检、日志清理到批量部署,手动操作不仅效率低下,而且容易因人为疏忽引发线上故障。自动化是运维走向DevOps的核心路径,而编写脚本则是实现自动化最直接、最有效的手段。

本文将分享四个在日常运维和渠道管理中极为实用的自动化脚本,涵盖Shell与Python两种主流语言,帮助大家从繁琐的机械操作中解放出来。


一、 Shell脚本:系统健康度一键巡检

对于渠道侧的大量边缘节点或核心业务服务器,定期巡检是保障稳定性的基础。以下Shell脚本可以快速抓取CPU、内存、磁盘及异常进程等关键指标,并输出标准化报告,非常适合配合Cron定时任务每日执行。


#!/bin/bash
# 系统健康巡检脚本

# 定义阈值
CPU_THRESHOLD=80
MEM_THRESHOLD=80
DISK_THRESHOLD=90

echo "===== 服务器健康巡检报告 ====="
echo "主机名: $(hostname)"
echo "IP地址: $(hostname -I | awk '{print $1}')"
echo "巡检时间: $(date '+%Y-%m-%d %H:%M:%S')"
echo "-----------------------------------"

# 1. CPU负载检查
cpu_load=$(top -bn1 | grep "Cpu(s)" | awk '{print $2}')
cpu_int=${cpu_load%.*} # 取整
if [ "$cpu_int" -gt "$CPU_THRESHOLD" ]; then
    echo "[警告] CPU使用率: ${cpu_load}% (超过阈值${CPU_THRESHOLD}%)"
else
    echo "[正常] CPU使用率: ${cpu_load}%"
fi

# 2. 内存使用检查
mem_usage=$(free -m | awk 'NR==2{printf "%.1f", $3/$2*100}')
mem_int=${mem_usage%.*}
if [ "$mem_int" -gt "$MEM_THRESHOLD" ]; then
    echo "[警告] 内存使用率: ${mem_usage}% (超过阈值${MEM_THRESHOLD}%)"
else
    echo "[正常] 内存使用率: ${mem_usage}%"
fi

# 3. 磁盘空间检查
echo "[磁盘空间检查]"
df -h | grep -vE '^Filesystem|tmpfs|cdrom' | while read line; do
    partition=$(echo $line | awk '{print $1}')
    usage=$(echo $line | awk '{print $5}' | sed 's/%//g')
    mount=$(echo $line | awk '{print $6}')
    if [ "$usage" -gt "$DISK_THRESHOLD" ]; then
        echo "  [警告] 挂载点 $mount 使用率: ${usage}%"
    else
        echo "  [正常] 挂载点 $mount 使用率: ${usage}%"
    fi
done

echo "===== 巡检结束 ====="

实践建议:可以将此脚本的输出重定向到文件,并通过邮件或企业微信机器人发送给运维团队,实现每日自动报平安。


二、 Shell脚本:过期日志安全清理

日志文件是排查问题的金矿,但无限制增长的日志会撑爆磁盘。传统的rm -rf过于粗暴,以下脚本采用“保留近期,清理远期”的原则,按天数和文件大小双重校验,确保安全。


#!/bin/bash
# 日志安全清理脚本

LOG_DIR="/var/log/app"
DAYS_TO_KEEP=7
SIZE_THRESHOLD="500M" # 超过此大小的活跃日志执行清空而非删除

find $LOG_DIR -name "*.log" -type f -mtime +$DAYS_TO_KEEP -exec rm -f {} \;
echo "已清理 ${DAYS_TO_KEEP} 天前的过期日志"

# 针对当前活跃的大日志文件,避免直接删除导致句柄未释放
find $LOG_DIR -name "*.log" -type f -size +$SIZE_THRESHOLD -exec truncate -s 0 {} \;
echo "已清空超过 ${SIZE_THRESHOLD} 的活跃大日志文件"

实践建议:直接删除正在被程序写入的日志文件,可能导致磁盘空间不释放(进程仍持有文件句柄)。使用truncate -s 0清空文件内容而非删除文件,是更优雅的生产环境实践。


三、 Python脚本:多线程批量主机存活探测

在渠道网点管理或大规模集群中,经常需要验证成百上千个IP的网络连通性。Shell的for循环加ping串行执行极慢,而Python借助多线程可实现秒级探测。


import subprocess
import ipaddress
from concurrent.futures import ThreadPoolExecutor, as_completed

def ping_host(ip):
    """探测单个IP的存活状态"""
    # Linux下使用 ping -c 1 -W 1
    command = ['ping', '-c', '1', '-W', '1', str(ip)]
    try:
        result = subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        if result.returncode == 0:
            return str(ip), True
    except Exception:
        pass
    return str(ip), False

def batch_ping(network_cidr, max_workers=100):
    """批量探测网段内存活主机"""
    alive_hosts = []
    ips = list(ipaddress.ip_network(network_cidr).hosts())
    
    print(f"开始探测网段 {network_cidr},共 {len(ips)} 个IP...")
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(ping_host, ip): ip for ip in ips}
        for future in as_completed(futures):
            ip, is_alive = future.result()
            if is_alive:
                alive_hosts.append(ip)
                
    alive_hosts.sort(key=lambda x: ipaddress.ip_address(x))
    print(f"探测完成,存活主机数: {len(alive_hosts)}")
    for host in alive_hosts:
        print(f"  - {host} (Online)")

if __name__ == "__main__":
    # 示例:探测 192.168.1.0/24 网段
    batch_ping("192.168.1.0/24")

实践建议:通过max_workers控制并发度,在公网探测时建议设置在50-100之间,以免触发中间网络设备的限速或防火墙封禁。


四、 Python脚本:API服务可用性监控

现代微服务架构下,单纯依靠端口存活已无法判断服务是否真正可用。以下Python脚本通过请求业务API,校验HTTP状态码与返回体关键字,实现深度监控,并可无缝对接告警网关。


import requests
import time
import json

# 监控配置
API_ENDPOINTS = [
    {"name": "用户登录接口", "url": "https://api.example.com/login", "method": "GET", "expect_code": 200, "expect_keyword": "token"},
    {"name": "订单查询接口", "url": "https://api.example.com/orders", "method": "GET", "expect_code": 200, "expect_keyword": "data"}
]

WEBHOOK_URL = "https://your-webhook-url/notify" # 告警回调地址

def check_api(api):
    """执行API健康检查"""
    try:
        start_time = time.time()
        if api["method"] == "GET":
            response = requests.get(api["url"], timeout=5)
        else:
            response = requests.post(api["url"], timeout=5)
        
        latency = round((time.time() - start_time) * 1000, 2)
        
        # 校验状态码与关键字
        code_match = (response.status_code == api["expect_code"])
        keyword_match = (api["expect_keyword"] in response.text)
        
        if code_match and keyword_match:
            print(f"[正常] {api['name']} | 状态码: {response.status_code} | 延迟: {latency}ms")
            return True
        else:
            error_msg = f"[故障] {api['name']} | 状态码: {response.status_code} | 延迟: {latency}ms | 关键字匹配: {keyword_match}"
            print(error_msg)
            send_alert(error_msg)
            return False
            
    except requests.RequestException as e:
        error_msg = f"[超时/异常] {api['name']} | 错误: {str(e)}"
        print(error_msg)
        send_alert(error_msg)
        return False

def send_alert(message):
    """发送告警(示例:发送至Webhook)"""
    payload = {"content": f"⚠️ API监控告警:\n{message}"}
    try:
        # requests.post(WEBHOOK_URL, json=payload, timeout=3)
        pass # 实际生产中取消