| import base64
|
| import re |
| |
| import yaml |
| from http.server import BaseHTTPRequestHandler, HTTPServer |
| import requests |
| |
| |
| class YAMLModifierHTTPRequestHandler(BaseHTTPRequestHandler): |
| def do_GET(self): |
| |
| query_components = self.parse_query_parameters(self.path) |
| |
| |
| source_url = base64.b64decode(query_components.get('source', [''])[0]).decode() |
| parser_url = base64.b64decode(query_components.get('parser', [''])[0]).decode() |
| |
| |
| source_yaml = self.fetch_yaml(source_url) |
| print(source_yaml) |
| parser_yaml = self.fetch_yaml(parser_url) |
| |
| if source_yaml and parser_yaml: |
| |
| modified_yaml = self.modify_yaml(source_yaml, parser_yaml) |
| |
| |
| self.send_response(200) |
| self.send_header('Content-type', 'text/yaml') |
| self.end_headers() |
| self.wfile.write( |
| yaml.dump(modified_yaml, allow_unicode=True, default_flow_style=False, sort_keys=False).encode()) |
| else: |
| |
| self.send_error(404, "Unable to fetch or parse YAML files") |
| |
| @staticmethod |
| def parse_query_parameters(path): |
| from urllib.parse import urlparse, parse_qs |
| query = urlparse(path).query |
| return parse_qs(query) |
| |
| @staticmethod |
| def fetch_yaml(url): |
| try: |
| response = requests.get(url) |
| response.raise_for_status() |
| return yaml.safe_load(response.content) |
| except Exception as e: |
| print(f"Error fetching YAML from {url}: {e}") |
| return None |
| |
| def modify_yaml(self, source_yaml, parser_yaml): |
| |
| for key, operations in parser_yaml.get('yaml', {}).items(): |
| if key.startswith('append-') or key.startswith('prepend-'): |
| self.modify_list(source_yaml, key, operations) |
| elif key.startswith('mix-'): |
| self.mix_object(source_yaml, key, operations) |
| |
| |
| for command in parser_yaml.get('yaml', {}).get('commands', []): |
| self.apply_command(source_yaml, command) |
| |
| return source_yaml |
| |
| def modify_list(self, source_yaml, key, operations): |
| |
| list_name = key.split('-', 1)[1] |
| source_list = source_yaml.get(list_name, []) |
| |
| |
| if key.startswith('append-'): |
| source_list.extend(operations) |
| elif key.startswith('prepend-'): |
| source_list = operations + source_list |
| |
| source_yaml[list_name] = source_list |
| |
| def mix_object(self, source_yaml, key, operations): |
| |
| object_name = key.split('-', 1)[1] |
| source_object = source_yaml.get(object_name, {}) |
| |
| |
| source_object.update(operations) |
| source_yaml[object_name] = source_object |
| |
| def apply_command(self, source_yaml, command): |
| path, operation, value = self.parse_command(command) |
| |
| |
| target = self.navigate_yaml(source_yaml, path[:-1]) |
| if target is None: |
| return |
| |
| |
| if value.startswith('[]'): |
| regex_pattern = value[2:] |
| self.apply_regex_to_proxies(source_yaml, target, regex_pattern) |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| def navigate_yaml(self, yaml_data, path): |
| target = yaml_data |
| for segment in path: |
| if isinstance(target, list) and segment.isdigit(): |
| segment = int(segment) |
| if segment < len(target): |
| target = target[segment] |
| else: |
| return None |
| elif isinstance(target, dict) and segment in target: |
| target = target[segment] |
| else: |
| return None |
| return target |
| |
| def apply_regex_to_proxies(self, source_yaml, target_group, regex_pattern): |
| regex = re.compile(regex_pattern) |
| matched_proxies = [proxy['name'] for proxy in source_yaml.get('proxies', []) if regex.search(proxy['name'])] |
| |
| |
| target_group['proxies'].extend(matched_proxies) |
| |
| def process_value(self, value): |
| |
| |
| if value.startswith('[]'): |
| |
| return value |
| else: |
| |
| return value |
| |
| def parse_command(self, command): |
| |
| match = re.match(r'([^.+=-]+(?:\.[^.+=-]+)*)([+=-])(.*)', command) |
| if match: |
| path = match.group(1).split('.') |
| operation = match.group(2) |
| value = match.group(3) |
| return path, operation, value |
| else: |
| raise ValueError("Invalid command format") |
| |
| @staticmethod |
| def parse_command(command): |
| import re |
| |
| match = re.match(r'([^.+=-]+(?:\.[^.+=-]+)*)([+=-])(.*)', command) |
| if match: |
| path = match.group(1).split('.') |
| operation = match.group(2) |
| value = match.group(3) |
| |
| |
| if value.isdigit(): |
| value = int(value) |
| elif value in ['true', 'false']: |
| value = value == 'true' |
| |
| return path, operation, value |
| else: |
| raise ValueError("Invalid command format") |
| |
| def log_message(self, format, *args): |
| |
| pass |
| |
| |
| def run_server(server_class=HTTPServer, handler_class=YAMLModifierHTTPRequestHandler, port=8000): |
| server_address = ('', port) |
| httpd = server_class(server_address, handler_class) |
| print(f"Starting HTTP server on port {port}") |
| httpd.serve_forever() |
| |
| |
| |
| run_server() |
| |