"""LinkSyncServer - lexer and recursive-descent parser for query expressions."""

from enum import Enum
from typing import Any, Dict, List, Optional


class TokenType(Enum):
    """Kinds of token the lexer can attach to a ``Token``."""

    OPERATOR = "OPERATOR"
    TERM = "TERM"
    FIELD = "FIELD"
    LPAREN = "LPAREN"
    RPAREN = "RPAREN"
    COLON = "COLON"
    COMMA = "COMMA"


# Boolean operator keywords. They are case-sensitive and only recognised as
# whole words, so terms such as "ANDROID" or "ORDER" stay ordinary terms.
_OPERATORS = frozenset({"AND", "OR", "XOR"})

# Prefixes that turn a ``prefix:value`` word into a FIELD token + TERM token.
_FIELDS = frozenset({"url", "tag", "title", "description", "path", "id"})

# Single-character punctuation tokens.
_PUNCTUATION = {
    "(": TokenType.LPAREN,
    ")": TokenType.RPAREN,
    ",": TokenType.COMMA,
}

_QUOTES = ("'", '"')

# Non-alphanumeric characters that may start a bare word / continue one.
_WORD_START_CHARS = "-_."
_WORD_CHARS = "-_.:/?&=%"


class Token:
    """A lexical token together with the 1-based position it was found at."""

    def __init__(self, token_type: TokenType, value: Any, line: int = 0, column: int = 0):
        self.type = token_type
        self.value = value
        self.line = line
        self.column = column

    def __repr__(self):
        return f"Token({self.type.value}, {self.value!r})"


class QuerySyntaxError(Exception):
    """Raised when a query expression cannot be parsed.

    ``message`` always holds the bare message; ``line`` and ``column`` hold the
    position when one is known. The position is appended to ``str(exc)`` only
    when both values are truthy, so ``None`` and ``0`` both mean "unknown".
    """

    def __init__(self, message: str, line: Optional[int] = None, column: Optional[int] = None):
        self.message = message
        self.line = line
        self.column = column
        if line and column:
            super().__init__(f"{message} at line {line}, column {column}")
        else:
            super().__init__(message)


def lex(expression: str) -> List[Token]:
    """Split *expression* into a list of tokens.

    Recognised input:

    * spaces, tabs and newlines separate tokens;
    * ``(``, ``)`` and ``,`` are punctuation tokens;
    * quoted strings (single or double quotes) become TERM tokens holding the
      text between the quotes; an unterminated quote runs to the end of input;
    * bare words made of alphanumerics and ``-_.:/?&=%`` become TERM tokens,
      except that the exact words ``AND``, ``OR`` and ``XOR`` become OPERATOR
      tokens, and ``field:value`` with a known field becomes a FIELD token
      (upper-cased field name) followed by a TERM token for the value;
    * any other character is skipped.

    A known field directly followed by a quoted string (``url:"a b"``) yields
    FIELD followed by the quoted TERM.
    """
    tokens: List[Token] = []
    length = len(expression)
    pos = 0
    line = 1
    column = 1

    while pos < length:
        ch = expression[pos]

        if ch in " \t":
            pos += 1
            column += 1
            continue

        if ch == "\n":
            line += 1
            column = 1
            pos += 1
            continue

        punctuation = _PUNCTUATION.get(ch)
        if punctuation is not None:
            tokens.append(Token(punctuation, ch, line, column))
            pos += 1
            column += 1
            continue

        if ch in _QUOTES:
            # Step over the opening quote; the token position is that of the
            # first character of the quoted content.
            pos += 1
            column += 1
            end = expression.find(ch, pos)
            if end == -1:
                # Unterminated quote: the rest of the input is the term.
                end = length
            value = expression[pos:end]
            tokens.append(Token(TokenType.TERM, value, line, column))

            newline_count = value.count("\n")
            if newline_count:
                # Keep positions of later tokens correct when the quoted text
                # spans lines: resume counting after the last newline, then
                # step over the closing quote.
                line += newline_count
                column = len(value) - value.rfind("\n") + 1
            else:
                column += len(value) + 1
            pos = end + 1
            continue

        if ch.isalnum() or ch in _WORD_START_CHARS:
            start = pos
            while pos < length and (expression[pos].isalnum() or expression[pos] in _WORD_CHARS):
                pos += 1
            word = expression[start:pos]

            if word in _OPERATORS:
                tokens.append(Token(TokenType.OPERATOR, word, line, column))
            else:
                field, separator, field_value = word.partition(":")
                if separator and field in _FIELDS:
                    tokens.append(Token(TokenType.FIELD, field.upper(), line, column))
                    quote_follows = pos < length and expression[pos] in _QUOTES
                    # With an empty value and a quote right behind the colon,
                    # the quoted string supplies the TERM for this field.
                    if field_value or not quote_follows:
                        tokens.append(
                            Token(TokenType.TERM, field_value, line, column + len(field) + 1)
                        )
                else:
                    tokens.append(Token(TokenType.TERM, word, line, column))

            column += pos - start
            continue

        # Unrecognised character: skip it.
        pos += 1
        column += 1

    return tokens


class ASTNode:
    """Node of the parsed query tree.

    ``node_type`` names the operation, ``value`` carries a leaf payload and
    ``children`` holds operand nodes (an empty list when none are given).
    """

    def __init__(self, node_type: str, value: Any = None, children: Optional[List["ASTNode"]] = None):
        self.node_type = node_type
        self.value = value
        self.children = children or []

    def to_dict(self) -> Dict[str, Any]:
        """Return a nested-dict form of this node.

        Nodes with children produce ``{"operation", "operands"}``; otherwise a
        node with a non-``None`` value produces ``{"operation", "value"}``;
        otherwise only ``{"operation"}``.
        """
        if self.children:
            return {
                "operation": self.node_type,
                "operands": [child.to_dict() for child in self.children],
            }
        if self.value is not None:
            return {"operation": self.node_type, "value": self.value}
        return {"operation": self.node_type}

    def __repr__(self):
        return f"ASTNode({self.node_type}, {self.value!r}, {self.children})"


class QueryParser:
    """Recursive-descent parser producing a dict tree from a query string.

    Binding strength, tightest first: ``XOR``, ``AND``, ``OR``. All three are
    left-associative. Parentheses group a full expression.
    """

    def __init__(self):
        self.tokens: List[Token] = []
        self.pos: int = 0

    @staticmethod
    def _error(message: str, token: Optional[Token] = None) -> QuerySyntaxError:
        """Build a QuerySyntaxError, attaching *token*'s position when given."""
        if token is None:
            return QuerySyntaxError(message)
        return QuerySyntaxError(message, token.line, token.column)

    def _current(self) -> Optional[Token]:
        """Return the token at the cursor, or ``None`` at end of input."""
        if self.pos < len(self.tokens):
            return self.tokens[self.pos]
        return None

    def _advance(self) -> Optional[Token]:
        """Return the token at the cursor and move the cursor one step on."""
        token = self._current()
        self.pos += 1
        return token

    def _expect(self, token_type: TokenType, value: str = None) -> Token:
        """Consume and return the current token if it matches.

        Raises:
            QuerySyntaxError: at end of input, on a token of another type, or
                (when *value* is given) on a token with another value.
        """
        token = self._current()
        if token is None:
            raise self._error(f"Expected {token_type.value}, got end of input")
        if token.type != token_type:
            raise self._error(f"Expected {token_type.value}, got {token.type.value}", token)
        if value is not None and token.value != value:
            raise self._error(f"Expected '{value}', got '{token.value}'", token)
        return self._advance()

    def parse(self, expression: str) -> Optional[Dict[str, Any]]:
        """Parse *expression* and return its tree as nested dicts.

        Returns ``None`` when the expression is empty, whitespace-only, or
        lexes to no tokens.

        Raises:
            QuerySyntaxError: if the expression is malformed or tokens remain
                after a complete expression has been read.
        """
        if not expression or not expression.strip():
            return None

        self.tokens = lex(expression)
        self.pos = 0
        if not self.tokens:
            return None

        node = self._parse_or()
        leftover = self._current()
        if leftover is not None:
            raise self._error(f"Unexpected token: {leftover.value}", leftover)
        return node.to_dict()

    def _at_operator(self, name: str) -> bool:
        """Return True when the current token is the operator *name*."""
        token = self._current()
        return (
            token is not None
            and token.type == TokenType.OPERATOR
            and token.value == name
        )

    def _parse_left_assoc(self, operator: str, parse_operand) -> ASTNode:
        """Parse ``operand (operator operand)*`` into a left-leaning tree."""
        left = parse_operand()
        while self._at_operator(operator):
            self._advance()
            right = parse_operand()
            left = ASTNode(operator, children=[left, right])
        return left

    def _parse_or(self) -> ASTNode:
        """Parse a chain of ``OR`` over ``AND``-level operands."""
        return self._parse_left_assoc("OR", self._parse_and)

    def _parse_and(self) -> ASTNode:
        """Parse a chain of ``AND`` over ``XOR``-level operands."""
        return self._parse_left_assoc("AND", self._parse_xor)

    def _parse_xor(self) -> ASTNode:
        """Parse a chain of ``XOR`` over primary operands."""
        return self._parse_left_assoc("XOR", self._parse_primary)

    def _parse_primary(self) -> ASTNode:
        """Parse a parenthesised expression, a field filter, or a term.

        Raises:
            QuerySyntaxError: at end of input or on any other token type.
        """
        token = self._current()
        if token is None:
            raise self._error("Unexpected end of input")

        if token.type == TokenType.LPAREN:
            self._advance()
            node = self._parse_or()
            self._expect(TokenType.RPAREN)
            return node

        if token.type == TokenType.FIELD:
            self._advance()
            value_token = self._current()
            if value_token is not None and value_token.type == TokenType.TERM:
                self._advance()
                return ASTNode(f"FIELD:{token.value}", value=value_token.value)
            # A field with no term behind it filters on the empty string.
            return ASTNode(f"FIELD:{token.value}", value="")

        if token.type == TokenType.TERM:
            self._advance()
            return self._parse_term(token)

        raise self._error(f"Unexpected token: {token.value}", token)

    def _parse_term(self, token: Token) -> ASTNode:
        """Build the node for the already-consumed TERM *token*.

        A term followed by a comma starts a comma-separated set and yields a
        ``TERM_SET`` node whose value is the list of terms; otherwise a plain
        ``TERM`` node is returned. A comma that is not followed by a term is
        consumed and ignored.
        """
        next_token = self._current()
        if next_token is None or next_token.type != TokenType.COMMA:
            return ASTNode("TERM", value=token.value)

        terms = [token.value]
        while self._current() and self._current().type == TokenType.COMMA:
            self._advance()
            term_token = self._current()
            if term_token and term_token.type == TokenType.TERM:
                terms.append(term_token.value)
                self._advance()
        return ASTNode("TERM_SET", value=terms)