parser.py

unlisted ⁨2⁩ ⁨files⁩ 1 year ago

parser.py

Raw
import base64
import re
import yaml
from http.server import BaseHTTPRequestHandler, HTTPServer
import requests
class YAMLModifierHTTPRequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
# Parse query parameters
query_components = self.parse_query_parameters(self.path)
# Extract and decode URLs
source_url = base64.b64decode(query_components.get('source', [''])[0]).decode()
parser_url = base64.b64decode(query_components.get('parser', [''])[0]).decode()
# Fetch and parse YAML files
source_yaml = self.fetch_yaml(source_url)
print(source_yaml)
parser_yaml = self.fetch_yaml(parser_url)
if source_yaml and parser_yaml:
# Modify source YAML using parser YAML
modified_yaml = self.modify_yaml(source_yaml, parser_yaml)
# Send response
self.send_response(200)
self.send_header('Content-type', 'text/yaml')
self.end_headers()
self.wfile.write(
yaml.dump(modified_yaml, allow_unicode=True, default_flow_style=False, sort_keys=False).encode())
else:
# Send error response
self.send_error(404, "Unable to fetch or parse YAML files")
@staticmethod
def parse_query_parameters(path):
from urllib.parse import urlparse, parse_qs
query = urlparse(path).query
return parse_qs(query)
@staticmethod
def fetch_yaml(url):
try:
response = requests.get(url)
response.raise_for_status()
return yaml.safe_load(response.content)
except Exception as e:
print(f"Error fetching YAML from {url}: {e}")
return None
def modify_yaml(self, source_yaml, parser_yaml):
# Apply modifications specified in parser_yaml to source_yaml
for key, operations in parser_yaml.get('yaml', {}).items():
if key.startswith('append-') or key.startswith('prepend-'):
self.modify_list(source_yaml, key, operations)
elif key.startswith('mix-'):
self.mix_object(source_yaml, key, operations)
# Apply commands
for command in parser_yaml.get('yaml', {}).get('commands', []):
self.apply_command(source_yaml, command)
return source_yaml
def modify_list(self, source_yaml, key, operations):
# Determine the target list in the source YAML
list_name = key.split('-', 1)[1]
source_list = source_yaml.get(list_name, [])
# Prepend or append operations
if key.startswith('append-'):
source_list.extend(operations)
elif key.startswith('prepend-'):
source_list = operations + source_list
source_yaml[list_name] = source_list
def mix_object(self, source_yaml, key, operations):
# Determine the target object in the source YAML
object_name = key.split('-', 1)[1]
source_object = source_yaml.get(object_name, {})
# Merge the objects
source_object.update(operations)
source_yaml[object_name] = source_object
def apply_command(self, source_yaml, command):
path, operation, value = self.parse_command(command)
# 定位到 YAML 中的目标位置
target = self.navigate_yaml(source_yaml, path[:-1])
if target is None:
return # 如果路径无效,则跳过此命令
# 处理代理名称匹配的正则表达式
if value.startswith('[]'):
regex_pattern = value[2:]
self.apply_regex_to_proxies(source_yaml, target, regex_pattern)
# 对于正则表达式,需要额外处理
# if value.startswith('[]'):
# regex = re.compile(value[2:])
# matched_proxies = [proxy for proxy in source_yaml['proxies'] if regex.match(proxy)]
# target[path[-1]] = matched_proxies
# else:
# # 执行插入操作
# if operation == '+':
# if isinstance(target, list):
# target.insert(0, value)
# elif isinstance(target, dict):
# target[path[-1]] = value
def navigate_yaml(self, yaml_data, path):
target = yaml_data
for segment in path:
if isinstance(target, list) and segment.isdigit():
segment = int(segment)
if segment < len(target):
target = target[segment]
else:
return None
elif isinstance(target, dict) and segment in target:
target = target[segment]
else:
return None
return target
def apply_regex_to_proxies(self, source_yaml, target_group, regex_pattern):
regex = re.compile(regex_pattern)
matched_proxies = [proxy['name'] for proxy in source_yaml.get('proxies', []) if regex.search(proxy['name'])]
# 将匹配的代理添加到目标策略组
target_group['proxies'].extend(matched_proxies)
def process_value(self, value):
# 如果值是正则表达式或特殊格式,则进行相应的处理
# 例如: []proxyNames|regex
if value.startswith('[]'):
# 处理正则表达式和特殊标记
return value # 返回处理后的值
else:
# 处理普通值
return value
def parse_command(self, command):
# 解析命令格式
match = re.match(r'([^.+=-]+(?:\.[^.+=-]+)*)([+=-])(.*)', command)
if match:
path = match.group(1).split('.')
operation = match.group(2)
value = match.group(3)
return path, operation, value
else:
raise ValueError("Invalid command format")
@staticmethod
def parse_command(command):
import re
match = re.match(r'([^.+=-]+(?:\.[^.+=-]+)*)([+=-])(.*)', command)
if match:
path = match.group(1).split('.')
operation = match.group(2)
value = match.group(3)
# Convert value to appropriate type
if value.isdigit():
value = int(value)
elif value in ['true', 'false']:
value = value == 'true'
return path, operation, value
else:
raise ValueError("Invalid command format")
def log_message(self, format, *args):
# Override to prevent logging each request
pass
def run_server(server_class=HTTPServer, handler_class=YAMLModifierHTTPRequestHandler, port=8000):
server_address = ('', port)
httpd = server_class(server_address, handler_class)
print(f"Starting HTTP server on port {port}")
httpd.serve_forever()
# Run the server
run_server()

pastefile2

Raw
http://localhost:8000/?source={base64过的URL}&parser={base64过的URL}
source 传入需要修改的 yaml
parser 传入 CFW 格式的 yaml
parser 需要以 yaml 开头,例如:
```
yaml:
prepend-rules:
- DOMAIN-SUFFIX,s.team,🚀 节点选择
- DOMAIN,fanatical.com,🚀 节点选择
- DOMAIN,humblebundle.com,🚀 节点选择
- DOMAIN,playartifact.com,🚀 节点选择
- DOMAIN-SUFFIX,steam-chat.com,🚀 节点选择
- DOMAIN-SUFFIX,steamcommunity.com,🚀 节点选择
- DOMAIN-SUFFIX,steamgames.com,🚀 节点选择
- DOMAIN-SUFFIX,steamstatic.com,🚀 节点选择
- DOMAIN-SUFFIX,steamusercontent.com,🚀 节点选择
- DOMAIN-SUFFIX,underlords.com,🚀 节点选择
- DOMAIN-SUFFIX,valvesoftware.com,🚀 节点选择
- DOMAIN,api.steampowered.com,🚀 节点选择
- DOMAIN,store.steampowered.com,🚀 节点选择
- DOMAIN-SUFFIX,steamserver.net,DIRECT
```