Last active
September 6, 2022 19:30
-
-
Save magniff/0496b99a685d24b47ae96a781d2403ca to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from typing import Callable, Generic, TypeVar, cast, NoReturn, Optional, Any | |
# Type variables used by the generic result and parser types in this module.
I = TypeVar("I")  # value type carried by a ParseResult
O = TypeVar("O")  # output type produced by a Parser
R = TypeVar("R")  # output type of a parser derived from another parser
T = TypeVar("T")  # spare type variable
@dataclass
class State:
    """Position of a parser inside the input stream.

    A default-constructed ``State`` has every counter set to zero.
    """

    # Offset into the stream of the next character to read.
    index: int = 0
    # Line counter; ``predicate`` bumps it after consuming a "\n".
    line: int = 0
    # Column counter; ``predicate`` resets it to 0 after a "\n" and otherwise
    # bumps it by one per consumed character.
    col: int = 0
class ParseResult(Generic[I]):
    """Common base of ``ParseResultOk`` and ``ParseResultFail``.

    It only fixes the interface; the subclasses carry the actual data.
    """

    def unwrap(self) -> I:
        """Return the parsed value, or raise if there is none.

        Raises:
            NotImplementedError: always, on the base class.  Raising (instead
                of returning the ``NotImplemented`` sentinel) guarantees the
                sentinel can never leak out and be mistaken for a parsed value.
        """
        raise NotImplementedError(f"{type(self).__name__} must implement unwrap()")
# Shape of the raw function a parser wraps: it takes the whole input string
# and the current State, and returns a ParseResult carrying an ``O``.
ParserSignature = Callable[[str, State], ParseResult[O]]
@dataclass
class ParseResultOk(ParseResult[I]):
    """Successful parse: the produced value and the state to continue from."""

    # Value produced by the parser.
    output: I
    # State handed to whatever parser runs next.
    state: State

    def unwrap(self) -> I:
        """Return the parsed value."""
        value = self.output
        return value
@dataclass
class ParseResultFail(ParseResult[I]):
    """Failed parse; keeps the state at which the parser gave up."""

    # State the failing parser was given.
    state: State

    def unwrap(self) -> NoReturn:
        """Raise ``ValueError`` reporting the line and column stored in ``state``."""
        stuck_at = self.state
        message = f"Cant parse, stuck at line:{stuck_at.line} and col:{stuck_at.col}"
        raise ValueError(message)
@dataclass
class Parser(Generic[O]):
    """Composable wrapper around a ``(stream, state) -> ParseResult`` function.

    Operators build larger parsers out of smaller ones:

    * ``p >> f``  -- run ``p``; on success call ``f`` with its output and run
      the parser ``f`` returns from the state where ``p`` stopped.
    * ``p + q``   -- run ``p`` then ``q``; only ``q``'s output is kept.
    * ``p[func]`` -- run ``p`` and pass its output through ``func``.
    * ``p | q``   -- try ``p``; if it does not succeed, run ``q`` from the
      same starting state.
    """

    parser_function: ParserSignature[O]

    def __rshift__(self, then: Callable[[O], Parser[R]]) -> Parser[R]:
        """Sequence ``self`` with the parser built by ``then`` from its output."""

        def chained(stream: str, state: State) -> ParseResult[R]:
            first = self(stream, state)
            if not isinstance(first, ParseResultOk):
                # Hand the failure back untouched; the cast only changes the
                # static type.
                return cast(ParseResultFail[R], first)
            return then(first.output)(stream, first.state)

        return Parser(chained)

    def __add__(self, then: Parser[R]) -> Parser[R]:
        """Run ``self`` then ``then``, discarding the output of ``self``."""
        return self >> (lambda _discarded: then)

    def __getitem__(self, func: Callable[[O], R]) -> Parser[R]:
        """Map ``func`` over the output of ``self``."""

        def lifted(output: O) -> Parser[R]:
            return success(func(output))

        return self >> lifted

    def __or__(self, other: Parser[O]) -> Parser[O]:
        """Return a parser that falls back to ``other`` when ``self`` fails."""

        def either(stream: str, state: State) -> ParseResult[O]:
            first = self(stream, state)
            if isinstance(first, ParseResultOk):
                return first
            # The alternative restarts from the state ``self`` was given.
            return other(stream, state)

        return Parser(either)

    def __call__(self, stream: str, state: State | None = None) -> ParseResult[O]:
        """Run the parser on ``stream``; a missing ``state`` means a fresh ``State()``."""
        if state is None:
            state = State()
        return self.parser_function(stream, state)

    def maybe(self: Parser[O]) -> Parser[Optional[O]]:
        """Return a parser that never fails.

        When ``self`` does not succeed, the result is a success carrying
        ``None`` and the unchanged input state.
        """

        def optional(stream: str, state: State) -> ParseResult[Optional[O]]:
            attempt = self(stream, state)
            if isinstance(attempt, ParseResultOk):
                return attempt
            return ParseResultOk(output=None, state=state)

        return Parser(optional)
def success(value: R) -> Parser[R]:
    """Return a parser that always succeeds with ``value`` and leaves the state as is."""

    def always_ok(_stream: str, state: State) -> ParseResult[R]:
        return ParseResultOk(output=value, state=state)

    return Parser(always_ok)
def fail() -> Parser[None]:
    """Return a parser that always fails at the state it is given."""

    def always_fail(_stream: str, state: State) -> ParseResult[None]:
        return ParseResultFail(state=state)

    return Parser(always_fail)
def zero_or_more(p: Parser[I]) -> Parser[list[I]]:
    """Apply ``p`` as many times as it matches and collect the outputs.

    The returned parser never fails: if ``p`` does not match at all it
    succeeds with ``[]`` and the unchanged state.

    This is a plain loop rather than ``maybe()`` plus recursion because:

    * ``maybe()`` signals "no match" with ``None``, so an element whose output
      is legitimately ``None`` would end the repetition early, after its input
      had already been consumed;
    * recursing once per matched element raises ``RecursionError`` on long
      inputs and copies the accumulated list on every step.

    Args:
        p: parser for a single element.

    Returns:
        A parser producing the list of element outputs, in input order.
    """

    def repeated(stream: str, state: State) -> ParseResult[list[I]]:
        collected: list[I] = []
        cursor = state
        while True:
            outcome = p(stream, cursor)
            if not isinstance(outcome, ParseResultOk):
                break
            if outcome.state.index == cursor.index:
                # ``p`` matched without consuming anything; repeating it would
                # never terminate, so treat it as "no further elements".
                break
            collected.append(outcome.output)
            cursor = outcome.state
        return ParseResultOk(output=collected, state=cursor)

    return Parser(repeated)
def one_or_more(p: Parser[I]) -> Parser[list[I]]:
    """Match ``p`` at least once and collect every output into a list.

    Fails when the very first application of ``p`` fails.
    """

    def after_first(head: I) -> Parser[list[I]]:
        def prepend_head(tail: list[I]) -> Parser[list[I]]:
            return success([head, *tail])

        return zero_or_more(p) >> prepend_head

    return p >> after_first
def end_of_file() -> Parser[str]:
    """Return a parser that succeeds, with output ``""``, only at the end of the stream."""

    def at_end(stream: str, state: State) -> ParseResult[str]:
        if state.index != len(stream):
            return ParseResultFail(state=state)
        return ParseResultOk(output="", state=state)

    return Parser(at_end)
def predicate(test: Callable[[str], bool]) -> Parser[str]:
    """Return a parser that consumes one character accepted by ``test``.

    On success the output is the consumed character and the new state has
    ``index`` advanced by one; a consumed "\\n" increments ``line`` and resets
    ``col`` to 0, any other character increments ``col``.  The parser fails,
    without consuming anything, at the end of the stream or when ``test``
    rejects the current character.
    """

    def match_one(stream: str, state: State) -> ParseResult[str]:
        position = state.index
        if position >= len(stream):
            return ParseResultFail(state=state)

        char = stream[position]
        if not test(char):
            return ParseResultFail(state=state)

        if char == "\n":
            advanced = State(index=position + 1, line=state.line + 1, col=0)
        else:
            advanced = State(index=position + 1, line=state.line, col=state.col + 1)
        return ParseResultOk(output=char, state=advanced)

    return Parser(match_one)
def const(parser: Any) -> Callable[[Any], Any]:
    """Return a one-argument function that ignores its argument and returns ``parser``."""

    def constant(_: Any) -> Any:
        return parser

    return constant
def single_letter(symbol_to_match: str) -> Parser[str]:
    """Return a parser that consumes one character equal to ``symbol_to_match``."""

    def is_expected(current_sym: str) -> bool:
        return current_sym == symbol_to_match

    return predicate(is_expected)
# Double-quoted string: an opening quote, any run of characters other than a
# double quote (joined into one ``str``), then the closing quote.  The output
# is the text between the quotes.  ``a + b`` is shorthand for ``a >> const(b)``.
string: Parser[str] = single_letter('"') + (
    zero_or_more(predicate(lambda char: char != '"'))["".join]
    >> (lambda contents: single_letter('"') + success(contents))
)
def keyword(keyword_string: str) -> Parser[None]:
    """Return a parser matching ``keyword_string`` character by character.

    The output is always ``None``; an empty ``keyword_string`` yields a parser
    that succeeds without consuming anything.
    """
    # Build the chain from the last character backwards, so the finished
    # parser checks the characters in their original order.
    matcher: Parser[None] = success(None)
    for symbol in reversed(keyword_string):
        matcher = single_letter(symbol) >> const(matcher)
    return matcher
# Optional whitespace: consumes any run (possibly empty) of characters for
# which ``str.isspace`` is true and outputs them as a list.
spaces: Parser[list[str]] = zero_or_more(predicate(str.isspace))
# --- JSON grammar -----------------------------------------------------------
# Every piece of the grammar is bound to its own module-level name
# (``boolean_object``, ``number``, ``null``, ``json_list``, ``json_dict``);
# ``json_object`` at the bottom is the union of all of them.  Strings reuse
# the module-level ``string`` parser instead of repeating its definition.

# Lists and objects contain arbitrary JSON values, but ``json_object`` is only
# bound at the end of this block.  This forwarding parser looks the name up at
# parse time, so the container parsers below can be built eagerly.
_json_value: Parser[Any] = Parser(lambda stream, state: json_object(stream, state))


def _json_number() -> Parser[float]:
    """Build a parser for a JSON number; the output is always a ``float``.

    Accepted shape: optional ``-``, then either a single ``0`` or a non-zero
    digit followed by more digits, then an optional ``.digits`` fraction and
    an optional ``e``/``E`` exponent with optional sign.  Only ASCII digits
    are accepted so that the collected text is always valid for ``float()``.
    """
    digit = predicate(lambda char: "0" <= char <= "9")
    digits = one_or_more(digit)["".join]
    integer = single_letter("0") | (
        predicate(lambda char: "1" <= char <= "9")
        >> (lambda first: zero_or_more(digit)[lambda rest: first + "".join(rest)])
    )
    fraction = single_letter(".") >> (lambda dot: digits[lambda ds: dot + ds])
    exponent = predicate(lambda char: char in "eE") >> (
        lambda marker: predicate(lambda char: char in "+-").maybe()
        >> (lambda sign: digits[lambda ds: marker + (sign or "") + ds])
    )
    # ``maybe()`` yields None for an absent part; ``or ""`` turns that into an
    # empty piece of the number's text.
    return single_letter("-").maybe() >> (
        lambda minus: integer
        >> (
            lambda whole: fraction.maybe()
            >> (
                lambda frac: exponent.maybe()[
                    lambda exp: float(
                        (minus or "") + whole + (frac or "") + (exp or "")
                    )
                ]
            )
        )
    )


def _comma_separated(item: Parser[T]) -> Parser[list[T]]:
    """Build a parser for zero or more ``item``s separated by commas.

    Whitespace is allowed around each comma.  A comma after the last item is
    optional, so both standard JSON (``[1, 2]``) and the trailing-comma form
    (``[1, 2,]``) are accepted.
    """
    comma = spaces + single_letter(",") + spaces
    # Follow-up items are wrapped in 1-tuples so that an item whose output is
    # None (JSON ``null``) can never be confused with "no more items" by the
    # repetition combinator.
    more = zero_or_more((comma + item)[lambda value: (value,)])
    non_empty = item >> (
        lambda first: more
        >> (
            lambda rest: comma.maybe()[
                lambda _: [first, *(value for (value,) in rest)]
            ]
        )
    )
    return non_empty | success([])


boolean_object = keyword("true")[const(True)] | keyword("false")[const(False)]

number = _json_number()

null = keyword("null")[const(None)]

# "[" items "]" -> list of parsed values.
json_list = (single_letter("[") + spaces) + (
    _comma_separated(_json_value)
    >> (
        lambda list_of_objects: (spaces + single_letter("]"))[
            const(list_of_objects)
        ]
    )
)

# "{" members "}" -> dict, where each member is  string ":" value.
json_dict = (single_letter("{") + spaces) + (
    _comma_separated(
        string
        >> (
            lambda parsed_string: (spaces + single_letter(":") + spaces)
            + _json_value[lambda parsed_value: (parsed_string, parsed_value)]
        )
    )[dict]
    >> (lambda object_dict: (spaces + single_letter("}"))[const(object_dict)])
)

# Alternatives are tried left to right; each one starts with a different
# character, so at most one of them can match at a given position.
json_object: Parser[list[Any] | bool | str | dict[str, Any] | float | None] = (
    boolean_object | string | number | null | json_list | json_dict
)
# Sample document for the self-check below.  It is written with a comma after
# the last member of each object.
json_string = """{
"first_name": "John",
"second_name": "Smith",
"age": 42,
"hobbies": null,
"is_married": true,
"kids": [],
"address": {
"city": "Boston",
"street": "Memorial Drive",
},
}"""

# A JSON value that must be followed directly by the end of the input; the
# output is the parsed value itself.
json_object_whole_file = json_object >> (
    lambda parsed_document: end_of_file() >> const(success(parsed_document))
)

# Self-check executed at import time: the sample must parse to this dict
# (numbers come back as floats, and 42.0 == 42).
assert json_object_whole_file(stream=json_string).unwrap() == {
    "first_name": "John",
    "second_name": "Smith",
    "age": 42,
    "hobbies": None,
    "is_married": True,
    "kids": [],
    "address": {
        "city": "Boston",
        "street": "Memorial Drive",
    },
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment