import base64 import re import yaml from http.server import BaseHTTPRequestHandler, HTTPServer import requests class YAMLModifierHTTPRequestHandler(BaseHTTPRequestHandler): def do_GET(self): # 解析查询参数 query_components = self.parse_query_parameters(self.path) # 提取并解码 URL source_url = base64.b64decode(query_components.get('source', [''])[0]).decode() parser_url = base64.b64decode(query_components.get('parser', [''])[0]).decode() # 获取并解析 YAML 文件 source_yaml = self.fetch_yaml(source_url) parser_yaml = self.fetch_yaml(parser_url) if source_yaml and parser_yaml: # 使用 parser_yaml 中的规则修改 source_yaml modified_yaml = self.modify_yaml(source_yaml, parser_yaml) # 发送响应 self.send_response(200) self.send_header('Content-type', 'text/yaml') self.end_headers() self.wfile.write( yaml.dump(modified_yaml, allow_unicode=True, default_flow_style=False, sort_keys=False).encode()) else: # 发送错误响应 self.send_error(404, "Unable to fetch or parse YAML files") @staticmethod def parse_query_parameters(path): # 解析 URL 中的查询参数 from urllib.parse import urlparse, parse_qs query = urlparse(path).query return parse_qs(query) @staticmethod def fetch_yaml(url): # 从给定的 URL 获取并解析 YAML try: response = requests.get(url) response.raise_for_status() return yaml.safe_load(response.content) except Exception as e: print(f"Error fetching YAML from {url}: {e}") return None def modify_yaml(self, source_yaml, parser_yaml): # 根据 parser_yaml 中的规则修改 source_yaml for key, operations in parser_yaml.get('yaml', {}).items(): if key.startswith('append-') or key.startswith('prepend-'): self.modify_list(source_yaml, key, operations) elif key.startswith('mix-'): self.mix_object(source_yaml, key, operations) # 执行 commands for command in parser_yaml.get('yaml', {}).get('commands', []): self.apply_command(source_yaml, command) return source_yaml def modify_list(self, source_yaml, key, operations): # 修改 source_yaml 中的列表 list_name = key.split('-', 1)[1] source_list = source_yaml.get(list_name, []) # Prepend or append operations # 根据操作类型(附加 Prepend 或前置 append)进行修改 if key.startswith('append-'): source_list.extend(operations) elif key.startswith('prepend-'): source_list = operations + source_list source_yaml[list_name] = source_list def mix_object(self, source_yaml, key, operations): # 确定 source_yaml 中的目标对象 object_name = key.split('-', 1)[1] source_object = source_yaml.get(object_name, {}) # 合并对象 source_object.update(operations) source_yaml[object_name] = source_object def apply_command(self, source_yaml, command): path, operation, value = self.parse_command(command) # 查找并处理目标策略组 target_group = self.find_proxy_group(source_yaml, path) if target_group is None: return # 如果没有找到目标策略组,则跳过此命令 if value.startswith('[]'): regex_pattern = value[2:] self.apply_regex_to_proxies(source_yaml, target_group, regex_pattern) if operation == '+': # 插入代理到特定位置 index = int(path[2]) if path[2].isdigit() else None if index is not None and index < len(target_group['proxies']): target_group['proxies'].insert(index, value) else: target_group['proxies'].append(value) # 如果索引无效,则默认添加到末尾 def find_proxy_group(self, source_yaml, path): # 检查路径长度是否足够 if len(path) < 2: return None # 查找指定的策略组 group_name = path[1] for group in source_yaml.get('proxy-groups', []): if group['name'] == group_name: return group return None def apply_regex_to_proxies(self, source_yaml, target_group, regex_pattern): # 使用正则表达式筛选代理 regex = re.compile(regex_pattern) matched_proxies = [proxy['name'] for proxy in source_yaml.get('proxies', []) if regex.search(proxy['name'])] # 将匹配的代理添加到目标策略组 target_group['proxies'].extend(matched_proxies) def parse_command(self, command): # 首先尝试以 '+' 为分隔符分割命令 if '+' in command: parts = command.split('+', 1) operation = '+' # 如果没有 '+', 则尝试以 '=' 为分隔符 elif '=' in command: parts = command.split('=', 1) operation = '=' else: raise ValueError("Invalid command format") if len(parts) != 2: raise ValueError("Invalid command format") path = parts[0].split('.') value = parts[1] return path, operation, value def log_message(self, format, *args): # 覆盖以避免记录每个请求 pass def run_server(server_class=HTTPServer, handler_class=YAMLModifierHTTPRequestHandler, port=8000): server_address = ('', port) httpd = server_class(server_address, handler_class) print(f"Starting HTTP server on port {port}") httpd.serve_forever() if __name__ == '__main__': run_server()