Files
free/main.py
2025-03-10 12:50:17 +08:00

1730 lines
71 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import base64
import yaml
import json
import socket
import time
import re
import os
import shutil
import tempfile
import platform
import subprocess
import random
import zipfile
import io
from datetime import datetime
from urllib.parse import urlparse, parse_qs, unquote
from concurrent.futures import ThreadPoolExecutor, as_completed
# 订阅链接列表
links = [
"https://ghproxy.net/https://raw.githubusercontent.com/xiaoer8867785/jddy5/main/data/{Y_m_d}/{x}.yaml",
"https://ghproxy.net/https://raw.githubusercontent.com/firefoxmmx2/v2rayshare_subcription/main/subscription/clash_sub.yaml",
"https://ghproxy.net/https://raw.githubusercontent.com/Roywaller/clash_subscription/refs/heads/main/clash_subscription.txt",
"https://ghproxy.net/https://raw.githubusercontent.com/Q3dlaXpoaQ/V2rayN_Clash_Node_Getter/refs/heads/main/APIs/sc0.yaml",
"https://ghproxy.net/https://raw.githubusercontent.com/Q3dlaXpoaQ/V2rayN_Clash_Node_Getter/refs/heads/main/APIs/sc1.yaml",
"https://ghproxy.net/https://raw.githubusercontent.com/Q3dlaXpoaQ/V2rayN_Clash_Node_Getter/refs/heads/main/APIs/sc2.yaml",
"https://ghproxy.net/https://raw.githubusercontent.com/Q3dlaXpoaQ/V2rayN_Clash_Node_Getter/refs/heads/main/APIs/sc3.yaml",
"https://ghproxy.net/https://raw.githubusercontent.com/Q3dlaXpoaQ/V2rayN_Clash_Node_Getter/refs/heads/main/APIs/sc4.yaml",
"https://ghproxy.net/https://raw.githubusercontent.com/xiaoji235/airport-free/refs/heads/main/clash/naidounode.txt",
"https://ghproxy.net/https://raw.githubusercontent.com/mahdibland/ShadowsocksAggregator/master/LogInfo.txt",
"https://ghproxy.net/https://raw.githubusercontent.com/mahdibland/SSAggregator/master/sub/sub_merge_yaml.yml",
"https://ghproxy.net/https://raw.githubusercontent.com/mahdibland/ShadowsocksAggregator/master/Eternity.yml",
"https://ghproxy.net/https://raw.githubusercontent.com/vxiaov/free_proxies/main/clash/clash.provider.yaml",
"https://ghproxy.net/https://raw.githubusercontent.com/ljlfct01/ljlfct01.github.io/refs/heads/main/节点",
"https://ghproxy.net/https://raw.githubusercontent.com/snakem982/proxypool/main/source/clash-meta.yaml",
"https://ghproxy.net/https://raw.githubusercontent.com/leetomlee123/freenode/refs/heads/main/README.md",
"https://ghproxy.net/https://raw.githubusercontent.com/chengaopan/AutoMergePublicNodes/master/list.yml",
"https://ghproxy.net/https://raw.githubusercontent.com/ermaozi/get_subscribe/main/subscribe/clash.yml",
"https://ghproxy.net/https://raw.githubusercontent.com/zhangkaiitugithub/passcro/main/speednodes.yaml",
"https://ghproxy.net/https://raw.githubusercontent.com/mgit0001/test_clash/refs/heads/main/heima.txt",
"https://ghproxy.net/https://raw.githubusercontent.com/mai19950/clashgithub_com/refs/heads/main/site",
"https://ghproxy.net/https://raw.githubusercontent.com/aiboboxx/clashfree/refs/heads/main/clash.yml",
"https://ghproxy.net/https://raw.githubusercontent.com/aiboboxx/v2rayfree/refs/heads/main/README.md",
"https://ghproxy.net/https://raw.githubusercontent.com/Pawdroid/Free-servers/refs/heads/main/sub",
"https://ghproxy.net/https://raw.githubusercontent.com/shahidbhutta/Clash/refs/heads/main/Router",
"https://ghproxy.net/https://raw.githubusercontent.com/peasoft/NoMoreWalls/master/list.meta.yml",
"https://ghproxy.net/https://raw.githubusercontent.com/anaer/Sub/refs/heads/main/clash.yaml",
"https://ghproxy.net/https://raw.githubusercontent.com/a2470982985/getNode/main/clash.yaml",
"https://ghproxy.net/https://raw.githubusercontent.com/free18/v2ray/refs/heads/main/c.yaml",
"https://ghproxy.net/https://raw.githubusercontent.com/peasoft/NoMoreWalls/master/list.yml",
"https://ghproxy.net/https://raw.githubusercontent.com/mfbpn/tg_mfbpn_sub/main/trial.yaml",
"https://ghproxy.net/https://raw.githubusercontent.com/Ruk1ng001/freeSub/main/clash.yaml",
"https://ghproxy.net/https://raw.githubusercontent.com/SoliSpirit/v2ray-configs/main/all_configs.txt",
"https://ghproxy.net/https://raw.githubusercontent.com/ripaojiedian/freenode/main/clash",
"https://ghproxy.net/https://raw.githubusercontent.com/go4sharing/sub/main/sub.yaml",
"https://ghproxy.net/https://raw.githubusercontent.com/mfuu/v2ray/master/clash.yaml",
"https://api.mxlweb.xyz/sub?target=clash&url=https://www.xrayvip.com/free.yaml&insert=false",
"https://api.mxlweb.xyz/sub?target=clash&url=https://mxlsub.me/free&insert=false",
"https://www.freeclashnode.com/uploads/{Y}/{m}/0-{Ymd}.yaml",
"https://www.freeclashnode.com/uploads/{Y}/{m}/1-{Ymd}.yaml",
"https://www.freeclashnode.com/uploads/{Y}/{m}/2-{Ymd}.yaml",
"https://www.freeclashnode.com/uploads/{Y}/{m}/3-{Ymd}.yaml",
"https://www.freeclashnode.com/uploads/{Y}/{m}/4-{Ymd}.yaml",
"https://sub.reajason.eu.org/clash.yaml",
"https://proxypool.link/trojan/sub",
"https://proxypool.link/ss/sub",
"https://proxypool.link/vmess/sub",
"https://mxlsub.me/newfull",
"https://igdux.top/5Hna",
"https://ghproxy.net/https://raw.githubusercontent.com/Pawdroid/Free-servers/main/sub",
"https://ghproxy.net/https://raw.githubusercontent.com/chengaopan/AutoMergePublicNodes/master/list.txt",
"https://ghproxy.net/https://raw.githubusercontent.com/aiboboxx/v2rayfree/main/v2",
"https://ghproxy.net/https://raw.githubusercontent.com/roosterkid/openproxylist/main/V2RAY_BASE64.txt",
"https://ghproxy.net/https://raw.githubusercontent.com/vpnmarket/sub/refs/heads/main/hiddify1.txt",
"https://ghproxy.net/https://raw.githubusercontent.com/vpnmarket/sub/refs/heads/main/hiddify2.txt",
"https://ghproxy.net/https://raw.githubusercontent.com/vpnmarket/sub/refs/heads/main/hiddify3.txt",
]
# 测试使用
# links = [
# "https://ghproxy.net/https://raw.githubusercontent.com/firefoxmmx2/v2rayshare_subcription/main/subscription/clash_sub.yaml",
# "https://ghproxy.net/https://raw.githubusercontent.com/Roywaller/clash_subscription/refs/heads/main/clash_subscription.txt",
# "https://www.freeclashnode.com/uploads/{Y}/{m}/0-{Ymd}.yaml",
# "https://ghproxy.net/https://raw.githubusercontent.com/aiboboxx/clashfree/refs/heads/main/clash.yml",
# "https://ghproxy.net/https://raw.githubusercontent.com/mahdibland/ShadowsocksAggregator/master/LogInfo.txt",
# 'https://ghproxy.net/https://raw.githubusercontent.com/aiboboxx/v2rayfree/main/v2',
# 'https://ghproxy.net/https://raw.githubusercontent.com/roosterkid/openproxylist/main/V2RAY_BASE64.txt',
# 'https://ghproxy.net/https://raw.githubusercontent.com/vpnmarket/sub/refs/heads/main/hiddify1.txt',
# ]
# 支持的协议类型列表
SUPPORTED_PROTOCOLS = [
'vmess://',
'trojan://',
'vless://',
'ss://',
'ssr://',
'http://',
'https://',
'socks://',
'socks5://',
'hysteria://',
'wireguard://'
]
# 测速相关配置
# 测试URL列表
TEST_URLS = [
"http://www.gstatic.com/generate_204", # Google测试
]
CONNECTION_TIMEOUT = 10 # 连接超时时间,单位为秒
MAX_CONCURRENT_TESTS = 100 # 最大并发测试数量
DEBUG_MODE = False # 默认开启调试模式,方便查看处理过程
# 核心程序配置
CORE_PATH = None # 核心程序路径,将自动检测
def is_github_raw_url(url):
"""判断是否为GitHub的raw URL"""
return 'raw.githubusercontent.com' in url
def extract_file_pattern(url):
"""从URL中提取文件模式例如{x}.yaml中的.yaml"""
match = re.search(r'\{x\}(\.[a-zA-Z0-9]+)(?:/|$)', url)
if match:
return match.group(1) # 返回文件后缀,如 '.yaml', '.txt', '.json'
return None
def get_github_filename(github_url, file_suffix):
"""从GitHub API获取匹配指定后缀的文件名"""
try:
print(f"处理GitHub URL: {github_url}")
# 标准化URL - 移除代理前缀
url_without_proxy = github_url
if 'ghproxy.net/' in github_url:
url_without_proxy = github_url.split('ghproxy.net/', 1)[1]
# 提取仓库所有者、名称和分支信息
url_parts = url_without_proxy.replace('https://raw.githubusercontent.com/', '').split('/')
if len(url_parts) < 3:
print(f"URL格式不正确: {github_url}")
return None
owner = url_parts[0]
repo = url_parts[1]
branch = url_parts[2]
# 处理分支信息
original_branch = branch
if 'refs/heads/' in branch:
branch = branch.split('refs/heads/')[1]
# 提取文件路径 - 忽略仓库信息和{x}部分
# 例如owner/repo/branch/path/to/directory/{x}.yaml -> path/to/directory
path_parts = '/'.join(url_parts[3:]) # 获取路径部分
if '{x}' in path_parts:
directory_path = path_parts.split('/{x}')[0]
else:
directory_path = path_parts
print(f"解析结果: 仓库={owner}/{repo}, 分支={branch}, 路径={directory_path}")
# 构建GitHub API URL
api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{directory_path}"
# 添加ref参数指定分支
if branch:
api_url += f"?ref={branch}"
print(f"构建的API URL: {api_url}")
# 使用代理访问GitHub API
proxy_api_url = f"https://ghproxy.net/{api_url}"
print(f"尝试通过代理访问: {proxy_api_url}")
try:
response = requests.get(proxy_api_url, timeout=30)
if response.status_code != 200:
print("代理访问失败尝试直接访问GitHub API")
response = requests.get(api_url, timeout=30)
except Exception as e:
print(f"代理访问失败: {str(e)},尝试直接访问")
response = requests.get(api_url, timeout=30)
if response.status_code != 200:
print(f"GitHub API请求失败: {response.status_code} - {api_url}")
print(f"响应内容: {response.text[:200]}...")
return None
# 解析返回的JSON
files = response.json()
if not isinstance(files, list):
print(f"GitHub API返回的不是文件列表: {type(files)}")
print(f"响应内容: {str(files)[:200]}...")
return None
print(f"在目录中找到{len(files)}个文件/目录")
# 查找匹配后缀的文件
matching_files = [f['name'] for f in files if f['name'].endswith(file_suffix)]
if not matching_files:
print(f"未找到匹配{file_suffix}后缀的文件,目录包含: {[f['name'] for f in files][:10]}")
return None
# 排序并选择第一个匹配的文件(通常选择最近的文件)
matching_files.sort(reverse=True)
selected_file = matching_files[0]
print(f"选择文件: {selected_file}")
return selected_file
except Exception as e:
print(f"获取GitHub文件列表出错: {str(e)}")
import traceback
traceback.print_exc()
return None
def format_current_date(url):
"""替换URL中的日期占位符和{x}占位符"""
# 定义和生成所有可能的日期格式变量
now = datetime.now()
date_vars = {
# 基本日期组件
'Y': now.strftime('%Y'), # 年份如2023
'm': now.strftime('%m'), # 月份如05
'd': now.strftime('%d'), # 日期如09
# 组合日期格式
'Ymd': now.strftime('%Y%m%d'), # 组合格式如20230509
'Y-m-d': now.strftime('%Y-%m-%d'), # 带连字符格式如2023-05-09
'Y_m_d': now.strftime('%Y_%m_%d'), # 带下划线格式如2023_05_09
# 额外日期格式
'Y-m': now.strftime('%Y-%m'), # 年月如2023-05
'Y_m': now.strftime('%Y_%m'), # 带下划线的年月如2023_05
'md': now.strftime('%m%d'), # 月日如0509
'm-d': now.strftime('%m-%d'), # 带连字符的月日如05-09
'm_d': now.strftime('%m_%d'), # 带下划线的月日如05_09
}
# 处理日期占位符
try:
formatted_url = url.format(**date_vars)
except KeyError as e:
print(f"URL中包含未支持的日期格式占位符: {e}")
print(f"支持的日期占位符有: {', '.join(date_vars.keys())}")
return url # 返回原始URL让后续处理决定是否跳过
# 处理{x}占位符
if '{x}' in formatted_url:
# 提取后缀
file_suffix = extract_file_pattern(formatted_url)
if file_suffix and is_github_raw_url(formatted_url):
# 获取GitHub中匹配的文件名
filename = get_github_filename(formatted_url, file_suffix)
if filename:
# 替换{x}占位符为实际文件名
pattern = r'\{x\}' + re.escape(file_suffix)
formatted_url = re.sub(pattern, filename, formatted_url)
else:
print(f"警告: 未能解析{x}占位符, URL: {formatted_url}")
return formatted_url
def fetch_content(url):
"""获取订阅内容"""
try:
# 1. 首先替换日期相关的占位符
now = datetime.now()
date_vars = {
# 基本日期组件
'Y': now.strftime('%Y'), # 年份如2023
'm': now.strftime('%m'), # 月份如05
'd': now.strftime('%d'), # 日期如09
# 组合日期格式
'Ymd': now.strftime('%Y%m%d'), # 组合格式如20230509
'Y-m-d': now.strftime('%Y-%m-%d'), # 带连字符格式如2023-05-09
'Y_m_d': now.strftime('%Y_%m_%d'), # 带下划线格式如2023_05_09
# 额外日期格式
'Y-m': now.strftime('%Y-%m'), # 年月如2023-05
'Y_m': now.strftime('%Y_%m'), # 带下划线的年月如2023_05
'md': now.strftime('%m%d'), # 月日如0509
'm-d': now.strftime('%m-%d'), # 带连字符的月日如05-09
'm_d': now.strftime('%m_%d'), # 带下划线的月日如05_09
}
# 先将{x}占位符临时替换以免被format误处理
temp_marker = "___X_PLACEHOLDER___"
temporary_url = url.replace("{x}", temp_marker)
# 尝试使用format方法替换所有日期占位符
try:
formatted_url = temporary_url.format(**date_vars)
except KeyError as e:
# 如果format失败尝试手动替换
print(f"URL中包含未支持的日期格式占位符: {e}")
print(f"支持的日期占位符有: {', '.join(date_vars.keys())}")
formatted_url = temporary_url
# 手动替换常见的日期占位符
for pattern, replacement in [
('{Y_m_d}', now.strftime('%Y_%m_%d')),
('{Y-m-d}', now.strftime('%Y-%m-%d')),
('{Ymd}', now.strftime('%Y%m%d')),
('{Y}', now.strftime('%Y')),
('{m}', now.strftime('%m')),
('{d}', now.strftime('%d')),
]:
if pattern in formatted_url:
formatted_url = formatted_url.replace(pattern, replacement)
print(f"手动替换日期占位符 {pattern}{replacement}")
# 将临时标记替换回{x}
formatted_url = formatted_url.replace(temp_marker, "{x}")
# 2. 然后处理{x}占位符 - 现在日期占位符已经被替换
if '{x}' in formatted_url:
file_suffix = extract_file_pattern(formatted_url)
if file_suffix and is_github_raw_url(formatted_url):
print(f"在URL中找到{{x}}占位符,尝试获取匹配的文件...")
filename = get_github_filename(formatted_url, file_suffix)
if filename:
pattern = r'\{x\}' + re.escape(file_suffix)
formatted_url = re.sub(pattern, filename, formatted_url)
print(f"成功替换{{x}}占位符为: {filename}")
else:
print(f"警告: 未能获取匹配{file_suffix}的文件")
else:
print(f"警告: 无法处理{{x}}占位符URL不是GitHub raw链接或找不到文件后缀")
print(f"实际请求URL: {formatted_url}")
# 模拟Chrome浏览器请求头与curl命令类似
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Cache-Control': 'no-cache',
'DNT': '1',
'Pragma': 'no-cache',
'Upgrade-Insecure-Requests': '1',
'sec-ch-ua': '"Chromium";v="134", "Not:A-Brand";v="24", "Google Chrome";v="134"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'none',
'sec-fetch-user': '?1'
}
# 特殊站点处理 - 对特定的站点使用不同的请求方式
special_sites = ['igdux.top']
use_session = any(site in formatted_url for site in special_sites)
if use_session:
# 使用Session对象来保持cookie等状态
session = requests.Session()
# 先发送一个HEAD请求获取cookie等信息
session.head(formatted_url, headers=headers, timeout=30)
response = session.get(formatted_url, headers=headers, timeout=60, stream=True)
else:
# 普通请求
response = requests.get(formatted_url, headers=headers, timeout=60, stream=True)
response.raise_for_status()
# 检查Content-Type确保正确处理各种类型的内容
content_type = response.headers.get('Content-Type', '').lower()
# print(f"Content-Type: {content_type}")
# 处理不同内容类型
# 1. 处理二进制类型
if 'application/octet-stream' in content_type or 'application/x-yaml' in content_type:
content = response.content.decode('utf-8', errors='ignore')
# 2. 处理明确指定了UTF-8字符集的文本
elif 'charset=utf-8' in content_type or 'text/plain' in content_type:
# 尝试多种解码方式
encodings_to_try = ['utf-8', 'gbk', 'latin1', 'ascii', 'iso-8859-1']
for encoding in encodings_to_try:
try:
content = response.content.decode(encoding, errors='ignore')
# 检查解码是否成功 - 如果包含常见订阅指示符
if any(indicator in content for indicator in ['proxies:', 'vmess://', 'trojan://', 'ss://', 'vless://']):
# print(f"使用 {encoding} 编码成功解码内容")
break
except UnicodeDecodeError:
continue
else:
# 如果所有编码都失败使用默认UTF-8
content = response.content.decode('utf-8', errors='ignore')
# 如果网址是特殊站点但仍然得到乱码尝试拆解HTML标记
if use_session and not any(indicator in content for indicator in ['proxies:', 'vmess://', 'trojan://', 'ss://', 'vless://']):
try:
# 尝试解析HTML并提取内容
from bs4 import BeautifulSoup
soup = BeautifulSoup(response.content, 'html.parser')
# 查找所有可能包含订阅信息的元素
for element in soup.find_all(['pre', 'code', 'div', 'textarea']):
element_text = element.get_text()
if any(indicator in element_text for indicator in ['proxies:', 'vmess://', 'trojan://', 'ss://', 'vless://']):
print(f"从HTML元素中提取到订阅内容")
content = element_text
break
except ImportError:
print("未安装BeautifulSoup跳过HTML解析")
except Exception as e:
print(f"HTML解析错误: {str(e)}")
# 3. 处理可能是base64编码的内容
elif 'text/base64' in content_type:
content = response.content.decode('utf-8', errors='ignore')
# 4. 处理其他文本格式如json
elif 'application/json' in content_type or 'text/' in content_type:
content = response.content.decode('utf-8', errors='ignore')
# 5. 默认情况
else:
content = response.text
# 测试内容是否可能是Base64编码
if not any(indicator in content for indicator in ['proxies:', 'vmess://', 'trojan://', 'ss://', 'vless://']):
try:
# 移除空白字符尝试base64解码
cleaned_content = re.sub(r'\s+', '', content)
# 添加适当的填充
padding = len(cleaned_content) % 4
if padding:
cleaned_content += '=' * (4 - padding)
# 尝试base64解码
decoded = base64.b64decode(cleaned_content)
decoded_text = decoded.decode('utf-8', errors='ignore')
if any(indicator in decoded_text for indicator in ['proxies:', 'vmess://', 'trojan://', 'ss://', 'vless://']):
print("检测到Base64编码的订阅内容已成功解码")
content = decoded_text
except:
# 解码失败,继续使用原始内容
pass
return content
except KeyError as e:
print(f"URL中包含未支持的占位符: {e}")
return None
except Exception as e:
print(f"Error fetching {url}: {str(e)}")
import traceback
traceback.print_exc()
return None
def parse_clash_yaml(content):
"""解析Clash配置文件"""
try:
data = yaml.safe_load(content)
if not data:
return []
# 直接查找proxies字段无论它在哪个层级
if 'proxies' in data:
if DEBUG_MODE:
print(f"从YAML中找到 {len(data['proxies'])} 个节点")
return data['proxies']
# 如果没有找到proxies字段尝试其他可能的字段名
for key in ['proxy-providers', 'Proxy', 'proxys']:
if key in data and isinstance(data[key], list):
if DEBUG_MODE:
print(f"从YAML的{key}字段中找到 {len(data[key])} 个节点")
return data[key]
print("YAML中未找到节点信息")
return []
except Exception as e:
# print(f"解析Clash YAML失败: {str(e)}")
return []
def parse_v2ray_base64(content):
"""解析V2Ray Base64编码的配置"""
try:
# 处理多行base64
content = content.strip().replace('\n', '').replace('\r', '')
# 尝试修复可能的编码问题
try:
if isinstance(content, bytes):
content = content.decode('utf-8')
# 确保内容是ASCII兼容的
content = content.encode('ascii', 'ignore').decode('ascii')
except UnicodeError:
print("Error: Invalid encoding in base64 content")
return []
try:
decoded = base64.b64decode(content + '=' * (-len(content) % 4))
decoded_str = decoded.decode('utf-8', 'ignore')
except Exception as e:
print(f"Error decoding base64 content: {str(e)}")
return []
nodes = []
for line in decoded_str.split('\n'):
if line.startswith('vmess://') or line.startswith('trojan://'):
node = parse_v2ray_uri(line)
if node:
nodes.append(node)
return nodes
except Exception as e:
# print(f"Error parsing V2Ray base64: {str(e)}")
return []
def parse_v2ray_uri(uri):
"""解析V2Ray URI格式的配置"""
try:
# 处理vmess协议
if uri.startswith('vmess://'):
b64_config = uri.replace('vmess://', '')
# 确保base64正确填充
b64_config = b64_config + '=' * (-len(b64_config) % 4)
try:
config = json.loads(base64.b64decode(b64_config).decode())
return {
'type': 'vmess',
'name': config.get('ps', 'Unknown'),
'server': config.get('add', ''),
'port': int(config.get('port', 0)),
'uuid': config.get('id', ''),
'alterId': int(config.get('aid', 0)),
'cipher': config.get('type', 'auto'),
'tls': config.get('tls', '') == 'tls',
'network': config.get('net', 'tcp')
}
except json.JSONDecodeError:
# 某些情况下vmess可能使用非标准格式
print(f"Non-standard vmess format: {uri}")
return None
# 处理trojan协议
elif uri.startswith('trojan://'):
parsed = urlparse(uri)
query = parse_qs(parsed.query)
return {
'type': 'trojan',
'name': query.get('sni', [query.get('peer', ['Unknown'])[0]])[0],
'server': parsed.hostname or '',
'port': parsed.port or 443,
'password': parsed.username or ''
}
# 处理vless协议
elif uri.startswith('vless://'):
parsed = urlparse(uri)
query = parse_qs(parsed.query)
return {
'type': 'vless',
'name': query.get('remarks', [query.get('sni', ['Unknown'])[0]])[0],
'server': parsed.hostname or '',
'port': parsed.port or 443,
'uuid': parsed.username or '',
'tls': query.get('security', [''])[0] == 'tls',
'flow': query.get('flow', [''])[0],
'network': query.get('type', ['tcp'])[0]
}
# 处理shadowsocks协议
elif uri.startswith('ss://'):
# 首先获取#后面的名称部分(如果存在)
name = 'Unknown'
if '#' in uri:
name_part = uri.split('#', 1)[1]
name = unquote(name_part)
uri = uri.split('#', 1)[0] # 移除名称部分以便后续处理
if '@' in uri:
# 处理 ss://method:password@host:port
parsed = urlparse(uri)
server = parsed.hostname
port = parsed.port
# 提取方法和密码
userinfo = parsed.username
if userinfo:
try:
# 有些实现可能会对userinfo进行base64编码
decoded = base64.b64decode(userinfo + '=' * (-len(userinfo) % 4)).decode()
if ':' in decoded:
method, password = decoded.split(':', 1)
else:
method, password = 'aes-256-gcm', userinfo
except:
# 如果不是base64编码可能是明文
if ':' in userinfo:
method, password = userinfo.split(':', 1)
else:
method, password = 'aes-256-gcm', userinfo
else:
method, password = 'aes-256-gcm', ''
# 如果查询参数中包含remarks优先使用它
query = parse_qs(parsed.query)
if 'remarks' in query:
name = query.get('remarks', ['Unknown'])[0]
return {
'type': 'ss',
'name': name,
'server': server or '',
'port': port or 443,
'cipher': method,
'password': password
}
else:
# 处理 ss://BASE64(method:password@host:port)
b64_config = uri.replace('ss://', '')
try:
# 确保base64正确填充
b64_config = b64_config + '=' * (-len(b64_config) % 4)
config_str = base64.b64decode(b64_config).decode()
# 提取方法和密码
if '@' in config_str:
method_pwd, server_port = config_str.rsplit('@', 1)
method, password = method_pwd.split(':', 1)
server, port = server_port.rsplit(':', 1)
return {
'type': 'ss',
'name': name,
'server': server,
'port': int(port),
'cipher': method,
'password': password
}
except Exception as e:
# print(f"Invalid ss URI format: {uri}, error: {str(e)}")
return None
# 处理shadowsocksr协议
elif uri.startswith('ssr://'):
b64_config = uri.replace('ssr://', '')
try:
# 确保base64正确填充
b64_config = b64_config + '=' * (-len(b64_config) % 4)
config_str = base64.b64decode(b64_config).decode()
# SSR格式: server:port:protocol:method:obfs:base64pass/?obfsparam=base64param&protoparam=base64param&remarks=base64remarks
parts = config_str.split(':')
if len(parts) >= 6:
server = parts[0]
port = parts[1]
protocol = parts[2]
method = parts[3]
obfs = parts[4]
# 处理剩余参数
password_and_params = parts[5].split('/?', 1)
password_b64 = password_and_params[0]
password = base64.b64decode(password_b64 + '=' * (-len(password_b64) % 4)).decode()
# 提取参数
name = 'Unknown'
if len(password_and_params) > 1 and 'remarks=' in password_and_params[1]:
remarks_b64 = password_and_params[1].split('remarks=', 1)[1].split('&', 1)[0]
try:
name = base64.b64decode(remarks_b64 + '=' * (-len(remarks_b64) % 4)).decode()
except:
pass
return {
'type': 'ssr',
'name': name,
'server': server,
'port': int(port),
'protocol': protocol,
'cipher': method,
'obfs': obfs,
'password': password
}
except Exception as e:
# print(f"Error parsing SSR URI: {str(e)}")
return None
# 处理HTTP/HTTPS协议
elif uri.startswith(('http://', 'https://')):
parsed = urlparse(uri)
query = parse_qs(parsed.query)
return {
'type': 'http' if uri.startswith('http://') else 'https',
'name': query.get('remarks', ['Unknown'])[0],
'server': parsed.hostname or '',
'port': parsed.port or (80 if uri.startswith('http://') else 443),
'username': parsed.username or '',
'password': parsed.password or ''
}
# 处理SOCKS协议
elif uri.startswith(('socks://', 'socks5://')):
parsed = urlparse(uri)
query = parse_qs(parsed.query)
return {
'type': 'socks',
'name': query.get('remarks', ['Unknown'])[0],
'server': parsed.hostname or '',
'port': parsed.port or 1080,
'username': parsed.username or '',
'password': parsed.password or ''
}
# 处理hysteria协议
elif uri.startswith('hysteria://'):
parsed = urlparse(uri)
query = parse_qs(parsed.query)
return {
'type': 'hysteria',
'name': query.get('peer', ['Unknown'])[0],
'server': parsed.hostname or '',
'port': parsed.port or 443,
'protocol': query.get('protocol', [''])[0],
'auth': parsed.username or query.get('auth', [''])[0]
}
# 处理wireguard协议
elif uri.startswith('wireguard://'):
parsed = urlparse(uri)
query = parse_qs(parsed.query)
return {
'type': 'wireguard',
'name': query.get('remarks', ['Unknown'])[0],
'server': parsed.hostname or '',
'port': parsed.port or 51820,
'private_key': query.get('privateKey', [''])[0],
'public_key': query.get('publicKey', [''])[0],
'allowed_ips': query.get('allowedIPs', ['0.0.0.0/0'])[0]
}
except Exception as e:
# print(f"Error parsing URI: {str(e)}")
return None
def extract_nodes(content):
"""级联提取节点按照Base64 -> YAML -> 正则表达式 -> JSON的顺序尝试"""
if not content:
return []
nodes = []
methods_tried = []
# 1. 尝试Base64解码提取
try:
# 处理多行base64移除所有空白字符和特殊字符
cleaned_content = re.sub(r'[\s\n\r\t]+', '', content)
cleaned_content = re.sub(r'[^A-Za-z0-9+/=]', '', cleaned_content)
# 确保base64字符串长度是4的倍数
padding_length = len(cleaned_content) % 4
if padding_length:
cleaned_content += '=' * (4 - padding_length)
# 尝试base64解码
try:
decoded_bytes = base64.b64decode(cleaned_content)
decoded_str = decoded_bytes.decode('utf-8', 'ignore')
# 检查解码后的内容是否包含任何支持的协议节点
if any(protocol in decoded_str for protocol in SUPPORTED_PROTOCOLS):
print("使用Base64解码提取节点")
methods_tried.append("Base64")
for line in decoded_str.split('\n'):
line = line.strip()
if any(line.startswith(protocol) for protocol in SUPPORTED_PROTOCOLS):
node = parse_v2ray_uri(line)
if node:
nodes.append(node)
except Exception as e:
# print(f"Base64解码失败或未找到节点: {str(e)}")
pass
except Exception as e:
print(f"Base64预处理失败: {str(e)}")
# 如果已经提取到节点,直接返回
if len(nodes) > 0:
print(f"通过【{methods_tried[-1]}】方法成功提取到{len(nodes)}个节点")
return nodes
# 2. 尝试解析YAML格式
try:
# 移除HTML标签和特殊标记
cleaned_content = re.sub(r'<[^>]+>|!&lt;str&gt;', '', content)
# 更强大的YAML格式检测查找常见Clash配置特征
yaml_indicators = [
'proxies:', 'Proxy:', 'proxy:', 'proxy-providers:',
'port:', 'socks-port:', 'allow-lan:', 'mode:',
'type: vmess', 'type: ss', 'type: trojan', 'type: vless'
]
if any(indicator in cleaned_content for indicator in yaml_indicators):
# print("尝试解析YAML格式内容")
methods_tried.append("YAML")
# 尝试直接加载YAML
try:
yaml_nodes = parse_clash_yaml(cleaned_content)
if yaml_nodes:
# print(f"从YAML中提取到{len(yaml_nodes)}个节点")
nodes.extend(yaml_nodes)
except Exception as yaml_error:
print(f"标准YAML解析失败: {str(yaml_error)}")
# 如果标准解析失败,尝试更宽松的解析方式
try:
# 尝试提取proxies部分
proxies_match = re.search(r'proxies:\s*\n([\s\S]+?)(?:\n\w+:|$)', cleaned_content)
if proxies_match:
proxies_yaml = "proxies:\n" + proxies_match.group(1)
yaml_nodes = parse_clash_yaml(proxies_yaml)
if yaml_nodes:
print(f"从proxies块提取到{len(yaml_nodes)}个节点")
nodes.extend(yaml_nodes)
except Exception as fallback_error:
print(f"尝试解析proxies块失败: {str(fallback_error)}")
except Exception as e:
print(f"YAML解析过程出错: {str(e)}")
# 如果已经提取到节点,直接返回
if len(nodes) > 0:
print(f"通过【{methods_tried[-1]}】方法成功提取到{len(nodes)}个节点")
return nodes
# 3. 尝试使用正则表达式直接提取
try:
# print("尝试使用正则表达式直接提取节点")
methods_tried.append("正则表达式")
# 为每种支持的协议定义正则表达式并提取
for protocol in SUPPORTED_PROTOCOLS:
if protocol == 'vmess://':
# vmess通常是一个base64编码的字符串
found_nodes = re.findall(r'vmess://[A-Za-z0-9+/=]+', content)
elif protocol == 'hysteria://' or protocol == 'wireguard://':
# 这些协议可能有特殊格式,需要特别处理
found_nodes = re.findall(f'{protocol}[^"\'<>\\s]+', content)
else:
# 对于其他协议,采用通用正则表达式
found_nodes = re.findall(f'{protocol}[^"\'<>\\s]+', content)
for uri in found_nodes:
node = parse_v2ray_uri(uri)
if node:
nodes.append(node)
except Exception as e:
print(f"正则表达式提取失败: {str(e)}")
# 如果已经提取到节点,直接返回
if len(nodes) > 0:
print(f"通过【{methods_tried[-1]}】方法成功提取到{len(nodes)}个节点")
return nodes
# 4. 尝试解析JSON格式
try:
# print("尝试解析JSON格式")
methods_tried.append("JSON")
# 清理内容移除可能的HTML标签和注释
cleaned_content = re.sub(r'<[^>]+>|/\*.*?\*/|//.*?$', '', content, flags=re.MULTILINE)
# 尝试解析JSON
try:
json_data = json.loads(cleaned_content)
json_nodes = parse_json_nodes(json_data)
if json_nodes:
# print(f"从JSON中提取到{len(json_nodes)}个节点")
nodes.extend(json_nodes)
except json.JSONDecodeError as e:
# 尝试查找内容中的JSON片段
try:
# 查找类似于 [{...}] 或 {...} 形式的JSON
json_matches = re.findall(r'(\[{.*?}\]|\{.*?\})', cleaned_content, re.DOTALL)
for json_match in json_matches:
try:
potential_json = json.loads(json_match)
json_nodes = parse_json_nodes(potential_json)
if json_nodes:
# print(f"从JSON片段中提取到{len(json_nodes)}个节点")
nodes.extend(json_nodes)
# 找到有效的JSON片段后不再继续查找
break
except:
continue
except Exception as extract_error:
# print(f"尝试提取JSON片段失败: {str(extract_error)}")
pass
except Exception as e:
print(f"JSON解析过程出错: {str(e)}")
if len(nodes) > 0:
print(f"通过【{methods_tried[-1]}】方法成功提取到{len(nodes)}个节点")
return nodes
else:
print("未找到任何节点")
return []
def parse_json_nodes(json_data):
"""从JSON数据中解析节点信息"""
nodes = []
# 处理数组形式的JSON
if isinstance(json_data, list):
for item in json_data:
node = parse_single_json_node(item)
if node:
nodes.append(node)
# 处理对象形式的JSON
elif isinstance(json_data, dict):
# 检查是否是单个节点
node = parse_single_json_node(json_data)
if node:
nodes.append(node)
# 检查是否包含节点列表
elif 'servers' in json_data and isinstance(json_data['servers'], list):
for server in json_data['servers']:
node = parse_single_json_node(server)
if node:
nodes.append(node)
# 检查其他可能的字段名
for key in ['proxies', 'nodes', 'configs']:
if key in json_data and isinstance(json_data[key], list):
for item in json_data[key]:
node = parse_single_json_node(item)
if node:
nodes.append(node)
return nodes
def parse_single_json_node(item):
"""解析单个JSON节点数据"""
# 如果不是字典,直接返回
if not isinstance(item, dict):
return None
# 支持Shadowsocks格式
if ('server' in item and 'server_port' in item and
'method' in item and 'password' in item):
try:
return {
'type': 'ss',
'name': item.get('remarks', f"SS-{item['server']}"),
'server': item['server'],
'port': int(item['server_port']),
'cipher': item['method'],
'password': item['password'],
'plugin': item.get('plugin', ''),
'plugin_opts': item.get('plugin_opts', '')
}
except Exception as e:
print(f"解析Shadowsocks节点失败: {str(e)}")
return None
# 支持VMess格式
elif ('add' in item and 'port' in item and 'id' in item):
try:
return {
'type': 'vmess',
'name': item.get('ps', item.get('remarks', f"VMess-{item['add']}")),
'server': item['add'],
'port': int(item['port']),
'uuid': item['id'],
'alterId': int(item.get('aid', 0)),
'cipher': item.get('scy', item.get('security', 'auto')),
'tls': item.get('tls', '') == 'tls',
'network': item.get('net', 'tcp'),
'path': item.get('path', '/'),
'host': item.get('host', '')
}
except Exception as e:
print(f"解析VMess节点失败: {str(e)}")
return None
# 支持Trojan格式
elif ('server' in item and 'port' in item and 'password' in item and
item.get('type', '').lower() == 'trojan'):
try:
return {
'type': 'trojan',
'name': item.get('remarks', f"Trojan-{item['server']}"),
'server': item['server'],
'port': int(item['port']),
'password': item['password'],
'sni': item.get('sni', item.get('peer', ''))
}
except Exception as e:
print(f"解析Trojan节点失败: {str(e)}")
return None
# 支持Clash格式
elif ('type' in item and 'server' in item and 'port' in item):
try:
node_type = item['type'].lower()
if node_type in ['ss', 'vmess', 'trojan', 'vless', 'http', 'socks']:
node = {
'type': node_type,
'name': item.get('name', f"{node_type.upper()}-{item['server']}"),
'server': item['server'],
'port': int(item['port'])
}
# 根据不同类型添加特定字段
if node_type == 'ss':
node['cipher'] = item.get('cipher', 'aes-256-gcm')
node['password'] = item.get('password', '')
elif node_type == 'vmess':
node['uuid'] = item.get('uuid', '')
node['alterId'] = int(item.get('alterId', 0))
node['cipher'] = item.get('cipher', 'auto')
node['tls'] = item.get('tls', False)
node['network'] = item.get('network', 'tcp')
if 'ws-path' in item:
node['path'] = item['ws-path']
elif node_type in ['trojan', 'vless']:
node['password'] = item.get('password', '')
node['sni'] = item.get('sni', '')
return node
except Exception as e:
print(f"解析Clash节点失败: {str(e)}")
return None
return None
def download_xray_core():
"""下载Xray核心程序到当前目录"""
print("正在自动下载Xray核心程序...")
# 检测操作系统类型
is_windows = platform.system() == "Windows"
is_64bit = platform.architecture()[0] == '64bit'
# 获取最新版本的Xray发布信息
try:
api_url = "https://api.github.com/repos/XTLS/Xray-core/releases/latest"
response = requests.get(api_url, timeout=30)
release_info = response.json()
# 确定下载文件名
if is_windows:
if is_64bit:
file_keyword = "windows-64"
else:
file_keyword = "windows-32"
else: # Linux
if is_64bit:
file_keyword = "linux-64"
else:
file_keyword = "linux-32"
# 查找匹配的下载URL
download_url = None
for asset in release_info['assets']:
if file_keyword in asset['name'].lower() and asset['name'].endswith('.zip'):
download_url = asset['browser_download_url']
break
if not download_url:
print(f"未找到适合当前平台({file_keyword})的Xray下载链接")
return False
# 下载Xray
print(f"下载Xray: https://ghproxy.net/{download_url}")
download_response = requests.get(f"https://ghproxy.net/{download_url}", timeout=120)
download_response.raise_for_status()
# 创建目录结构
xray_dir = "./xray-core"
platform_dir = os.path.join(xray_dir, "windows-64" if is_windows else "linux-64")
os.makedirs(platform_dir, exist_ok=True)
# 解压缩文件
with zipfile.ZipFile(io.BytesIO(download_response.content)) as z:
z.extractall(platform_dir)
# 设置执行权限Linux
if not is_windows:
xray_path = os.path.join(platform_dir, "xray")
if os.path.exists(xray_path):
os.chmod(xray_path, 0o755)
print(f"Xray核心程序已下载并解压到 {platform_dir}")
return True
except Exception as e:
print(f"下载Xray失败: {str(e)}")
return False
def find_core_program():
"""查找V2Ray/Xray核心程序如果没有找到则自动下载Xray"""
global CORE_PATH
# 检测操作系统类型
is_windows = platform.system() == "Windows"
# V2Ray可执行文件名
v2ray_exe = "v2ray.exe" if is_windows else "v2ray"
xray_exe = "xray.exe" if is_windows else "xray"
# 首先检查xray-core目录
xray_core_dir = "./xray-core"
platform_dir = "windows-64" if is_windows else "linux-64"
xray_platform_path = os.path.join(xray_core_dir, platform_dir, xray_exe)
# 检查Xray是否存在
if os.path.isfile(xray_platform_path) and os.access(xray_platform_path, os.X_OK if not is_windows else os.F_OK):
CORE_PATH = xray_platform_path
print(f"找到Xray核心程序: {CORE_PATH}")
return CORE_PATH
# 然后检查v2ray-core目录
v2ray_core_dir = "./v2ray-core"
v2ray_platform_path = os.path.join(v2ray_core_dir, platform_dir, v2ray_exe)
# 检查V2Ray是否存在
if os.path.isfile(v2ray_platform_path) and os.access(v2ray_platform_path, os.X_OK if not is_windows else os.F_OK):
CORE_PATH = v2ray_platform_path
print(f"找到V2Ray核心程序: {CORE_PATH}")
return CORE_PATH
# 搜索路径
search_paths = [
".", # 当前目录
"./v2ray", # v2ray子目录
"./xray", # xray子目录
os.path.expanduser("~"), # 用户主目录
]
# Windows特定搜索路径
if is_windows:
search_paths.extend([
"C:\\Program Files\\v2ray",
"C:\\Program Files (x86)\\v2ray",
"C:\\v2ray",
])
# Linux特定搜索路径
else:
search_paths.extend([
"/usr/bin",
"/usr/local/bin",
"/opt/v2ray",
"/opt/xray",
])
# 搜索V2Ray或XRay可执行文件
for path in search_paths:
v2ray_path = os.path.join(path, v2ray_exe)
xray_path = os.path.join(path, xray_exe)
if os.path.isfile(v2ray_path) and os.access(v2ray_path, os.X_OK if not is_windows else os.F_OK):
CORE_PATH = v2ray_path
print(f"找到V2Ray核心程序: {CORE_PATH}")
return CORE_PATH
if os.path.isfile(xray_path) and os.access(xray_path, os.X_OK if not is_windows else os.F_OK):
CORE_PATH = xray_path
print(f"找到XRay核心程序: {CORE_PATH}")
return CORE_PATH
# 如果未找到核心程序自动下载Xray
print("未找到V2Ray或Xray核心程序准备自动下载...")
if download_xray_core():
# 重新检查Xray是否已下载
if os.path.isfile(xray_platform_path) and os.access(xray_platform_path, os.X_OK if not is_windows else os.F_OK):
CORE_PATH = xray_platform_path
print(f"已成功下载并使用Xray核心程序: {CORE_PATH}")
return CORE_PATH
# 如果仍未找到,提示用户手动下载
print("自动下载失败。请访问 https://github.com/XTLS/Xray-core/releases 手动下载并安装")
print("将Xray核心程序放在当前目录或指定系统路径中")
return None
def find_available_port(start_port=10000, end_port=60000):
"""查找可用的端口"""
while True:
port = random.randint(start_port, end_port)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.bind(('127.0.0.1', port))
sock.close()
return port
except:
sock.close()
continue
def generate_v2ray_config(node, local_port):
"""根据节点信息生成V2Ray配置文件采用与V2RayN相同的配置方式"""
config = {
"inbounds": [
{
"port": local_port,
"listen": "127.0.0.1",
"protocol": "socks",
"settings": {
"auth": "noauth", # 不需要认证
"udp": True # 支持UDP
},
"sniffing": {
"enabled": True,
"destOverride": ["http", "tls"]
}
}
],
"outbounds": [
# 出站连接将根据节点类型生成
],
"log": {
"loglevel": "none" # 禁止日志输出,减少干扰
}
}
# 根据节点类型配置出站连接参考V2RayN的配置方式
if node['type'] == 'vmess':
# 基本VMess配置
outbound = {
"protocol": "vmess",
"settings": {
"vnext": [
{
"address": node['server'],
"port": node['port'],
"users": [
{
"id": node['uuid'],
"alterId": node.get('alterId', 0),
"security": node.get('cipher', 'auto')
}
]
}
]
},
"streamSettings": {
"network": node.get('network', 'tcp'),
"security": "tls" if node.get('tls', False) else "none"
}
}
# 添加网络特定配置参考V2RayN的配置
if node.get('network') == 'ws':
outbound["streamSettings"]["wsSettings"] = {
"path": node.get('path', '/'),
"headers": {
"Host": node.get('host', node['server'])
}
}
elif node.get('network') == 'h2':
outbound["streamSettings"]["httpSettings"] = {
"path": node.get('path', '/'),
"host": [node.get('host', node['server'])]
}
elif node.get('network') == 'quic':
outbound["streamSettings"]["quicSettings"] = {
"security": node.get('quicSecurity', 'none'),
"key": node.get('quicKey', ''),
"header": {
"type": node.get('headerType', 'none')
}
}
elif node.get('network') == 'grpc':
outbound["streamSettings"]["grpcSettings"] = {
"serviceName": node.get('path', ''),
"multiMode": node.get('multiMode', False)
}
elif node.get('network') == 'tcp':
if node.get('headerType') == 'http':
outbound["streamSettings"]["tcpSettings"] = {
"header": {
"type": "http",
"request": {
"path": [node.get('path', '/')],
"headers": {
"Host": [node.get('host', '')]
}
}
}
}
# TLS相关设置
if node.get('tls'):
outbound["streamSettings"]["tlsSettings"] = {
"serverName": node.get('sni', node.get('host', node['server'])),
"allowInsecure": node.get('allowInsecure', False)
}
config["outbounds"] = [outbound]
elif node['type'] == 'trojan':
# 增强Trojan配置
config["outbounds"] = [{
"protocol": "trojan",
"settings": {
"servers": [
{
"address": node['server'],
"port": node['port'],
"password": node['password']
}
]
},
"streamSettings": {
"network": node.get('network', 'tcp'),
"security": "tls",
"tlsSettings": {
"serverName": node.get('sni', node.get('host', node['server'])),
"allowInsecure": node.get('allowInsecure', False)
}
}
}]
# 添加网络特定配置
if node.get('network') == 'ws':
config["outbounds"][0]["streamSettings"]["wsSettings"] = {
"path": node.get('path', '/'),
"headers": {
"Host": node.get('host', node['server'])
}
}
elif node['type'] == 'vless':
# 增强VLESS配置
config["outbounds"] = [{
"protocol": "vless",
"settings": {
"vnext": [
{
"address": node['server'],
"port": node['port'],
"users": [
{
"id": node['uuid'],
"encryption": "none",
"flow": node.get('flow', '')
}
]
}
]
},
"streamSettings": {
"network": node.get('network', 'tcp'),
"security": "tls" if node.get('tls', False) else "none"
}
}]
# 添加网络特定配置
if node.get('network') == 'ws':
config["outbounds"][0]["streamSettings"]["wsSettings"] = {
"path": node.get('path', '/'),
"headers": {
"Host": node.get('host', node['server'])
}
}
elif node.get('network') == 'grpc':
config["outbounds"][0]["streamSettings"]["grpcSettings"] = {
"serviceName": node.get('path', ''),
"multiMode": node.get('multiMode', False)
}
# TLS相关设置
if node.get('tls'):
config["outbounds"][0]["streamSettings"]["tlsSettings"] = {
"serverName": node.get('sni', node.get('host', node['server'])),
"allowInsecure": node.get('allowInsecure', False)
}
elif node['type'] == 'ss':
# Shadowsocks配置
config["outbounds"] = [{
"protocol": "shadowsocks",
"settings": {
"servers": [
{
"address": node['server'],
"port": node['port'],
"method": node['cipher'],
"password": node['password']
}
]
}
}]
elif node['type'] == 'socks':
# SOCKS配置
outbound = {
"protocol": "socks",
"settings": {
"servers": [
{
"address": node['server'],
"port": node['port']
}
]
}
}
# 如果有用户名和密码,添加到配置中
if node.get('username') and node.get('password'):
outbound["settings"]["servers"][0]["users"] = [
{
"user": node['username'],
"pass": node['password']
}
]
config["outbounds"] = [outbound]
elif node['type'] in ['http', 'https']:
# HTTP/HTTPS配置
outbound = {
"protocol": "http",
"settings": {
"servers": [
{
"address": node['server'],
"port": node['port']
}
]
}
}
# 如果有用户名和密码,添加到配置中
if node.get('username') and node.get('password'):
outbound["settings"]["servers"][0]["users"] = [
{
"user": node['username'],
"pass": node['password']
}
]
config["outbounds"] = [outbound]
else:
# 对于不完全支持的协议,使用简单配置
if DEBUG_MODE:
print(f"警告: 节点类型 {node['type']} 可能不被完全支持,使用基本配置")
return None
return config
def test_node_latency(node):
"""使用核心程序测试节点延迟"""
if not CORE_PATH:
if DEBUG_MODE:
print("未找到核心程序,无法进行延迟测试")
return -1
# 为测试创建临时目录
temp_dir = tempfile.mkdtemp(prefix="node_test_")
config_file = os.path.join(temp_dir, "config.json")
# 获取一个可用端口
local_port = find_available_port()
# 生成配置文件
config = generate_v2ray_config(node, local_port)
if not config:
shutil.rmtree(temp_dir)
return -1
with open(config_file, 'w') as f:
json.dump(config, f)
# 启动核心进程
core_process = None
try:
# 设置代理环境变量使用SOCKS代理
proxies = {
'http': f'socks5://127.0.0.1:{local_port}',
'https': f'socks5://127.0.0.1:{local_port}'
}
# 设置与V2RayN相同的请求头
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2'
}
# 在Windows上使用CREATE_NO_WINDOW标志隐藏控制台窗口
startupinfo = None
if platform.system() == "Windows":
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = subprocess.SW_HIDE
# 启动核心程序
core_process = subprocess.Popen(
[CORE_PATH, "-c", config_file],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
startupinfo=startupinfo
)
# 等待核心程序启动
time.sleep(3)
# 测试连接延迟 - 不再使用重试机制
start_time = time.time()
# 按顺序尝试不同的测试URL
for test_url in TEST_URLS:
try:
if DEBUG_MODE:
print(f"测试节点: {node['name']} - 尝试URL: {test_url}")
response = requests.get(
test_url,
proxies=proxies,
headers=headers,
timeout=CONNECTION_TIMEOUT
)
if response.status_code in [200, 204]:
latency = int((time.time() - start_time) * 1000)
if DEBUG_MODE:
print(f"测试成功: {node['name']} - URL: {test_url} - 延迟: {latency}ms")
return latency
else:
if DEBUG_MODE:
print(f"测试URL状态码错误: {response.status_code}")
except Exception as e:
if DEBUG_MODE:
print(f"测试失败: {test_url} - 错误: {str(e)}")
continue # 尝试下一个URL
# 所有URL测试都失败
if DEBUG_MODE:
print(f"节点 {node['name']} 所有测试URL都失败")
return -1
except Exception as e:
if DEBUG_MODE:
print(f"测试节点 {node['name']} 时发生错误: {str(e)}")
return -1
finally:
# 清理资源
if core_process:
core_process.terminate()
try:
core_process.wait(timeout=5)
except subprocess.TimeoutExpired:
core_process.kill()
# 删除临时目录
try:
shutil.rmtree(temp_dir)
except:
pass
def test_latency(node):
"""测试节点延迟"""
# 必须有核心程序才能进行测试
if not CORE_PATH:
print(f"未找到核心程序,无法测试节点: {node['name']}")
return -1
# 使用核心程序进行精确测试
latency = test_node_latency(node)
return latency
def process_node(node):
"""处理单个节点,添加延迟信息"""
if not node or 'name' not in node or 'server' not in node:
return None
# print(f"测试节点: {node['name']} [{node['type']}] - {node['server']}:{node['port']}")
latency = test_latency(node)
# 过滤掉延迟为0ms或连接失败的节点
if latency <= 0:
# status = "连接失败" if latency == -1 else "延迟为0ms"
# print(f"节点: {node['name']} {status},跳过")
return None
# 更新节点名称,添加延迟信息
node['name'] = f"{node['name']} [{latency}ms]"
print(f"有效节点: {node['name']} ,延迟: {latency}ms")
return node
def remove_duplicates(nodes):
"""去除重复节点"""
unique_nodes = {}
for node in nodes:
try:
key = f"{node['server']}:{node['port']}"
if key not in unique_nodes:
unique_nodes[key] = node
except Exception as e:
# print(f"处理节点 {node['name']} 时出错: {str(e)}")
continue
return list(unique_nodes.values())
def node_to_v2ray_uri(node):
"""将节点信息转换为V2Ray URI格式"""
if node['type'] == 'vmess':
config = {
'v': '2',
'ps': node['name'],
'add': node['server'],
'port': str(node['port']),
'id': node['uuid'],
'aid': str(node['alterId']),
'net': node.get('network', 'tcp'),
'type': node.get('type', 'none'),
'tls': 'tls' if node.get('tls', False) else ''
}
return f"vmess://{base64.b64encode(json.dumps(config).encode()).decode()}"
elif node['type'] == 'trojan':
return f"trojan://{node['password']}@{node['server']}:{node['port']}?sni={node['name']}"
elif node['type'] == 'vless':
# 构建vless uri
query_parts = []
if node.get('tls'):
query_parts.append('security=tls')
if node.get('flow'):
query_parts.append(f"flow={node['flow']}")
if node.get('network'):
query_parts.append(f"type={node['network']}")
query_string = '&'.join(query_parts)
return f"vless://{node['uuid']}@{node['server']}:{node['port']}?{query_string}&remarks={node['name']}"
elif node['type'] == 'ss':
# 构建ss uri
userinfo = f"{node['cipher']}:{node['password']}"
b64_userinfo = base64.b64encode(userinfo.encode()).decode()
return f"ss://{b64_userinfo}@{node['server']}:{node['port']}#{node['name']}"
elif node['type'] == 'ssr':
# 构建ssr uri
password_b64 = base64.b64encode(node['password'].encode()).decode()
name_b64 = base64.b64encode(node['name'].encode()).decode()
ssr_str = f"{node['server']}:{node['port']}:{node['protocol']}:{node['cipher']}:{node['obfs']}:{password_b64}/?remarks={name_b64}"
return f"ssr://{base64.b64encode(ssr_str.encode()).decode()}"
elif node['type'] in ['http', 'https']:
# 构建http/https uri
proto = 'http' if node['type'] == 'http' else 'https'
auth = f"{node['username']}:{node['password']}@" if node['username'] else ""
return f"{proto}://{auth}{node['server']}:{node['port']}?remarks={node['name']}"
elif node['type'] == 'socks':
# 构建socks uri
auth = f"{node['username']}:{node['password']}@" if node['username'] else ""
return f"socks://{auth}{node['server']}:{node['port']}?remarks={node['name']}"
elif node['type'] == 'hysteria':
# 构建hysteria uri
auth = f"{node['auth']}@" if node.get('auth') else ""
protocol_part = f"?protocol={node['protocol']}" if node.get('protocol') else ""
return f"hysteria://{auth}{node['server']}:{node['port']}{protocol_part}&peer={node['name']}"
elif node['type'] == 'wireguard':
# 构建wireguard uri
query_parts = []
if node.get('private_key'):
query_parts.append(f"privateKey={node['private_key']}")
if node.get('public_key'):
query_parts.append(f"publicKey={node['public_key']}")
if node.get('allowed_ips'):
query_parts.append(f"allowedIPs={node['allowed_ips']}")
query_string = '&'.join(query_parts)
return f"wireguard://{node['server']}:{node['port']}?{query_string}&remarks={node['name']}"
return None
def main():
global CORE_PATH
# 查找核心程序
CORE_PATH = find_core_program()
all_nodes = []
# 获取并解析所有订阅
print("\n开始获取节点信息...")
for link in links:
print(f"\n正在处理订阅链接: {link}")
content = fetch_content(link)
if not content:
print("获取失败,跳过该链接")
continue
# 使用新的级联提取函数
nodes = extract_nodes(content)
# print(f"成功提取 {len(nodes)} 个节点")
all_nodes.extend(nodes)
# 节点去重
print(f"去重前节点数量: {len(all_nodes)}")
all_nodes = remove_duplicates(all_nodes)
print(f"去重后节点数量: {len(all_nodes)}")
# 暂时只测试获取节点信息
# return
# 使用线程池并发测试节点延迟
print(f"\n开始测试节点延迟...")
valid_nodes = []
# 限制并发数量,避免资源耗尽
with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_TESTS) as executor:
future_to_node = {executor.submit(process_node, node): node for node in all_nodes}
for future in as_completed(future_to_node):
processed_node = future.result()
if processed_node:
valid_nodes.append(processed_node)
print(f"\n测试完成,有效节点数量: {len(valid_nodes)}")
# 收集所有有效节点的URI
valid_uris = []
valid_uri_count = 0
for node in valid_nodes:
uri = node_to_v2ray_uri(node)
if uri:
valid_uris.append(uri)
valid_uri_count += 1
# 将所有URI合并为一个字符串并进行base64编码
if valid_uri_count > 0:
uri_content = '\n'.join(valid_uris)
base64_content = base64.b64encode(uri_content.encode('utf-8')).decode('utf-8')
# 将base64编码后的内容写入文件
with open('v2ray.txt', 'w', encoding='utf-8') as f:
f.write(base64_content)
print(f"\n已将 {valid_uri_count} 个有效节点以base64编码保存到 v2ray.txt 文件")
# 同时保存一个原始文本版本,方便查看
with open('v2ray_raw.txt', 'w', encoding='utf-8') as f:
f.write(uri_content)
print(f"同时保存了原始文本版本到 v2ray_raw.txt 文件")
else:
print("\n未找到有效节点,不生成文件")
if __name__ == '__main__':
main()