基于Python同Linux进行交互式操作实现通过堡垒机访问目标机
by:授客 QQ:1033553122 欢迎加入全国软件测试交流群:7156436
实现功能
远程登录Linux堡垒机,同Linux进行交互式操作,访问目标机
测试环境
Win7 64位
Python 3.3.4
paramiko 1.15.2
下载地址:
cryptography-1.0-cp34-none-win_amd64.whl
(如果paramiko可以正常安装完,则不需要安装该类库)
下载地址:
安装好后,找到nt.py(本例中路径为:
Lib\site-packages\pycrypto-2.6.1-py3.4-win-amd64.egg\Crypto\Random\OSRNG\nt.py),修改
import winrandom
为
from Crypto.Random.OSRNG import winrandom
如下
#import winrandom
from Crypto.Random.OSRNG import winrandom
以解决ImportError: No module named 'winrandom'错误
说明:具体文件路径可能还得根据实际报错情况来确定,如下
............(略)
"D:\Program Files\python33\lib\site-packages\Crypto\Random\OSRNG\nt.py", line 28, in
import winrandom
ImportError: No module named 'winrandom'
VS2010
因操作系统而异,可能需要安装VS2010,以解决包依赖问题
需求
SSH登录堡垒机后,根据提示输入相关信息,进入到目标机,如下
代码实践
#!/usr/bin/env/ python# -*- coding:utf-8 -*-__author__ = 'shouke'
from paramiko.client import AutoAddPolicyfrom paramiko.client import SSHClientclass MySSHClient: def __init__(self): self.ssh_client = SSHClient() # 连接登录 def connect(self, hostname, port, username, password, host_via_by_bastion): try: self.ssh_client.set_missing_host_key_policy(AutoAddPolicy()) self.ssh_client.connect(hostname=hostname, port=port, username=username, password=password, timeout=30) channel = self.ssh_client.invoke_shell() channel.settimeout(10) # 读、写操作超时时间,10秒 ###################### 定制化开发 ###########
print('正在通过堡垒机:%s 访问目标机:%s' % (hostname, host_via_by_bastion)) host_via_by_bastion = host_via_by_bastion + '\n' prompt_input_list = [{ 'Please enter your ID':'xxx255222\n'}, { 'Please enter your password': 'passwd123\n'}, { 'Please select your app ip':host_via_by_bastion}, { 'select your user for login':'1\n'}]
end_flag1 = ']$' end_flag2 = ']#' stdout = '' # 存放每次执行读取的内容 for item in prompt_input_list: prompt = list(item.keys())[0] input = item[prompt] flag = False while stdout.find(prompt) == -1: try: stdout += channel.recv(65535).decode('utf-8')#.decode('utf-8') flag = True except Exception as e: print('通过堡垒机:%s 访问目标机:%s 失败:%s' % (hostname, host_via_by_bastion, e)) flag = False break if flag: channel.send(input) else: # 未找到了对应提示 return [False, '通过堡垒机:%s 访问目标机:%s 失败,可能是读取命令返回结果超时,或者没找到对应输入提示' % (hostname, host_via_by_bastion)] flag = False while not (stdout.rstrip().endswith(end_flag1) or stdout.rstrip().endswith(end_flag2)): try: stdout = channel.recv(2048).decode('utf-8') flag = True except Exception as e: print('通过堡垒机:%s 访问目标机:%s 失败:%s' % (hostname, host_via_by_bastion, e)) flag = False break if flag: return [True, ''] else: # 未找到了对应提示 return [False, '没出现成功登录提示符 ]$ 或 ]# '] channel.close() return [True, ''] except Exception as e: return [False, '%s' % e] # 远程执行命令 def exec_command(self, command, target_host, bastion_host): try: channel = self.ssh_client.invoke_shell() channel.settimeout(30) # 读、写操作超时时间,30秒 end_flag1 = ']$' end_flag2 = ']#' ################ 定制化开发 ############################## if bastion_host != '': print('正在通过堡垒机:%s 访问目标机:%s' % (bastion_host, target_host)) target_host_input = target_host + '\n' prompt_input_list = [{ 'Please enter your ID':'01367599\n'}, { 'Please enter your password': 'Huozhe2020\n'}, { 'Please select your app ip':target_host_input}, { 'select your user for login':'1\n'}] stdout = '' # 存放每次执行读取的内容 for item in prompt_input_list: prompt = list(item.keys())[0] input = item[prompt] flag = False while stdout.find(prompt) == -1: try: stdout += channel.recv(65535).decode('utf-8')#.decode('utf-8') flag = True except Exception as e: print('通过堡垒机:%s 访问目标机:%s 失败:%s' % (bastion_host, target_host, e)) flag = False break if flag: channel.send(input) else: # 未找到了对应提示 print('通过堡垒机:%s 访问目标机:%s 失败,可能是读取命令返回结果超时,或者没找到对应输入提示' % (bastion_host, target_host)) # return [False, '通过堡垒机:%s 访问目标机:%s 失败,可能是读取命令返回结果超时,或者没找到对应输入提示' % (bastion_host, target_host)] return while not (stdout.rstrip().endswith(end_flag1) or stdout.rstrip().endswith(end_flag2)): try: stdout = channel.recv(2048).decode('utf-8') except Exception as e: print('通过堡垒机:%s 访问目标机:%s 失败:没出现成功登录提示符 ]$ 或 ]#' % (bastion_host, target_host)) return channel.send(command+'\n') command_res = '' # 存放每次执行读取的内容 while not (command_res.endswith(end_flag1) or command_res.endswith(end_flag2)): try: command_res = channel.recv(2048).decode('utf-8').strip() except Exception as e: print('在目标机(IP: %s)上进行读取操作超时' % target_host) break channel.close() except Exception as e: print('针对目标机:%s 执行命令: %s 出错 %s' % (target_host, command, e))