"""HTTP service that patches a remote YAML document with a remote "parser" YAML.

A GET request carries two base64-encoded URLs in its query string:

* ``source`` - the YAML document to modify;
* ``parser`` - a YAML document whose top-level ``yaml`` mapping describes the
  modifications (``append-<list>``, ``prepend-<list>``, ``mix-<object>`` and
  ``commands``).

The modified source document is returned as ``text/yaml``.
"""

import base64
import re
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlparse

import requests
import yaml

# Seconds to wait on an upstream server. Without a timeout a stalled upstream
# would block this single-threaded server forever.
FETCH_TIMEOUT = 10

# "<path>+<value>" / "<path>=<value>". The path runs up to the first '+' or
# '=' so that keys containing hyphens (e.g. "proxy-groups") stay in one piece.
_SET_COMMAND_RE = re.compile(r'([^+=]+)([+=])(.*)')
# "<path>-": the '-' operator with no value, again allowing hyphens in keys.
_DELETE_COMMAND_RE = re.compile(r'([^+=]+)(-)()\Z')
# Original pattern, kept as a fallback for "<path>-<value>" commands.
_LEGACY_COMMAND_RE = re.compile(r'([^.+=-]+(?:\.[^.+=-]+)*)([+=-])(.*)')


class YAMLModifierHTTPRequestHandler(BaseHTTPRequestHandler):
    """Serve a source YAML document modified according to a parser YAML."""

    def do_GET(self):
        """Handle ``GET /?source=<b64 url>&parser=<b64 url>``.

        Responds with 200 and the modified YAML on success, 400 when a query
        parameter is not valid base64 or the parser rules are malformed, and
        404 when either document cannot be fetched or is not a non-empty
        mapping.
        """
        query_components = self.parse_query_parameters(self.path)

        try:
            source_url = self._decode_url_param(query_components, 'source')
            parser_url = self._decode_url_param(query_components, 'parser')
        except ValueError as e:
            # binascii.Error and UnicodeDecodeError are both ValueError subclasses.
            print(f"Error decoding query parameters: {e}")
            self.send_error(400, "Invalid base64 query parameter")
            return

        source_yaml = self.fetch_yaml(source_url)
        parser_yaml = self.fetch_yaml(parser_url)

        # Both documents must be non-empty mappings; a scalar or list would
        # otherwise blow up later on `.get(...)`.
        if not all(isinstance(doc, dict) and doc
                   for doc in (source_yaml, parser_yaml)):
            self.send_error(404, "Unable to fetch or parse YAML files")
            return

        try:
            modified_yaml = self.modify_yaml(source_yaml, parser_yaml)
        except (ValueError, TypeError, re.error) as e:
            print(f"Error applying parser rules: {e}")
            self.send_error(400, "Invalid parser rules")
            return

        # Serialise before sending headers so a dump failure cannot leave a
        # half-written 200 response on the wire.
        body = yaml.dump(modified_yaml, allow_unicode=True,
                         default_flow_style=False, sort_keys=False).encode()

        self.send_response(200)
        self.send_header('Content-type', 'text/yaml')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    @staticmethod
    def _decode_url_param(query_components, name):
        """Return the base64-decoded text of query parameter *name*.

        A missing parameter decodes to the empty string. Raises ``ValueError``
        (or a subclass) when the value is not valid base64 or not valid UTF-8.
        """
        raw = query_components.get(name, [''])[0]
        # parse_qs turns an unescaped '+' into a space; restore it. Also accept
        # the URL-safe alphabet ('-', '_') and missing '=' padding, none of
        # which can occur in well-formed standard base64.
        raw = raw.replace(' ', '+').replace('-', '+').replace('_', '/')
        raw += '=' * (-len(raw) % 4)
        return base64.b64decode(raw).decode()

    @staticmethod
    def parse_query_parameters(path):
        """Return the query string of *path* as a ``{name: [values]}`` dict."""
        return parse_qs(urlparse(path).query)

    @staticmethod
    def fetch_yaml(url):
        """Download *url* and parse it as YAML.

        Returns the parsed document, or ``None`` when the request fails, the
        server answers with an error status, or the body is not valid YAML.
        """
        # SECURITY NOTE(review): `url` comes straight from the request, so this
        # server will fetch arbitrary URLs on behalf of any client (SSRF).
        # Restrict allowed hosts/schemes before exposing it beyond localhost.
        try:
            response = requests.get(url, timeout=FETCH_TIMEOUT)
            response.raise_for_status()
            return yaml.safe_load(response.content)
        except (requests.RequestException, yaml.YAMLError, ValueError) as e:
            print(f"Error fetching YAML from {url}: {e}")
            return None

    def modify_yaml(self, source_yaml, parser_yaml):
        """Apply the ``yaml`` section of *parser_yaml* to *source_yaml* in place.

        ``append-<name>`` / ``prepend-<name>`` extend a list, ``mix-<name>``
        merges into a mapping, and every entry of ``commands`` is passed to
        :meth:`apply_command`. Returns *source_yaml*.

        Raises ``TypeError`` when a rule has the wrong shape and ``ValueError``
        when a command cannot be parsed.
        """
        rules = parser_yaml.get('yaml') or {}
        if not isinstance(rules, dict):
            raise TypeError("The 'yaml' section of the parser must be a mapping")

        for key, operations in rules.items():
            if not isinstance(key, str):
                continue  # e.g. a numeric YAML key; cannot be a rule name
            if key.startswith(('append-', 'prepend-')):
                self.modify_list(source_yaml, key, operations)
            elif key.startswith('mix-'):
                self.mix_object(source_yaml, key, operations)

        for command in rules.get('commands') or []:
            self.apply_command(source_yaml, command)

        return source_yaml

    def modify_list(self, source_yaml, key, operations):
        """Append or prepend *operations* to the list named by *key*.

        *key* is ``append-<name>`` or ``prepend-<name>``; ``<name>`` is the
        top-level key of the target list, created if missing or null.
        Raises ``TypeError`` if *operations* or the existing target is not a
        list.
        """
        list_name = key.split('-', 1)[1]
        if not isinstance(operations, list):
            raise TypeError(f"'{key}' must be a list")

        source_list = source_yaml.get(list_name)
        if source_list is None:
            source_list = []
        elif not isinstance(source_list, list):
            raise TypeError(f"'{list_name}' in the source is not a list")

        if key.startswith('append-'):
            source_list.extend(operations)
        elif key.startswith('prepend-'):
            source_list = operations + source_list

        source_yaml[list_name] = source_list

    def mix_object(self, source_yaml, key, operations):
        """Merge mapping *operations* into the object named by ``mix-<name>``.

        The target is created if missing or null; keys in *operations*
        overwrite existing ones. Raises ``TypeError`` if *operations* or the
        existing target is not a mapping.
        """
        object_name = key.split('-', 1)[1]
        if not isinstance(operations, dict):
            raise TypeError(f"'{key}' must be a mapping")

        source_object = source_yaml.get(object_name)
        if source_object is None:
            source_object = {}
        elif not isinstance(source_object, dict):
            raise TypeError(f"'{object_name}' in the source is not a mapping")

        source_object.update(operations)
        source_yaml[object_name] = source_object

    def apply_command(self, source_yaml, command):
        """Apply one parser command to *source_yaml*.

        Only values of the form ``[]<regex>`` are acted upon: the names of all
        proxies matching ``<regex>`` are appended to the ``proxies`` list of
        the object addressed by the path minus its last segment. The operator
        and the last path segment are parsed but do not change the outcome;
        every other command is currently a no-op.

        Raises ``ValueError`` if *command* cannot be parsed.
        """
        path, _operation, value = self.parse_command(command)

        # parse_command coerces numbers and booleans, so `value` is not
        # necessarily a string.
        if not (isinstance(value, str) and value.startswith('[]')):
            return

        # Locate the target position inside the YAML document.
        target = self.navigate_yaml(source_yaml, path[:-1])
        if target is None:
            return  # invalid path: skip this command
        if target is source_yaml:
            # A one-segment path would address the document root, whose
            # 'proxies' key is the proxy definitions themselves; appending
            # bare names there would corrupt it.
            return

        self.apply_regex_to_proxies(source_yaml, target, value[2:])

    def navigate_yaml(self, yaml_data, path):
        """Follow *path* (a list of string segments) through *yaml_data*.

        A decimal segment indexes into a list; any other segment is looked up
        as a mapping key. Returns the addressed value, or ``None`` as soon as
        a segment cannot be resolved.
        """
        target = yaml_data
        for segment in path:
            if isinstance(target, list) and segment.isdecimal():
                index = int(segment)
                if index >= len(target):
                    return None
                target = target[index]
            elif isinstance(target, dict) and segment in target:
                target = target[segment]
            else:
                return None
        return target

    def apply_regex_to_proxies(self, source_yaml, target_group, regex_pattern):
        """Append to *target_group* the names of proxies matching a regex.

        Every entry of ``source_yaml['proxies']`` whose ``name`` is found by
        ``re.search(regex_pattern, name)`` is appended to
        ``target_group['proxies']`` (created when missing or null). Entries
        without a string ``name`` are ignored, and nothing happens when
        *target_group* is not a mapping.

        Raises ``re.error`` for an invalid pattern and ``TypeError`` when
        ``target_group['proxies']`` exists but is not a list.
        """
        if not isinstance(target_group, dict):
            return

        regex = re.compile(regex_pattern)
        matched_proxies = [
            proxy['name']
            for proxy in source_yaml.get('proxies') or []
            if isinstance(proxy, dict)
            and isinstance(proxy.get('name'), str)
            and regex.search(proxy['name'])
        ]

        group_proxies = target_group.get('proxies')
        if group_proxies is None:
            group_proxies = target_group['proxies'] = []
        elif not isinstance(group_proxies, list):
            raise TypeError("'proxies' of the target group is not a list")

        # Add the matched proxies to the target policy group.
        group_proxies.extend(matched_proxies)

    def process_value(self, value):
        """Return *value* unchanged.

        Placeholder for special value formats such as ``[]proxyNames|regex``;
        no transformation is implemented.
        """
        return value

    @staticmethod
    def parse_command(command):
        """Split a command string into ``(path, operation, value)``.

        Accepted forms are ``<path>+<value>``, ``<path>=<value>``, ``<path>-``
        and, for backward compatibility, ``<path>-<value>``. ``path`` is the
        dot-separated path as a list of segments and ``operation`` is one of
        ``'+'``, ``'='``, ``'-'``. ``value`` is converted to ``int`` when it is
        all decimal digits and to ``bool`` for ``true`` / ``false``; otherwise
        it stays a string.

        Raises ``ValueError`` when *command* is not a string or matches none
        of the forms.
        """
        if not isinstance(command, str):
            raise ValueError("Invalid command format")

        match = (
            _SET_COMMAND_RE.match(command)
            or _DELETE_COMMAND_RE.match(command)
            or _LEGACY_COMMAND_RE.match(command)
        )
        if match is None:
            raise ValueError("Invalid command format")

        path = match.group(1).split('.')
        if not all(path):
            # Empty segment, e.g. "a..b=1" or ".a=1".
            raise ValueError("Invalid command format")
        operation = match.group(2)
        value = match.group(3)

        # Convert the value to the appropriate type.
        if value.isdecimal():
            value = int(value)
        elif value in ('true', 'false'):
            value = value == 'true'

        return path, operation, value

    def log_message(self, format, *args):
        """Suppress the default per-request logging of the base class."""
        pass


def run_server(server_class=HTTPServer,
               handler_class=YAMLModifierHTTPRequestHandler, port=8000):
    """Serve *handler_class* on all interfaces at *port* until interrupted."""
    server_address = ('', port)
    # The context manager closes the listening socket on exit.
    with server_class(server_address, handler_class) as httpd:
        print(f"Starting HTTP server on port {port}")
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            pass  # Ctrl-C: shut down without a traceback


# Run the server only when executed as a script, not on import.
if __name__ == "__main__":
    run_server()