# -*- coding: UTF-8 -*-
# Author: dota_st
# Date: 2021/6/2 23:39
# blog: www.wlhhlc.top
import re
import threadpool
import urllib.parse
import urllib.request
import ssl
from urllib.error import HTTPError
import time
import tldextract
from fake_useragent import UserAgent
import os
import requests

# _create_stdlib_context builds an unverified SSL context, so HTTPS requests
# won't abort on bad or self-signed certificates
ssl._create_default_https_context = ssl._create_stdlib_context
bd_mb = []  # results with a non-zero Baidu PC or mobile weight
gg = []     # results with weight 0 on both; re-checked against Google PR later
flag = 0    # 0 = first pass, 1 = retry pass over fail.txt
# Data cleaning: extract a registrable domain from each reverse-IP lookup line
def get_data():
    url_list = open("ip反查结果.txt").readlines()  # reverse-IP lookup results
    with open("domain.txt", 'w') as f:
        for i in url_list:
            i = i.strip()
            # Prefer the ip138 result; fall back to aizhan when it is empty
            res = i.split('[ip138]:')[1].split('[aizhan]')[0].split(",")[0].strip()
            if res == 'None' or res == '[]':
                res = i.split('[aizhan]:')[1].split(",")[0].strip()
            if res != '[]':
                res = re.sub(r'[\[\]]', '', res)  # strip the list brackets
                ext = tldextract.extract(res)
                res1 = i.split('[url]:')[1].split('[ip138]')[0].strip()
                res2 = "http://www." + '.'.join(ext[1:])  # domain + suffix
                result = '[url]:' + res1 + '\t' + '[domain]:' + res2
                f.write(result + "\n")
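
# A rough sketch of the input line format get_data() appears to expect,
# inferred from the parsing above (values are illustrative only; the real
# "ip反查结果.txt" is produced by the earlier reverse-IP lookup step):
#
#   [url]:http://1.2.3.4 [ip138]:[example.com] [aizhan]:[]
#
# which the loop rewrites, tab-separated, as:
#
#   [url]:http://1.2.3.4    [domain]:http://www.example.com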
def getPc(domain):
    ua_header = UserAgent()
    headers = {
        'Host': 'baidurank.aizhan.com',
        'User-Agent': ua_header.random,
        'Sec-Fetch-Dest': 'document',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
        'Cookie': ''
    }
    aizhan_pc = 'https://baidurank.aizhan.com/api/br?domain={}&style=text'.format(domain)
    try:
        req = urllib.request.Request(aizhan_pc, headers=headers)
        response = urllib.request.urlopen(req, timeout=10)
        b = response.read()
        a = b.decode("utf8")
        result_pc = re.findall(re.compile(r'>(.*?)</a>'), a)
        pc = result_pc[0]
    except HTTPError as u:
        time.sleep(3)
        return getPc(domain)  # back off and retry on HTTP errors
    return pc
def getMobile(domain):
    ua_header = UserAgent()
    headers = {
        'Host': 'baidurank.aizhan.com',
        'User-Agent': ua_header.random,
        'Sec-Fetch-Dest': 'document',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
        'Cookie': ''
    }
    aizhan_mb = 'https://baidurank.aizhan.com/api/mbr?domain={}&style=text'.format(domain)
    try:
        req = urllib.request.Request(aizhan_mb, headers=headers)
        response = urllib.request.urlopen(req, timeout=10)
        b = response.read()
        a = b.decode("utf8")
        result_m = re.findall(re.compile(r'>(.*?)</a>'), a)
        mobile = result_m[0]
    except HTTPError as u:
        time.sleep(3)
        return getMobile(domain)  # back off and retry on HTTP errors
    return mobile
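
# The two functions above differ only in the API path ("br" vs "mbr"). A
# combined fetcher could look like the sketch below -- left unused here, and
# assuming the same aizhan endpoint and response markup the code above scrapes:
def _get_baidu_rank(domain, mobile=False):
    api = 'mbr' if mobile else 'br'
    url = 'https://baidurank.aizhan.com/api/{}?domain={}&style=text'.format(api, domain)
    headers = {'Host': 'baidurank.aizhan.com', 'User-Agent': UserAgent().random}
    html = requests.get(url, headers=headers, timeout=10).text
    return re.findall(r'>(.*?)</a>', html)[0]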
# Weight lookup
def seo(domain, url):
    try:
        result_pc = getPc(domain)
        result_mobile = getMobile(domain)
    except Exception as u:
        if flag == 0:
            print('[!] Check failed for {}, written to fail.txt for a retry'.format(url))
            print(domain)
            with open('fail.txt', 'a', encoding='utf-8') as o:
                o.write(url + '\n')
        else:
            print('[!!] Second check failed for {}'.format(url))
        return False  # bail out so the lines below never touch unset results
    result = '[+] Baidu weight:' + result_pc + ' mobile weight:' + result_mobile + ' ' + url
    print(result)
    if result_pc == '0' and result_mobile == '0':
        gg.append(result)
    else:
        bd_mb.append(result)
    return True
def exp(url):
    try:
        main_domain = url.split('[domain]:')[1]
        ext = tldextract.extract(main_domain)
        domain = '.'.join(ext[1:])
        rew = seo(domain, url)
    except Exception as u:
        pass  # malformed lines are skipped silently
def multithreading(funcname, params=[], filename="domain.txt", pools=15):
    works = []
    with open(filename, "r") as f:
        for i in f:
            func_params = [i.rstrip("\n")] + params
            works.append((func_params, None))
    pool = threadpool.ThreadPool(pools)
    reqs = threadpool.makeRequests(funcname, works)
    [pool.putRequest(req) for req in reqs]
    pool.wait()
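
# threadpool has been unmaintained for years; the same fan-out fits on the
# standard library. A sketch only (the script still uses threadpool), assuming
# funcname takes a single line argument, as exp() does:
from concurrent.futures import ThreadPoolExecutor

def _multithreading_futures(funcname, filename="domain.txt", pools=15):
    with open(filename, "r") as f:
        lines = [line.rstrip("\n") for line in f]
    with ThreadPoolExecutor(max_workers=pools) as executor:
        list(executor.map(funcname, lines))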
def google_simple(url, j):
    google_pc = "https://pr.aizhan.com/{}/".format(url)
    bz = 0
    http_or_find = 0
    try:
        response = requests.get(google_pc, timeout=10).text
        http_or_find = 1
        result_pc = re.findall(re.compile(r'<span>谷歌PR:</span><a>(.*?)/></a>'), response)[0]
        result_num = result_pc.split('alt="')[1].split('"')[0].strip()
        if int(result_num) > 0:
            bz = 1
        result = '[+] Google weight:' + result_num + ' ' + j
        return result, bz
    except:
        if http_or_find != 0:
            result = "[!] Unexpected page format: " + j  # was '+ "j"', which appended a literal j
            return result, bz
        else:
            time.sleep(3)  # the request itself failed: back off and retry
            return google_simple(url, j)
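
# Note: as in getPc()/getMobile(), the retry path recurses with no depth cap,
# so a persistently unreachable host keeps recursing until Python's recursion
# limit. Illustrative call (the argument values are made up):
#   result, bz = google_simple("www.example.com", gg_line)
# bz == 1 only when the scraped Google PR is above zero.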
def exec_function():
    # Start each run from an empty fail.txt (open in 'w' mode truncates either way)
    f = open("fail.txt", 'w', encoding='utf-8')
    f.close()
    multithreading(exp, [], "domain.txt", 15)
    fail_url_list = open("fail.txt", 'r').readlines()
    if len(fail_url_list) > 0:
        print("*" * 12 + "Re-checking failed urls" + "*" * 12)
        global flag
        flag = 1
        multithreading(exp, [], "fail.txt", 15)
    with open("权重列表.txt", 'w', encoding="utf-8") as f:  # the weight-list output file
        for i in bd_mb:
            f.write(i + "\n")
        f.write("\n")
        f.write("-" * 25 + "Google weight checks" + "-" * 25 + "\n")
        f.write("\n")
        print("*" * 12 + "Checking Google weight" + "*" * 12)
        for j in gg:
            main_domain = j.split('[domain]:')[1]
            ext = tldextract.extract(main_domain)
            domain = "www." + '.'.join(ext[1:])
            google_result, bz = google_simple(domain, j)
            time.sleep(1)
            print(google_result)
            if bz == 1:
                f.write(google_result + "\n")
    print("Done; results saved as txt in the current directory")

def main():
    get_data()
    exec_function()
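
# Rough usage sketch (the script filename is hypothetical; the pip names are
# the usual PyPI packages for the imports above):
#   pip install threadpool tldextract fake-useragent requests
#   python3 seo_weight.py
# Reads ip反查结果.txt, writes domain.txt, fail.txt and 权重列表.txt alongside it.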
if __name__ == "__main__":
    main()