Origin
Have you encountered scenarios where you need to manually log into dozens of servers to deploy new versions, executing the same commands repeatedly? Or perhaps you need to regularly collect monitoring data from multiple servers but lack an automated solution? These repetitive operations tasks are not only tedious but also prone to errors. As a Python programmer, I deeply understand the importance of using scripts to automate these tasks. Today, I'd like to share how to elegantly solve these problems using Python's Paramiko module.
Background
Before we begin, let's look at why Paramiko is a good fit. For remote server operations, the obvious first choice is a shell script that calls ssh directly. However, shell scripts struggle with complex logic and do not port well across platforms. Paramiko is a Python implementation of the SSHv2 protocol, so connection handling, command execution, and file transfer can all live in ordinary Python code alongside the rest of your logic.
Getting Started
Let's start with a simple example. Suppose we want to run a single command on a remote server, such as checking disk usage:
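```python
import paramiko


def get_ssh_client():
    client = paramiko.SSHClient()
    # AutoAddPolicy trusts unknown host keys without verification. It is
    # convenient for a demo, but it leaves the connection open to
    # man-in-the-middle attacks.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    return client


def check_disk_usage(hostname, username, password):
    client = get_ssh_client()
    try:
        client.connect(hostname, username=username, password=password)
        # exec_command returns file-like objects for stdin, stdout and stderr
        _stdin, stdout, _stderr = client.exec_command('df -h')
        return stdout.read().decode()
    finally:
        # Always close the connection, even if the command fails
        client.close()
```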
As you can see, a few lines of code are enough to run a command remotely. But this is just the beginning, so let's dive deeper.
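Before moving on, one practical note: `check_disk_usage` returns the raw text of `df -h`, which is awkward to act on in a script. The following is a minimal sketch of turning that text into structured data. The helper name `parse_df_output` and the sample output are hypothetical, and the parser assumes the common six-column layout of `df -h`.

```python
def parse_df_output(text):
    """Parse `df -h` output into a list of dicts, one per filesystem."""
    rows = []
    lines = text.strip().splitlines()
    for line in lines[1:]:  # skip the header row
        # Split into at most six fields so a mount point with spaces stays intact
        parts = line.split(None, 5)
        if len(parts) < 6:
            continue  # skip wrapped or malformed lines
        filesystem, size, used, avail, use_pct, mount = parts
        try:
            percent = int(use_pct.rstrip('%'))
        except ValueError:
            continue  # some pseudo-filesystems report '-' instead of a number
        rows.append({
            'filesystem': filesystem,
            'size': size,
            'used': used,
            'available': avail,
            'use_percent': percent,
            'mount': mount,
        })
    return rows


# Hypothetical sample output, used here instead of a live server
sample = """Filesystem      Size  Used Avail Use% Mounted on
/dev/sda1        50G   45G  5.0G  90% /
tmpfs           2.0G     0  2.0G   0% /run
"""

for row in parse_df_output(sample):
    if row['use_percent'] >= 90:
        print(f"{row['mount']} is {row['use_percent']}% full")
```

With a live connection, the same function could be applied to the string returned by `check_disk_usage`.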
Advanced
In real work, we often need to handle more complex scenarios. For example, we might need to execute commands in batch on multiple servers, or upload files before executing a series of operations. The following example demonstrates how to implement these advanced features:
```python
import os
from concurrent.futures import ThreadPoolExecutor

import paramiko


class RemoteManager:
    """SSH and SFTP session for one host, usable as a context manager."""

    def __init__(self, hostname, username, password):
        self.hostname = hostname
        self.username = username
        self.password = password
        self._client = None
        self._sftp = None

    def __enter__(self):
        self._client = paramiko.SSHClient()
        # Convenient for a demo; see the host key caveat in the first example
        self._client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            self._client.connect(self.hostname, username=self.username, password=self.password)
            self._sftp = self._client.open_sftp()
        except Exception:
            # __exit__ is not called when __enter__ raises, so clean up here
            self._client.close()
            raise
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._sftp:
            self._sftp.close()
        if self._client:
            self._client.close()

    def execute_command(self, command):
        _stdin, stdout, stderr = self._client.exec_command(command)
        # Read both streams before asking for the exit status
        return {
            'out': stdout.read().decode(),
            'err': stderr.read().decode(),
            'status': stdout.channel.recv_exit_status()
        }

    def upload_file(self, local_path, remote_path):
        self._sftp.put(local_path, remote_path)
```
This class wraps the common remote operations and implements the context manager protocol, so the SSH and SFTP connections are closed even when an operation fails. Let's see how to use it:
```python
import shlex


def deploy_application(servers, app_package):
    package_name = os.path.basename(app_package)
    remote_path = f'/tmp/{package_name}'

    def deploy_to_server(server):
        with RemoteManager(**server) as remote:
            # Upload application package
            remote.upload_file(app_package, remote_path)
            # Extract; quote the file name so spaces or shell metacharacters are safe
            result = remote.execute_command(f'cd /tmp && tar xzf {shlex.quote(package_name)}')
            if result['status'] != 0:
                return f"Failed to extract on {server['hostname']}: {result['err']}"
            # Restart service (the remote user needs permission to run systemctl)
            result = remote.execute_command('systemctl restart myapp')
            if result['status'] != 0:
                return f"Failed to restart service on {server['hostname']}: {result['err']}"
            return f"Successfully deployed to {server['hostname']}"

    # Parallel deployment to multiple servers
    with ThreadPoolExecutor(max_workers=5) as executor:
        results = executor.map(deploy_to_server, servers)
        return list(results)
```
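One caveat with `executor.map`: if a worker raises (for example, because a host is unreachable), the exception is re-raised when its result is retrieved, so `list(results)` stops at the first failure and the remaining outcomes are lost. A minimal sketch of collecting one outcome per host with `as_completed` follows. The names `run_on_all` and `fake_deploy` are hypothetical, and the stub stands in for a real deployment function so the example runs without any servers.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_on_all(servers, task, max_workers=5):
    """Run task(server) for every server and record a result or an error per host."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the host it belongs to
        futures = {executor.submit(task, s): s['hostname'] for s in servers}
        for future in as_completed(futures):
            host = futures[future]
            try:
                results[host] = {'ok': True, 'detail': future.result()}
            except Exception as exc:
                # One failing host no longer hides the others
                results[host] = {'ok': False, 'detail': f'{type(exc).__name__}: {exc}'}
    return results


def fake_deploy(server):
    """Stand-in for a real deployment; fails for one host on purpose."""
    if server['hostname'] == 'web-2':
        raise ConnectionError('connection refused')
    return f"Successfully deployed to {server['hostname']}"


demo_servers = [{'hostname': 'web-1'}, {'hostname': 'web-2'}]
report = run_on_all(demo_servers, fake_deploy)
for host, outcome in sorted(report.items()):
    print(host, 'OK' if outcome['ok'] else 'FAILED', outcome['detail'])
```

Results arrive in completion order, which is why they are keyed by hostname rather than returned as a list.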
Practical Application
Now let's look at a real application scenario: suppose we need to develop an automated operations tool to monitor system status across multiple servers and automatically handle abnormal situations.
```python
import json
import time
from datetime import datetime


class ServerMonitor:
    def __init__(self, servers):
        self.servers = servers
        self.alert_thresholds = {
            'cpu_usage': 80,     # CPU usage threshold (%)
            'memory_usage': 85,  # Memory usage threshold (%)
            'disk_usage': 90     # Disk usage threshold (%)
        }

    def get_system_metrics(self, remote):
        commands = {
            # Reports user-space CPU time only; top's output layout varies by version
            'cpu': "top -bn1 | grep 'Cpu(s)' | awk '{print $2}'",
            'memory': "free | grep Mem | awk '{print $3/$2 * 100.0}'",
            'disk': "df -h / | tail -1 | awk '{print $5}' | cut -d'%' -f1"
        }
        metrics = {}
        for metric, command in commands.items():
            result = remote.execute_command(command)
            if result['status'] != 0:
                continue
            try:
                metrics[metric] = float(result['out'].strip())
            except ValueError:
                # Skip output that is empty or not a plain number
                continue
        return metrics

    def handle_alert(self, server, metric, value):
        # Add alert logic here, such as sending emails or SMS
        alert_message = {
            'timestamp': datetime.now().isoformat(),
            'server': server['hostname'],
            'metric': metric,
            'value': value,
            'threshold': self.alert_thresholds[f'{metric}_usage']
        }
        print(f"ALERT: {json.dumps(alert_message, indent=2)}")
        # Execute automatic repair operations
        if metric == 'disk':
            self.clean_old_logs(server)

    def clean_old_logs(self, server):
        with RemoteManager(**server) as remote:
            # Clean log files older than 30 days (usually requires root privileges)
            command = 'find /var/log -type f -mtime +30 -delete'
            result = remote.execute_command(command)
            if result['status'] == 0:
                print(f"Successfully cleaned old logs on {server['hostname']}")

    def monitor(self, interval=300):  # Check every 5 minutes by default
        while True:
            for server in self.servers:
                try:
                    with RemoteManager(**server) as remote:
                        metrics = self.get_system_metrics(remote)
                        for metric, value in metrics.items():
                            threshold = self.alert_thresholds[f'{metric}_usage']
                            if value > threshold:
                                self.handle_alert(server, metric, value)
                        print(f"Server {server['hostname']} metrics: {json.dumps(metrics, indent=2)}")
                except Exception as e:
                    # Keep monitoring the other servers if one is unreachable
                    print(f"Error monitoring {server['hostname']}: {str(e)}")
            time.sleep(interval)
```
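A note on the CPU metric: the second field of the `Cpu(s)` line is user-space time, so system and I/O-wait time are not counted toward the threshold. One possible refinement, sketched below, is to fetch the whole `Cpu(s)` line and compute usage as 100 minus the idle percentage. The helper name `parse_cpu_usage` is hypothetical, and the regular expression is an assumption based on two layouts that `top` commonly prints; check it against the output on your own servers.

```python
import re


def parse_cpu_usage(top_line):
    """Return total CPU usage (100 - idle) from a top 'Cpu(s)' line, or None."""
    # Matches both '98.0 id' and the older '98.0%id' layout
    match = re.search(r'(\d+(?:\.\d+)?)\s*%?\s*id\b', top_line)
    if match is None:
        return None  # unexpected layout; let the caller decide what to do
    idle = float(match.group(1))
    return round(100.0 - idle, 1)


# Hypothetical sample lines, used here instead of a live server
newer = "%Cpu(s):  1.2 us,  0.5 sy,  0.0 ni, 98.0 id,  0.3 wa,  0.0 hi,  0.0 si,  0.0 st"
older = "Cpu(s):  5.0%us,  2.5%sy,  0.0%ni, 87.5%id,  5.0%wa,  0.0%hi,  0.0%si,  0.0%st"

print(parse_cpu_usage(newer))
print(parse_cpu_usage(older))
```

To use it, the `cpu` command would become `top -bn1 | grep 'Cpu(s)'`, with the parsing done in Python rather than in `awk`.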
Using this monitoring system is very simple:
```python
servers = [
    {
        'hostname': 'web-server-1',
        'username': 'admin',
        'password': 'secure_password'
    },
    {
        'hostname':