""" LinkSyncServer - Query Parser for Expression Parser """ import re from typing import Union, Dict, List, Any from enum import Enum class TokenType(Enum): OPERATOR = "OPERATOR" TERM = "TERM" VALUE = "VALUE" LPAREN = "LPAREN" RPAREN = "RPAREN" class Token: def __init__(self, token_type: TokenType, value: Any, line: int = 0, column: int = 0): self.type = token_type self.value = value self.line = line self.column = column def __repr__(self): return f"Token({self.type.value}, {self.value!r})" class QuerySyntaxError(Exception): """Syntax error in query expression.""" def __init__(self, message: str, line: int = None, column: int = None): self.message = message self.line = line self.column = column super().__init__(f"{message} at line {line}, column {column}" if line and column else message) def lex(expression: str) -> List[Token]: """ Lexical analysis - convert string to tokens. Grammar: expression := query_item (OP query_item)* query_item := (expression) | value | term term := OP | value value := url:value | tag:value | title:value | description:value | id:value """ tokens = [] pos = 0 # Operators operators = ['AND', 'OR', 'XOR'] while pos < len(expression): # Skip whitespace if expression[pos].isspace(): pos += 1 continue # Check for parentheses if expression[pos] == '(': tokens.append(Token(TokenType.LPAREN, '(')) pos += 1 continue if expression[pos] == ')': tokens.append(Token(TokenType.RPAREN, ')')) pos += 1 continue # Check for operators (AND, OR, XOR) if expression[pos:pos+4] == 'AND': tokens.append(Token(TokenType.OPERATOR, 'AND')) pos += 4 continue if expression[pos:pos+3] == 'OR': tokens.append(Token(TokenType.OPERATOR, 'OR')) pos += 3 continue if expression[pos:pos+4] == 'XOR': tokens.append(Token(TokenType.OPERATOR, 'XOR')) pos += 4 continue # Check for url: prefix if expression[pos:pos+4] == 'url:': pos += 4 # Find end of URL end = expression.find(':', pos) if end == -1 and expression[pos] == '://': # Find end of URL (next space or end of string) end = expression.find(' ', pos) if end == -1: end = len(expression) tokens.append(Token(TokenType.TERM, expression[pos:end])) pos = end continue # Check for tag: prefix if expression[pos:pos+5] == 'tag:': pos += 5 end = expression.find(':', pos) if end == -1: end = len(expression) tokens.append(Token(TokenType.TERM, expression[pos:end])) pos = end continue # Check for title: or description: prefixes if expression[pos:pos+6] in ['title:', 'description:']: field = 'title' if expression[pos:pos+6] == 'title:' else 'description' pos += 6 end = expression.find(':', pos) if end == -1 and expression[pos] == ':' : end = len(expression) tokens.append(Token(TokenType.TERM, expression[pos:end])) pos = end continue # Check for colon (key:value) if expression[pos] == ':': pos += 1 # Get field name (key) field = expression[pos] pos += 1 # Get value end = expression.find(' ', pos) if end == -1: end = len(expression) token_val = expression[pos:end].strip('"\'') tokens.append(Token(TokenType.VALUE, f'{field}:{token_val}')) continue # Regular term - alphanumeric if expression[pos].isalnum() or expression[pos] in '-_': start = pos while pos < len(expression) and (expression[pos].isalnum() or expression[pos] in '-_./?=?&'): pos += 1 tokens.append(Token(TokenType.TERM, expression[start:pos])) continue # Unknown character - skip or error pos += 1 return tokens class ASTNode: """Abstract Syntax Tree Node.""" def __init__(self, operator: str, children: List[Union[ASTNode, str, dict]] = None): self.operator = operator self.children = children if children else [] def __repr__(self): return f"AST({self.operator}, {self.children})" def parse_operator(token: Token) -> str: """Convert operator token to Python operator string.""" if token.type != TokenType.OPERATOR: raise QuerySyntaxError(f"Expected operator, got {token.value}") if token.value == 'AND': return 'and' elif token.value == 'OR': return 'or' elif token.value == 'XOR': return 'xor' else: raise QuerySyntaxError(f"Unknown operator: {token.value}") class QueryParser: """Parser for query expressions.""" def __init__(self): self.tokens = [] self.pos = 0 self.current_token = None self.error = False def error(self, message: str): """Record and return error.""" self.error = True return QuerySyntaxError(message) def parse_expression(self) -> List[ASTNode]: """Parse top-level expression (list of clauses).""" if not self.tokens: return [] expressions = [] # Parse first clause expr = self.parse_or() if expr: expressions.append(expr) # Parse remaining clauses while self.current_token and self.current_token.value in ['AND', 'OR', 'XOR']: operator = self.current_token.value self.pos += 1 expressions.append(operator) expr2 = self.parse_or() if expr2: expressions.append(expr2) return expressions def parse_or(self) -> Union[ASTNode, None]: """Parse OR clause.""" if not self.current_token: return None return self.parse_and() def parse_and(self) -> Union[ASTNode, None]: """Parse AND clause.""" left = self.parse_xor() while self.current_token and self.current_token.value == 'OR': operator = self.parse_operator(self.current_token) right = self.parse_xor() left = ASTNode(operator, [left, right]) return left def parse_xor(self) -> Union[ASTNode, None]: """Parse XOR clause.""" left = self.parse_term() while self.current_token and self.current_token.value == 'AND': operator = self.parse_operator(self.current_token) right = self.parse_term() left = ASTNode(operator, [left, right]) return left def parse_term(self): """Parse term.""" if self.error: return None if self.pos >= len(self.tokens): return None token = self.current_token # Check for parentheses (subexpression) if token and token.value == '(': self.pos += 1 self.current_token = self.tokens[self.pos] if self.pos < len(self.tokens) else None sub_expr = self.parse_expression() if not sub_expr and not self.error: return None if self.error: return None if self.current_token and self.current_token.value == ')': self.pos += 1 return sub_expr elif token and token.value != ')': return token def parse_value(self) -> Union[None, str]: """Parse value term.""" if self.error: return None token = self.current_token if not token or token.type != TokenType.TERM: return None # Extract URL, TAG, etc. term = token.value # Check for url: value if term.startswith('url:'): query = {'operation': 'TERM', 'value': term[4:]} self.pos += 1 self.current_token = self.tokens[self.pos] if self.pos < len(self.tokens) else None return query elif term.startswith('tag:'): query = {'operation': 'TERM', 'value': term[4:]} self.pos += 1 self.current_token = self.tokens[self.pos] if self.pos < len(self.tokens) else None return query elif term.startswith('title:'): query = {'operation': 'TERM', 'value': term[6:]} self.pos += 1 self.current_token = self.tokens[self.pos] if self.pos < len(self.tokens) else None return query elif term.startswith('description:'): query = {'operation': 'TERM', 'value': term[12:]} self.pos += 1 self.current_token = self.tokens[self.pos] if self.pos < len(self.tokens) else None return query elif term.startswith('id:'): query = {'operation': 'EQUALS', 'value': term[3:]} self.pos += 1 self.current_token = self.tokens[self.pos] if self.pos < len(self.tokens) else None return query elif term.startswith('"') or term.startswith("'"): # Direct value return term else: self.error(f"Unknown term: {term}") return None def parse(self, expression: str) -> List[ASTNode]: """Parse complete expression.""" if not expression: return [] # Check for empty expression if not expression.strip(): return [] # Lexical analysis self.tokens = lex(expression) self.pos = 0 self.current_token = self.tokens[0] if self.tokens else None if not self.tokens: return [] # Parse expression into AST expr = self.parse_expression() # Return AST as dict return [self.ast_to_dict(node) for node in expr] if expr else [] def ast_to_dict(self, node, indent=0): """Convert AST node to dict representation.""" if isinstance(node, ASTNode): if node.children: return { "operation": node.operator, "operands": [self.ast_to_dict(child, indent + 1) for child in node.children] } else: return node.value elif isinstance(node, str): return node elif isinstance(node, dict): return node else: return str(node)