parser.py

unlisted ⁨2⁩ ⁨files⁩ 2023-12-21 12:33:08 UTC

parser.py

Raw
import base64
import re

import yaml
from http.server import BaseHTTPRequestHandler, HTTPServer
import requests


class YAMLModifierHTTPRequestHandler(BaseHTTPRequestHandler):
    """HTTP handler that patches a remote YAML document using a parser YAML.

    A GET request carries two query parameters, ``source`` and ``parser``,
    each holding a base64-encoded URL. Both documents are downloaded, the
    ``yaml`` section of the parser document is applied to the source
    document, and the patched document is returned as YAML.

    SECURITY NOTE: the handler downloads whatever URLs the client supplies,
    so it can be used to reach hosts visible only to the server (SSRF).
    Expose it only on trusted networks.
    """

    # Seconds to wait for a remote document before giving up; without a
    # timeout a stalled upstream would block this single-threaded server.
    FETCH_TIMEOUT = 10

    # "<path><+ or =><value>". Path segments are separated by dots and may
    # contain hyphens (e.g. "proxy-groups"), so "-" is not treated as an
    # operator when "+" or "=" is present.
    _SET_COMMAND_RE = re.compile(r'([^.+=]+(?:\.[^.+=]+)*)([+=])(.*)')
    # "<path>-<value>" where the LAST hyphen is the operator, so a trailing
    # "-" (delete) works on hyphenated paths as well.
    _DELETE_COMMAND_RE = re.compile(r'([^.+=]+(?:\.[^.+=]+)*)(-)([^-]*)$')

    def do_GET(self):
        """Serve the source YAML patched by the parser YAML.

        Responds with 400 when a parameter is not valid base64/UTF-8 or the
        parser document cannot be applied, 404 when either document cannot
        be fetched or is not a non-empty mapping, and 200 with the patched
        YAML otherwise.
        """
        query_components = self.parse_query_parameters(self.path)

        source_url = self._decode_url_param(query_components.get('source', [''])[0])
        parser_url = self._decode_url_param(query_components.get('parser', [''])[0])
        if source_url is None or parser_url is None:
            self.send_error(400, "Invalid base64-encoded URL parameter")
            return

        source_yaml = self.fetch_yaml(source_url)
        parser_yaml = self.fetch_yaml(parser_url)

        # Both documents must be non-empty mappings; anything else cannot be
        # patched and is reported the same way as a failed download.
        documents_ok = (
            isinstance(source_yaml, dict) and source_yaml
            and isinstance(parser_yaml, dict) and parser_yaml
        )
        if not documents_ok:
            self.send_error(404, "Unable to fetch or parse YAML files")
            return

        try:
            modified_yaml = self.modify_yaml(source_yaml, parser_yaml)
        except (ValueError, re.error) as e:
            # Malformed command, wrong value type or invalid regex in the
            # parser document: a client error, not a server crash.
            print(f"Error applying parser YAML: {e}")
            self.send_error(400, "Unable to apply parser YAML")
            return

        body = yaml.dump(
            modified_yaml, allow_unicode=True, default_flow_style=False, sort_keys=False
        ).encode('utf-8')

        self.send_response(200)
        # The body contains unescaped Unicode (allow_unicode=True), so the
        # charset must be declared for clients to decode it correctly.
        self.send_header('Content-type', 'text/yaml; charset=utf-8')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    @staticmethod
    def _decode_url_param(encoded):
        """Decode a base64 query value to text.

        Returns the decoded string, or ``None`` when the value is not valid
        base64 or does not decode to UTF-8. An empty value decodes to ``''``.
        """
        # parse_qs turns a literal "+" into a space; restore it. Also accept
        # the URL-safe alphabet and values whose "=" padding was stripped.
        normalized = encoded.replace(' ', '+').replace('-', '+').replace('_', '/')
        normalized += '=' * (-len(normalized) % 4)
        try:
            return base64.b64decode(normalized).decode()
        except ValueError:  # binascii.Error and UnicodeDecodeError
            return None

    @staticmethod
    def parse_query_parameters(path):
        """Return the query string of *path* as a dict of value lists."""
        from urllib.parse import urlparse, parse_qs
        query = urlparse(path).query
        return parse_qs(query)

    @staticmethod
    def fetch_yaml(url):
        """Download *url* and parse it as YAML.

        Returns the parsed document, or ``None`` if the download or the
        parsing fails for any reason (the error is printed).
        """
        try:
            response = requests.get(
                url, timeout=YAMLModifierHTTPRequestHandler.FETCH_TIMEOUT
            )
            response.raise_for_status()
            return yaml.safe_load(response.content)
        except Exception as e:
            # Deliberately broad: any failure here means "document
            # unavailable" and is reported to the client by do_GET.
            print(f"Error fetching YAML from {url}: {e}")
            return None

    def modify_yaml(self, source_yaml, parser_yaml):
        """Apply the ``yaml`` section of *parser_yaml* to *source_yaml*.

        ``append-<name>`` / ``prepend-<name>`` keys extend the list
        ``<name>``, ``mix-<name>`` keys merge into the mapping ``<name>``,
        and every entry of ``commands`` is passed to ``apply_command``.
        *source_yaml* is modified in place and returned.

        Raises:
            ValueError: if the section, an operation value or a command is
                malformed.
        """
        section = parser_yaml.get('yaml')
        if section is None:  # missing key or an empty "yaml:" entry
            return source_yaml
        if not isinstance(section, dict):
            raise ValueError("'yaml' section must be a mapping")

        for key, operations in section.items():
            if not isinstance(key, str):
                continue
            if key.startswith(('append-', 'prepend-')):
                self.modify_list(source_yaml, key, operations)
            elif key.startswith('mix-'):
                self.mix_object(source_yaml, key, operations)

        for command in section.get('commands') or []:
            self.apply_command(source_yaml, command)

        return source_yaml

    def modify_list(self, source_yaml, key, operations):
        """Append or prepend *operations* to the list named by *key*.

        *key* has the form ``append-<name>`` or ``prepend-<name>``. A
        missing or null list ``<name>`` is treated as empty.

        Raises:
            ValueError: if *operations* or the existing value is not a list.
        """
        mode, _, list_name = key.partition('-')

        additions = [] if operations is None else operations
        if not isinstance(additions, list):
            raise ValueError(f"'{key}' must be a list")

        current = source_yaml.get(list_name)
        if current is None:
            current = []
        elif not isinstance(current, list):
            raise ValueError(f"'{list_name}' in the source YAML is not a list")

        if mode == 'append':
            current.extend(additions)
        elif mode == 'prepend':
            current = additions + current

        source_yaml[list_name] = current

    def mix_object(self, source_yaml, key, operations):
        """Merge *operations* into the mapping named by *key* (``mix-<name>``).

        A missing or null mapping ``<name>`` is treated as empty; keys from
        *operations* overwrite existing keys.

        Raises:
            ValueError: if *operations* or the existing value is not a
                mapping.
        """
        object_name = key.partition('-')[2]

        updates = {} if operations is None else operations
        if not isinstance(updates, dict):
            raise ValueError(f"'{key}' must be a mapping")

        current = source_yaml.get(object_name)
        if current is None:
            current = {}
        elif not isinstance(current, dict):
            raise ValueError(f"'{object_name}' in the source YAML is not a mapping")

        current.update(updates)
        source_yaml[object_name] = current

    def apply_command(self, source_yaml, command):
        """Apply one entry of the parser's ``commands`` list.

        Only values starting with ``[]`` are acted on: the rest of the value
        is used as a regular expression and the names of matching proxies
        are appended to the ``proxies`` list of the node addressed by the
        command path without its last segment.

        NOTE: other commands (plain "=", "+" and "-" operations) are parsed
        but not applied yet.

        Raises:
            ValueError: if *command* cannot be parsed.
            re.error: if the regular expression is invalid.
        """
        path, _operation, value = self.parse_command(command)

        # Locate the parent of the addressed node; skip the command when the
        # path does not exist in the document.
        target = self.navigate_yaml(source_yaml, path[:-1])
        if target is None:
            return

        # parse_command converts numbers and booleans, so the value is not
        # necessarily a string.
        if isinstance(value, str) and value.startswith('[]'):
            self.apply_regex_to_proxies(source_yaml, target, value[2:])

    def navigate_yaml(self, yaml_data, path):
        """Follow *path* (a list of string segments) through *yaml_data*.

        A numeric segment indexes into a list, any segment may be a mapping
        key. Returns the node reached, or ``None`` as soon as a segment
        cannot be resolved. An empty path returns *yaml_data* itself.
        """
        target = yaml_data
        for segment in path:
            if isinstance(target, list) and segment.isdigit():
                index = int(segment)
                if index >= len(target):
                    return None
                target = target[index]
            elif isinstance(target, dict) and segment in target:
                target = target[segment]
            else:
                return None
        return target

    def apply_regex_to_proxies(self, source_yaml, target_group, regex_pattern):
        """Append the names of proxies matching *regex_pattern* to a group.

        Names are taken from the ``name`` field of the entries of
        ``source_yaml['proxies']`` and matched with ``re.search``. Matches
        are appended to ``target_group['proxies']``, which is created when
        absent. Nothing happens if *target_group* is not a mapping.

        Raises:
            re.error: if *regex_pattern* is not a valid regular expression.
        """
        if not isinstance(target_group, dict):
            return

        regex = re.compile(regex_pattern)
        matched_proxies = [
            proxy['name']
            for proxy in source_yaml.get('proxies') or []
            if isinstance(proxy, dict)
            and isinstance(proxy.get('name'), str)
            and regex.search(proxy['name'])
        ]

        group_proxies = target_group.get('proxies')
        if not isinstance(group_proxies, list):
            group_proxies = []
            target_group['proxies'] = group_proxies
        group_proxies.extend(matched_proxies)

    def process_value(self, value):
        """Return *value* unchanged.

        Placeholder for special handling of values such as
        ``[]proxyNames|regex``; no transformation is performed at present.
        """
        return value

    @staticmethod
    def parse_command(command):
        """Split a command string into ``(path, operation, value)``.

        The operator is the first ``+`` or ``=`` in the command; when
        neither is present, the last ``-`` is the operator. *path* is the
        text before the operator split on dots, *value* is the text after
        it, converted to ``int`` when it consists of decimal digits and to
        ``bool`` when it is ``true`` or ``false``.

        Raises:
            ValueError: if *command* is not a string of that form.
        """
        cls = YAMLModifierHTTPRequestHandler
        match = None
        if isinstance(command, str):
            match = cls._SET_COMMAND_RE.match(command) or cls._DELETE_COMMAND_RE.match(command)
        if match is None:
            raise ValueError("Invalid command format")

        path = match.group(1).split('.')
        operation = match.group(2)
        value = match.group(3)

        # isdecimal(), unlike isdigit(), only accepts characters that int()
        # can actually convert.
        if value.isdecimal():
            value = int(value)
        elif value in ('true', 'false'):
            value = value == 'true'

        return path, operation, value

    def log_message(self, format, *args):
        """Discard the per-request log line the base class would emit."""
        pass


def run_server(server_class=HTTPServer, handler_class=YAMLModifierHTTPRequestHandler, port=8000):
    """Serve requests on *port* (all interfaces) until interrupted.

    Args:
        server_class: Server type, constructed as
            ``server_class(address, handler_class)``.
        handler_class: Request handler class passed to the server.
        port: TCP port to listen on.
    """
    server_address = ('', port)
    httpd = server_class(server_address, handler_class)
    print(f"Starting HTTP server on port {port}")
    try:
        httpd.serve_forever()
    finally:
        # Release the listening socket even when serve_forever() is left
        # through an exception such as KeyboardInterrupt.
        httpd.server_close()


# Run the server only when this file is executed as a script, so that
# importing the module does not start a blocking server.
if __name__ == "__main__":
    run_server()

pastefile2

Raw
http://localhost:8000/?source={base64过的URL}&parser={base64过的URL}

source 传入需要修改的 yaml 文件的 URL(需先进行 base64 编码)
parser 传入 CFW 格式的 yaml 文件的 URL(同样需先进行 base64 编码)


parser 文件需要以顶层键 yaml 开头,例如:
```
yaml:
  prepend-rules: 
    - DOMAIN-SUFFIX,s.team,🚀 节点选择
    - DOMAIN,fanatical.com,🚀 节点选择
    - DOMAIN,humblebundle.com,🚀 节点选择
    - DOMAIN,playartifact.com,🚀 节点选择
    - DOMAIN-SUFFIX,steam-chat.com,🚀 节点选择
    - DOMAIN-SUFFIX,steamcommunity.com,🚀 节点选择
    - DOMAIN-SUFFIX,steamgames.com,🚀 节点选择
    - DOMAIN-SUFFIX,steamstatic.com,🚀 节点选择
    - DOMAIN-SUFFIX,steamusercontent.com,🚀 节点选择
    - DOMAIN-SUFFIX,underlords.com,🚀 节点选择
    - DOMAIN-SUFFIX,valvesoftware.com,🚀 节点选择
    - DOMAIN,api.steampowered.com,🚀 节点选择
    - DOMAIN,store.steampowered.com,🚀 节点选择
    - DOMAIN-SUFFIX,steamserver.net,DIRECT
```