实现原理:用 ping 命令检测 ip-list 中各 IP 的延迟,按平均延迟排序后,选择最优且可用的 IP 写入 hosts 文件。
脚本会自动通过多个 DNS 服务器解析域名的 IP,并将新解析到的 IP 加入 ip-list。
使用了 requests 库,主要用于通过 HTTPS 请求检验 SSL 证书。
main.py
import json
import asyncio
import os
import re
import time
import platform
import requests
def write_to_hosts(ip, domain, hosts_path=None):
    """Point ``domain`` at ``ip`` in a hosts file.

    Every existing mapping for ``domain`` is replaced by a single
    ``"<ip> <domain>"`` line; when no mapping exists the line is appended.
    Matching is done on whole host-name tokens (case-insensitively), so
    comments and longer names that merely contain ``domain`` (for example
    ``api.<domain>``) are left untouched.  Other names sharing a line with
    ``domain`` keep their original address.

    Args:
        ip: Address the domain should resolve to.
        domain: Host name whose mapping is written.
        hosts_path: Hosts file to edit.  When ``None`` the platform's system
            hosts file is used.

    Returns:
        None.  Success and failure are reported on stdout.
    """
    if hosts_path is None:
        if platform.system() == "Windows":
            hosts_path = r"C:\Windows\System32\drivers\etc\hosts"
        else:
            hosts_path = "/etc/hosts"

    # Editing the system hosts file normally needs elevated privileges.
    if not os.access(hosts_path, os.W_OK):
        print("Error: 没有权限修改 hosts 文件,请以管理员身份运行脚本。")
        return

    new_entry = f"{ip} {domain}\n"
    wanted = domain.lower()

    try:
        with open(hosts_path, 'r') as file:
            lines = file.readlines()

        updated = []
        replaced = False
        for line in lines:
            # Only the part before '#' carries mappings: "<addr> <name> [<name> ...]".
            entry, hash_mark, comment = line.rstrip("\n").partition("#")
            fields = entry.split()
            names = fields[1:]
            if wanted not in (name.lower() for name in names):
                updated.append(line)
                continue

            # The first matching line becomes the new entry; later duplicates
            # are dropped so exactly one mapping for the domain remains.
            if not replaced:
                updated.append(new_entry)
                replaced = True

            # Keep any other host names that shared the line on their old address.
            others = [name for name in names if name.lower() != wanted]
            if others:
                kept = f"{fields[0]} {' '.join(others)}"
                if hash_mark:
                    kept += f" #{comment}"
                updated.append(kept + "\n")

        if not replaced:
            # Guard against a last line without a newline, which would
            # otherwise be glued to the appended entry.
            if updated and not updated[-1].endswith("\n"):
                updated[-1] += "\n"
            updated.append(new_entry)

        with open(hosts_path, 'w') as file:
            file.writelines(updated)
        print(f"{ip} 写入成功")
    except (OSError, UnicodeError) as e:
        print(f"写入 hosts 文件失败: {e}")
def test_host(url, timeout=10):
    """Check that ``https://<url>`` answers with HTTP 200.

    The request goes through ``requests`` with its default certificate
    verification, so a TLS/certificate error counts as a failure.

    Args:
        url: Host name (without scheme) to request over HTTPS.
        timeout: Seconds to wait for the server before giving up.  Without a
            timeout a stalled connection would block the caller indefinitely.

    Returns:
        True when the response status is 200, False for any other status or
        when the request fails.
    """
    try:
        response = requests.get(f"https://{url}", timeout=timeout)
    except requests.RequestException as e:
        # Connection, TLS and timeout errors all mean the host is unusable.
        print(f"{url}: {e}")
        return False
    print(f"{url}: {response.status_code}")
    return response.status_code == 200
async def test_ip(ip, ip_record):
    """Ping ``ip`` once and record the measured delay in ``ip_record``.

    The reply's round-trip time, rounded to whole milliseconds, is appended to
    ``ip_record``.  If the output contains no reply time, or ping does not
    finish within the wait limit, 4000 is appended as the timeout penalty.
    The list is then trimmed in place to its 12 most recent samples.

    Args:
        ip: Address to ping.
        ip_record: Mutable list of delay samples for this address; modified
            in place.
    """
    # The "send N packets" flag differs: -n on Windows, -c elsewhere.
    count_flag = "-n" if platform.system() == "Windows" else "-c"
    process = await asyncio.create_subprocess_exec(
        "ping", count_flag, "1", ip,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    try:
        stdout, _ = await asyncio.wait_for(process.communicate(), timeout=15)
    except asyncio.TimeoutError:
        # A stuck ping must not stall the whole batch; treat it as a timeout.
        process.kill()
        await process.wait()
        stdout = b""

    # Reply lines look like "time=23ms", "time<1ms" or "time=23.4 ms".
    # Requiring '=' or '<' skips summary text such as "time 0ms".
    found = re.search(
        r'[=<]\s*(\d+(?:\.\d+)?)\s*ms',
        stdout.decode("gbk", "replace"),
    )
    if found:
        delay = round(float(found.group(1)))
        print(f"{ip}: {delay}ms")
        ip_record.append(delay)
    else:
        print(f"{ip}: Timeout")
        ip_record.append(4000)

    # Trim in place: rebinding the local name would leave the caller's list
    # growing without bound.
    del ip_record[:-12]
async def get_ip(url, dns_ip):
    """Resolve ``url`` through the DNS server ``dns_ip`` using ``nslookup``.

    Args:
        url: Host name to resolve.
        dns_ip: Address of the DNS server to query.

    Returns:
        The first IPv4 address found in the answer section for ``url``, or
        None when the lookup produced no IPv4 address or did not finish
        within the wait limit.
    """
    process = await asyncio.create_subprocess_exec(
        "nslookup", url, dns_ip,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    try:
        stdout, _ = await asyncio.wait_for(process.communicate(), timeout=15)
    except asyncio.TimeoutError:
        # Do not let one unresponsive DNS server block the other lookups.
        process.kill()
        await process.wait()
        stdout = b""
    output = stdout.decode("gbk", "replace")

    # The answer section is the host name followed by "Address:" (or
    # "Addresses:" when several are returned).  Anchoring on the host name
    # skips the DNS server's own address printed at the top of the output.
    answer = re.search(
        re.escape(url) + r'\s*Address(?:es)?:(.*)', output, re.DOTALL
    )
    # IPv6 addresses may be listed first; pick the first IPv4 one.
    address = re.search(r'\d+\.\d+\.\d+\.\d+', answer.group(1)) if answer else None
    if address:
        return address.group(0)
    print(f"{url}: DNS解析失败")
    return None
async def async_run_test(ip_list):
    """Ping every address in ``ip_list`` concurrently.

    Args:
        ip_list: Mapping of IP address to its list of delay samples; each
            list is passed to ``test_ip`` together with its address.
    """
    pending = [
        asyncio.create_task(test_ip(address, ip_list[address]))
        for address in ip_list
    ]
    await asyncio.gather(*pending)
async def async_run_dns(url, dns_list):
    """Resolve ``url`` against every server in ``dns_list`` concurrently.

    Args:
        url: Host name to resolve.
        dns_list: Iterable of DNS server addresses to query.

    Returns:
        The distinct non-empty results of ``get_ip``, in first-seen order.
    """
    lookups = [asyncio.create_task(get_ip(url, server)) for server in dns_list]
    answers = await asyncio.gather(*lookups)
    # dict keys keep insertion order, which de-duplicates without reordering.
    unique = dict.fromkeys(answers)
    return [address for address in unique if address]
def run_test(url):
    """Re-measure all known IPs for ``url`` and repair the hosts entry if needed.

    Reads and updates the module-level ``records`` mapping.  After pinging,
    the IPs are ordered by average recorded delay.  Only when ``url`` is not
    currently reachable are candidates written to the hosts file, best first,
    until one passes ``test_host``; if none passes, the last candidate tried
    stays in the hosts file.  The ordered mapping is stored back in
    ``records``.

    Args:
        url: Host name whose IP list is tested.
    """
    candidates = records["ip-list"][url]
    asyncio.run(async_run_test(candidates))

    def average_delay(entry):
        samples = entry[1]
        return sum(samples) / len(samples)

    ranked = sorted(candidates.items(), key=average_delay)

    reachable = test_host(url)
    if not reachable:
        for address, _samples in ranked:
            write_to_hosts(address, url)
            if test_host(url):
                # The HTTPS request succeeded through this address.
                print(f"{url} 解析成功")
                break

    records["ip-list"][url] = dict(ranked)
def run_dns(url):
    """Resolve ``url`` via every configured DNS server and register new IPs.

    Reads the server list from ``records["DNS-list"]`` and adds each resolved
    address that has no samples yet to ``records["ip-list"][url]`` with an
    empty sample list.  A domain that is not yet present under ``"ip-list"``
    gets a fresh entry instead of raising ``KeyError``.

    Args:
        url: Host name to resolve.
    """
    print(f"正在解析 {url} 的 IP 地址...")
    dns_list = records["DNS-list"]
    iplist = asyncio.run(async_run_dns(url, dns_list))
    print(iplist)
    known = records["ip-list"].setdefault(url, {})
    for ip in iplist:
        if not known.get(ip):
            known[ip] = []
if __name__ == '__main__':
    # Script entry point: every 300 seconds reload records.json, resolve and
    # test the domain, then save the updated records.  Ctrl-C stops the loop.
    path = os.path.dirname(os.path.abspath(__file__))
    path = os.path.join(path, 'records.json')
    url = "github.com"
    try:
        while True:
            if not os.path.exists(path):
                print("records.json not found")
                # exit() is injected by the site module and may be missing;
                # raising SystemExit works everywhere.
                raise SystemExit
            # Explicit UTF-8 keeps the file readable regardless of the locale
            # encoding (the dump below uses ensure_ascii=False).
            with open(path, 'r', encoding='utf-8') as f:
                # Module-level name: run_dns and run_test read it as a global.
                records = json.load(f)
            run_dns(url)
            run_test(url)
            with open(path, 'w', encoding='utf-8') as f:
                json.dump(records, f, indent=4, ensure_ascii=False)
            print("Waiting...")
            time.sleep(300)
    except KeyboardInterrupt:
        # Exit quietly on Ctrl-C, whether sleeping or in the middle of a run.
        pass
records.json
{
"DNS-list": [
"1.1.1.1",
"1.0.0.1",
"8.8.8.8",
"8.8.4.4",
"223.5.5.5",
"223.6.6.6",
"180.76.76.76",
"101.226.4.6",
"218.30.118.6",
"208.67.222.222",
"208.67.220.220",
"114.114.114.114",
"114.114.115.115"
],
"ip-list": {
"github.com": {
"20.205.243.160": [123, 110, 109],
"20.205.245.248": [125, 109, 111],
"20.205.243.166": [110, 107, 113],
"20.27.177.118": [154, 136, 139],
"20.27.177.113": [169, 144, 147],
"20.200.245.247": [160, 152, 159],
"20.207.73.82": [167, 153, 161],
"20.233.83.145": [192, 199, 196],
"20.248.137.48": [207, 200, 213],
"20.29.134.23": [242, 239, 244],
"140.82.121.4": [263, 242, 244],
"20.29.134.19": [247, 245, 246],
"20.86.245.4": [265, 264, 266],
"20.87.245.0": [277, 277, 276],
"20.175.192.146": [304, 298, 296],
"140.82.113.4": [314, 295, 294],
"20.175.192.147": [294, 299, 299],
"140.82.114.3": [314, 319, 311],
"140.82.112.3": [319, 313, 311],
"140.82.114.4": [320, 313, 312],
"20.201.28.152": [413, 405, 405],
"20.201.28.151": [427, 408, 421],
"140.82.112.4": [4000, 4000, 4000],
"140.82.113.3": [4000, 4000, 4000],
"140.82.121.3": [4000, 4000, 4000],
"51.142.105.107": [4000, 4000, 4000],
"20.218.253.22": [4000, 4000, 4000],
"20.12.240.255": [4000, 4000, 4000],
"20.203.59.95": [4000, 4000, 4000],
"20.203.176.211": [4000, 4000, 4000],
"20.113.161.247": [4000, 4000, 4000],
"20.123.206.47": [4000, 4000, 4000],
"20.106.123.66": [4000, 4000, 4000],
"20.205.143.41": [4000, 4000, 4000],
"192.30.255.112": [4000, 4000, 4000],
"192.30.255.113": [4000, 4000, 4000],
"20.200.245.160": [4000, 4000, 4000],
"20.87.254.0": [4000, 4000, 4000],
"104.244.46.165": [4000, 4000, 4000]
}
}
}