日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
純干貨!Python在運(yùn)維中的應(yīng)用:批量ssh/sftp

環(huán)境:

  • 生產(chǎn):4000+物理服務(wù)器,近 3000 臺(tái)虛擬機(jī)。
  • 開(kāi)發(fā)環(huán)境:python3.6、redhat7.9,除了paramiko為第三方模塊需要自己安裝,其他的直接import即可。

主要應(yīng)用方向:

  1. 配置變更:例如服務(wù)器上線時(shí)需要批量校正系統(tǒng)分區(qū)容量、及掛載數(shù)據(jù)盤。
  2. 配置信息查詢過(guò)濾:例如過(guò)濾防火墻規(guī)則、過(guò)濾網(wǎng)卡配置。
  3. 存活檢測(cè):設(shè)置定時(shí)任務(wù),定時(shí)輪詢服務(wù)器的 ssh 狀態(tài)是否正常。
  4. 文件傳輸:多個(gè) ip 同時(shí)傳輸目錄/文件。

基本原則:

批量執(zhí)行操作是一把雙刃劍。批量執(zhí)行操作可以提升工作效率,但是隨之而來(lái)的風(fēng)險(xiǎn)不可忽略。

站在用戶的角度思考問(wèn)題,與客戶深入溝通,找到翠屏網(wǎng)站設(shè)計(jì)與翠屏網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗(yàn),讓設(shè)計(jì)與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個(gè)性化、用戶體驗(yàn)好的作品,建站類型包括:做網(wǎng)站、成都網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣、國(guó)際域名空間、網(wǎng)站空間、企業(yè)郵箱。業(yè)務(wù)覆蓋翠屏地區(qū)。

風(fēng)險(xiǎn)案例如下:

掛載很多數(shù)據(jù)盤,通常先格式化硬盤,再掛載數(shù)據(jù)盤,最后再寫(xiě)入將開(kāi)機(jī)掛載信息寫(xiě)入/etc/fstab文件。在批量lsblk檢查硬盤信息的時(shí)候發(fā)現(xiàn)有的系統(tǒng)盤在/sda有的在/sdm,如果不事先檢查機(jī)器相關(guān)配置是否一致直接按照工作經(jīng)驗(yàn)去執(zhí)行批量操作,會(huì)很容易造成個(gè)人難以承受的災(zāi)難。

在執(zhí)行批量操作時(shí)按照慣例:格式化硬盤->掛載->開(kāi)機(jī)掛載的順序去執(zhí)行,假設(shè)有的機(jī)器因?yàn)槟承┕收蠈?dǎo)致格式化硬盤沒(méi)法正確執(zhí)行。在處理這類問(wèn)題的時(shí)候通常會(huì)先提取出失敗的ip,并再按照慣例執(zhí)行操作。運(yùn)維人員會(huì)很容易忽略開(kāi)機(jī)掛載的信息已經(jīng)寫(xiě)過(guò)了,導(dǎo)致復(fù)寫(xiě)(這都是血和淚的教訓(xùn))。

所以,為了避免故障,提升工作效率,我認(rèn)為應(yīng)當(dāng)建立團(tuán)隊(duì)在工作上的共識(shí),應(yīng)當(dāng)遵守以下原則:

  1. 批量操作前應(yīng)當(dāng)準(zhǔn)備回退方案。
  2. 批量操作前作前先確定檢查目標(biāo)服務(wù)器相關(guān)的配置的一致性。
  3. 批量操作時(shí)應(yīng)當(dāng)把重復(fù)執(zhí)行會(huì)影響系統(tǒng)的操作和不影響的分開(kāi)執(zhí)行。
  4. 批量操作后應(yīng)當(dāng)再次驗(yàn)證操作結(jié)果是否符合預(yù)期。

當(dāng)然,代碼的規(guī)范也應(yīng)當(dāng)重視起來(lái),不僅是為了便于審計(jì),同時(shí)也需要便于溯源。我認(rèn)為應(yīng)當(dāng)注意以下幾點(diǎn):

  1. 關(guān)鍵方法一定要記錄傳入的參數(shù)以及執(zhí)行后的結(jié)果。
  2. 為了避免方法返回值不符合預(yù)期,該拋異常的地方一定要拋。
  3. 優(yōu)化代碼,刪去不必要的邏輯分支和盡量不要寫(xiě)重復(fù)的代碼,使代碼看起來(lái)整潔。
  4. 程序的執(zhí)行情況、結(jié)果一定要保留日志。

技術(shù)難點(diǎn)

1、ssh no existing session,sftp超時(shí)時(shí)間設(shè)置:

在代碼無(wú)錯(cuò)的情況下大量ip出現(xiàn)No existing session,排查后定位在代碼的寫(xiě)法上,下面是一個(gè)正確的示例。由于最開(kāi)始沒(méi)考慮到ssh連接的幾種情況導(dǎo)致了重寫(xiě)好幾遍。另外sftp的實(shí)例貌似不能直接設(shè)置連接超時(shí)時(shí)間,所以我采用了先建立ssh連接再打開(kāi)sftp的方法。

import paramiko

username = 'root'
port = 22
pkey = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa') # 導(dǎo)入公鑰
timeout=10


def ssh_client( ip, user=None, passwd=None, auth='id_rsa'):
client = paramiko.SSHClient() # 實(shí)例化ssh
client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 配置ssh互信
if auth == 'id_rsa': # 公鑰認(rèn)證
client.connect(ip, port, username, pkey=pkey, banner_timeout=60, timeout=timeout)
elif auth == 'noAuth': # 用戶名密碼認(rèn)證
if user is not None and passwd is not None:
client.connect(ip, port, user, passwd, banner_timeout=60, timeout=timeout)
else:
raise ValueError('傳入的用戶名密碼不能為空')
else:
raise NameError('不存在此%s認(rèn)證方式' % auth)
return client


def sftp_client(ip, user=None, passwd=None, auth='id_rsa'):
ssh = ssh_client(ip, user, passwd, auth)
sftp = ssh.open_sftp()
return sftp

2、sftp中的get()和put()方法僅能傳文件,不支持直接傳目錄:

不能直接傳目錄,那換個(gè)思路,遍歷路徑中的目錄和文件,先創(chuàng)建目錄再傳文件就能達(dá)到一樣的效果了。在paramiko的sftp中sftp.listdir_attr()方法可以獲取遠(yuǎn)程路徑中的文件、目錄信息。那么我們可以寫(xiě)一個(gè)遞歸來(lái)遍歷遠(yuǎn)程路徑中的所有文件和目錄(傳入一個(gè)列表是為了接收遞歸返回的值)。

def check_remote_folders(sftp, remote_path, remote_folders: list):
# 遞歸遍歷遠(yuǎn)程路徑中的目錄,并添加到remote_folders中
for f in sftp.listdir_attr(remote_path):
# 檢查路徑狀態(tài)是否為目錄
if stat.S_ISDIR(f.st_mode):
# 遞歸調(diào)用自己
check_remote_folders(sftp, remote_path + '/' + f.filename, remote_folders)
# 添加遍歷到的目錄信息到列表中
remote_folders.append(remote_path + '/' + f.filename)

python自帶的os模塊中的os.walk()方法可以遍歷到本地路徑中的目錄和文件。

  local_files_path = []
local_directory = []
for root, dirs, files in os.walk(local_path):
local_directory.append(root)
for file in files:if root[-1] != '/':
local_files_path.append(root + '/' + file)
elif root[-1] == '/':
local_files_path.append(root + file)

3、多線程多個(gè)ip使用sftp.get()方法時(shí)無(wú)法并發(fā)。

改成多進(jìn)程即可。

 def batch_sftp_get(ip, remote_path, local_path, user=None, passwd=None, auth='id_rsa'):
pool = multiprocessing.Pool(5)for i in ip:
pool.apply_async(sftp_get, (i, remote_path, local_path, user, passwd, auth,))
pool.close()
pool.join()

4、多個(gè)ip需要執(zhí)行相同命令或不同的命令。

由于是日常使用的場(chǎng)景不會(huì)很復(fù)雜,所以借鑒了ansible的playbook,讀取提前準(zhǔn)備好的配置文件即可,然后再整合到之前定義的ssh函數(shù)中。

# 配置文件大概是這樣
192.168.0.100:df -Th | grep xfs; lsblk | grep disk | wc -l
192.168.0.101:ip a | grep team | wc -l
192.168.0.102:route -n
...
from concurrent.futures import ThreadPoolExecutor, as_completed
import time, json


def batch_ssh(self, ip, cmd=None, user=None, passwd=None, cmd_type='one', task_name='default', auth='id_rsa'):
pool = ThreadPoolExecutor(self.workers)
task = []
if cmd_type == 'one':
task = [pool.submit(self.ssh_exec, i, cmd, user, passwd, auth) for i in ip]
elif cmd_type == 'many':
if isinstance(ip, list):
for i in ip:
separator = ''if ':' in i:
separator = ':'elif ',' in i:
separator = ','if separator != '':
data = i.split(separator)
task.append(pool.submit(self.ssh_client, data[0], data[1], user, passwd))
else:
return '請(qǐng)檢查ip和命令間的分隔符'else:
return 'ip的類型為%s, 請(qǐng)傳入一個(gè)正確的類型' % type(ip)
else:
return 'cmd_type不存在%s值, 請(qǐng)傳入一個(gè)正確的參數(shù)' % cmd_type
self.logger.debug('檢查變量task:%s' % task)
results = {}
for future in as_completed(task):
res = future.result().split(':')
results[res[1]] = {res[0]: res[2]}
if 'success' in future.result():
print('\033[32;1m%s\033[0m' % future.result().replace('success:', ''))
elif 'failed' in future.result():
print('\033[31;1m%s\033[0m' % future.result().replace('failed:', ''))
pool.shutdown()
json_results = {
'task_name': task_name,
'task_sn': self.task_sn,
'start_time': self.now_time,
'cost_time': '%.2fs' % (time.perf_counter() - self.s),
'results': results
}
self.logger.info('json_results:%s' % json_results)
with open(self.log_path + 'task_%s_%s.log' % (task_name, self.task_sn), 'a') as f:
f.write(json.dumps(json_results))
return json_results

同時(shí),我們還衍生出一個(gè)需求,既然都要讀取配置,那同樣也可以提前把ip地址準(zhǔn)備在文件里。正好也能讀取我們返回的執(zhí)行程序的結(jié)果。

import os
import json


def get_info(self, path):
self.logger.debug('接收參數(shù)path:%s'.encode('utf8') % path.encode('utf8'))
if os.path.exists(path):
info_list = [i.replace('\n', '') for i in open(path, 'r', encoding='utf8').readlines()]
return info_list
else:
self.logger.warning('%s不存在,請(qǐng)傳入一個(gè)正確的目錄' % path)
raise ValueError('%s不存在,請(qǐng)傳入一個(gè)正確的目錄' % path)


def log_analysis(filename):if os.path.exists(filename):
try:
data = json.load(open(filename, 'r', encoding='utf8'))
return data
except Exception as e:
print('%s無(wú)法解析該類型文件' % filename + ' ' + str(e))
raise TypeError('%s無(wú)法解析該類型文件' % filename + ' ' + str(e))
else:
raise ValueError('該%s文件路徑不存在,請(qǐng)傳入一個(gè)正確的文件路徑' % filename)


def show_log(self, filename, mode=None):
data: dict = self.log_analysis(filename)
if isinstance(data, dict):
for key, value in data["results"].items():
if 'success' in value.keys():
if mode == 'success':
print(key)
elif mode is None:
print('%s:%s' % (key, value['success'].replace('\r\n', '')))
elif 'failed' in value.keys():
if mode == 'failed':
print(key)
elif mode is None:
print('%s:%s' % (key, value['failed'].replace('\r\n', '')))

完整代碼展示:

from concurrent.futures import ThreadPoolExecutor, as_completed
import multiprocessing
import os
import re
import time
import stat
import json
import random
import logging
import asyncio
import argparse
import paramiko


class TaskManager:def __init__(self, timeout=10, workers=15, system='linux'):
self.username = 'root'
self.port = 22
self.datetime = time.strftime("%Y-%m-%d", time.localtime())
self.timeout = timeout
self.workers = workers
self.now_time = time.strftime("%Y/%m/%d %H:%M:%S", time.localtime())
self.s = time.perf_counter()
self.task_sn = self.sn_random_generator()

if system == 'linux':
self.pkey = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa')
self.log_path = '/tmp/TaskManager/log/'
self.log_debug_path = '/tmp/TaskManager/debug/'elif system == 'windows':
self.pkey = paramiko.RSAKey.from_private_key_file(r'C:\Users\001\.ssh\id_rsa')
self.log_path = r'D:\tmp\TaskManager\log\\'
self.log_debug_path = r'D:\tmp\TaskManager\debug\\'

if os.path.exists(self.log_path) is False:
os.makedirs(self.log_path, exist_ok=True)
if os.path.exists(self.log_debug_path) is False:
os.makedirs(self.log_debug_path, exist_ok=True)

self.logger = logging.getLogger(__name__)
self.logger.setLevel(level=logging.DEBUG)
self.handler = logging.FileHandler(self.log_debug_path + '%s_%s.log' % (self.datetime, self.task_sn))
self.formatter = logging.Formatter("%(asctime)s[%(levelname)s][%(funcName)s]%(message)s ")
self.handler.setFormatter(self.formatter)
self.logger.addHandler(self.handler)
self.logger.info('初始化完成'.encode(encoding='utf8'))

def ssh_client(self, ip, user=None, passwd=None, auth='id_rsa'):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if auth == 'id_rsa':
self.logger.info('正在 SSH 連接%s'.encode('utf8') % str(ip).encode('utf8'))
client.connect(ip, self.port, self.username, pkey=self.pkey, banner_timeout=60, timeout=self.timeout)
elif auth == 'noAuth':
if user is not None and passwd is not None:
client.connect(ip, self.port, user, passwd, banner_timeout=60, timeout=self.timeout)
# allow_agent=False, look_for_keys=False# No existing session 解決辦法 else:raise ValueError('傳入的用戶名密碼不能為空')else:raise NameError('不存在此%s 認(rèn)證方式' % auth)return client

def ssh_exec(self, ip, cmd, user=None, passwd=None, auth='id_rsa'):try:
ssh = self.ssh_client(ip, user, passwd, auth)
stdin, stdout, stderr = ssh.exec_command(command=cmd, get_pty=True)
self.logger.debug('%s:stdin 輸入:%s' % (ip, stdin))
self.logger.debug('%s:stderr 錯(cuò)誤:%s' % (ip, stderr))
self.logger.debug('%s:stdout 輸出:%s' % (ip, stdout))
result = stdout.read().decode('utf-8')
ssh.close()
return 'success:' + ip + ':' + str(result)
except Exception as e:
return 'failed:' + ip + ':' + str(e)

def batch_ssh(self, ip, cmd=None, user=None, passwd=None, cmd_type='one', task_name='default', auth='id_rsa'):
self.logger.debug('接收參數(shù) ip:%s, cmd:%s, user:%s passwd:%s cmd_type:%s, task_name:%s' %
(ip, cmd, user, passwd, cmd_type, task_name))
print('\033[35;1m-------------------Task is set up! Time:%s ----------------------\033[0m' % self.now_time)
pool = http://www.5511xx.com/article/dpogish.html