Skip to content

Instantly share code, notes, and snippets.

@Maximilian-Winter
Last active November 17, 2024 06:13
Show Gist options
  • Select an option

  • Save Maximilian-Winter/5373962ef456a2b0d1ae324fb78e623e to your computer and use it in GitHub Desktop.

Select an option

Save Maximilian-Winter/5373962ef456a2b0d1ae324fb78e623e to your computer and use it in GitHub Desktop.
GBNF grammar generator for always valid function calls and object creation in JSON with llama.cpp
import inspect
import re
import typing
from inspect import isclass, getdoc
from pydantic import BaseModel, Field
from pydantic.fields import FieldInfo
from typing import Any, Type, List, get_args, get_origin, Tuple, Union, Optional
from enum import Enum
import re
class PydanticDataType(Enum):
    """Vocabulary of type names used when mapping Python types to GBNF.

    ``map_pydantic_type_to_gbnf`` returns these values (or names built from
    them) as rule names. ``OBJECT`` and ``ARRAY`` are declared but not
    referenced in the visible part of this file.
    """

    # Scalar JSON values.
    STRING = "string"
    BOOLEAN = "boolean"
    INTEGER = "integer"
    FLOAT = "float"

    # Structured JSON values.
    OBJECT = "object"
    ARRAY = "array"

    # Python-side categories.
    ENUM = "enum"
    CUSTOM_CLASS = "custom-class"
def map_pydantic_type_to_gbnf(pydantic_type: Type[Any]) -> str:
    """Return the GBNF rule name that corresponds to a Python type annotation.

    :param pydantic_type: A class or a ``typing`` construct (``List[...]``,
        ``Dict[..., ...]``, ``Union[...]``, ``Optional[...]``).
    :return: The rule name. Primitive types map to the ``PydanticDataType``
        values, ``BaseModel`` subclasses to their dashed class name, other
        classes to ``custom-class-<name>``, and anything unrecognised to
        ``"unknown"``.

    ``Optional[X]`` is ``Union[X, None]`` at runtime, so ``get_origin`` never
    returns ``Optional``; the previous ``== Optional`` branch was unreachable
    and ``None`` ended up rendered as ``custom-class-none-type``. ``None`` is
    now stripped from the union members and reported as an ``optional-``
    prefix instead.
    """
    # Parameterised generics are inspected first: on some Python versions
    # ``isclass(list[int])`` is true and ``issubclass`` would then raise.
    origin = get_origin(pydantic_type)
    if origin is list:
        element_type = get_args(pydantic_type)[0]
        return f"{map_pydantic_type_to_gbnf(element_type)}-list"
    if origin is Union:
        all_members = get_args(pydantic_type)
        value_members = [member for member in all_members if member is not type(None)]
        member_rules = [map_pydantic_type_to_gbnf(member) for member in value_members]
        if len(member_rules) == 1:
            # Optional[X]: a single real member needs no "union-" wrapper.
            inner = member_rules[0]
        else:
            inner = f"union-{'-or-'.join(member_rules)}"
        if len(value_members) != len(all_members):
            return f"optional-{inner}"
        return inner
    if origin is dict:
        key_type, value_type = get_args(pydantic_type)
        key_name = format_model_and_field_name(map_pydantic_type_to_gbnf(key_type))
        value_name = format_model_and_field_name(map_pydantic_type_to_gbnf(value_type))
        return f"custom-dict-key-type-{key_name}-value-type-{value_name}"

    if not isclass(pydantic_type):
        return "unknown"

    # Order matters: ``bool`` is a subclass of ``int`` and must be tested first.
    if issubclass(pydantic_type, str):
        return PydanticDataType.STRING.value
    if issubclass(pydantic_type, bool):
        return PydanticDataType.BOOLEAN.value
    if issubclass(pydantic_type, int):
        return PydanticDataType.INTEGER.value
    if issubclass(pydantic_type, float):
        return PydanticDataType.FLOAT.value
    if issubclass(pydantic_type, Enum):
        return PydanticDataType.ENUM.value
    if issubclass(pydantic_type, BaseModel):
        return format_model_and_field_name(pydantic_type.__name__)
    return f"{PydanticDataType.CUSTOM_CLASS.value}-{format_model_and_field_name(pydantic_type.__name__)}"
def format_model_and_field_name(model_name: str) -> str:
    """Convert a class or field name into a lower-case, dash-separated rule name.

    The name is split in front of every capital letter, each part is
    lower-cased, underscores become dashes, and the parts are joined with
    ``-`` (``"CmdCommandModel"`` -> ``"cmd-command-model"``,
    ``"inner_thoughts"`` -> ``"inner-thoughts"``).

    :param model_name: The identifier to convert.
    :return: The converted name; an empty string yields an empty string.
    """
    # The first alternative keeps a leading run that contains no capitals
    # ("camel" in "camelCase"). Matching only '[A-Z][^A-Z]*' silently dropped
    # that prefix whenever a capital appeared later in the name.
    parts = re.findall(r'^[^A-Z]+|[A-Z][^A-Z]*', model_name)
    return '-'.join(part.lower().replace("_", "-") for part in parts)
def generate_list_rule(element_type):
    """
    Build the GBNF rule describing a JSON array of one element type.

    The rule is named ``<element-rule>-list`` and accepts an empty array or a
    comma-separated sequence of ``<element-rule>`` items.

    :param element_type: The Python type of the list elements (e.g. ``str``).
    :return: The rule as a single ``name ::= body`` string.
    """
    item = map_pydantic_type_to_gbnf(element_type)
    return f'{item}-list ::= "[" ws ( {item} ("," ws {item})* )? "]"'
def get_members_structure(cls, rule_name):
    """Build a GBNF object rule for a plain (non-Pydantic) class.

    Members are taken from the class annotations when there are any,
    otherwise from the annotated parameters of ``cls.__init__``.

    :param cls: The class to describe.
    :param rule_name: The name the generated rule is defined under.
    :return: ``(rule, extra_rules)`` where ``rule`` is the ``name ::= body``
        string and ``extra_rules`` is always an empty list. Enum classes are
        the exception, see the note below.
    """
    if issubclass(cls, Enum):
        # NOTE(review): this branch returns a bare string (not a tuple) and
        # names the rule after the class instead of ``rule_name``. It is kept
        # unchanged for compatibility; the caller in this file never reaches
        # it because enums are handled before custom classes.
        members = [f'"\\"{member.value}\\""' for member in cls.__members__.values()]
        return f"{cls.__name__.lower()} ::= " + " | ".join(members)

    # ``getattr`` avoids an AttributeError on interpreters where a class
    # without annotations has no ``__annotations__`` attribute.
    annotations = getattr(cls, '__annotations__', None)
    if annotations:
        typed_members = list(annotations.items())
    else:
        parameters = inspect.signature(cls.__init__).parameters
        typed_members = [(name, param.annotation) for name, param in parameters.items()
                         if param.annotation is not inspect.Parameter.empty]

    members = [f' ws "\\"{name}\\"" ws ":" ws {map_pydantic_type_to_gbnf(member_type)}'
               for name, member_type in typed_members
               if name != 'self']
    # Both sources now define the rule under ``rule_name``. The ``__init__``
    # path used the lower-cased class name before, which never matched the
    # ``custom-class-...`` reference the caller emits.
    result = f'{rule_name} ::= "{{"' + '", "'.join(members) + ' ws "}"'
    return result, []
def regex_to_gbnf(regex_pattern: str) -> str:
    """
    Translate a basic regex pattern to a GBNF rule body.

    Only the ``\\d`` and ``\\s`` shorthand classes are rewritten; every other
    character of the pattern is passed through unchanged, so only patterns
    whose remaining syntax is also valid GBNF are supported.

    :param regex_pattern: The regular expression source text.
    :return: The pattern with the shorthand classes replaced.
    """
    # The whitespace class is emitted with *escaped* ``\t`` and ``\n``. A real
    # newline inside the rule would split it in two, because the generated
    # grammar is assembled and cleaned up line by line.
    translations = (
        ('\\d', '[0-9]'),
        ('\\s', '[ \\t\\n]'),
    )
    gbnf_rule = regex_pattern
    for regex_token, gbnf_token in translations:
        gbnf_rule = gbnf_rule.replace(regex_token, gbnf_token)
    return gbnf_rule
def generate_gbnf_rule_for_type(model_name, field_name, field_type, is_optional, processed_models,
                                field_info=None) -> Tuple[str, list]:
    """Produce the rule reference and supporting rules for one field.

    :param model_name: Dashed name of the model that owns the field; used to
        prefix the names of generated helper rules.
    :param field_name: Name of the field (it is normalised with
        ``format_model_and_field_name``).
    :param field_type: The field's type annotation.
    :param is_optional: Whether the field's value may be absent.
    :param processed_models: Set of models already expanded, passed through
        to ``generate_gbnf_grammar`` for nested models.
    :param field_info: Optional field metadata; only its ``pattern``
        attribute is consulted, and only for string fields.
    :return: ``(rule_reference, rules)``: the text to place after the field's
        key in the model rule, and the list of additional rule definitions
        that reference depends on.

    Fixes over the previous version:

    * dict and pattern fields returned a complete ``name ::= body`` definition
      as the reference; the definition is now appended to ``rules`` and only
      the name is returned;
    * the pattern branch tested for ``pattern`` but read ``regex.pattern``;
    * ``None`` inside a ``Union``/``Optional`` was expanded as a custom class
      and crashed; it is now emitted as the JSON literal ``null``;
    * ``is_optional`` is no longer forwarded to list elements or dict
      keys/values, where it allowed sequences such as ``[,,]``;
    * an optional value is rendered as ``(X | "null")`` instead of ``(X)?``,
      because an empty value after ``"key":`` is not valid JSON.
    """
    rules = []
    field_name = format_model_and_field_name(field_name)
    gbnf_type = map_pydantic_type_to_gbnf(field_type)
    qualified_name = f"{model_name}-{field_name}"
    # Parameterised generics can look like classes on some Python versions,
    # and ``issubclass`` raises for them.
    is_plain_class = isclass(field_type) and get_origin(field_type) is None
    accepts_null = False

    if is_plain_class and issubclass(field_type, BaseModel):
        rules.extend(generate_gbnf_grammar(field_type, processed_models))
        gbnf_type = format_model_and_field_name(field_type.__name__)
    elif is_plain_class and issubclass(field_type, Enum):
        # Each enum value becomes a quoted JSON string alternative.
        enum_values = [f'"\\"{e.value}\\""' for e in field_type]
        rules.append(f"{qualified_name} ::= {' | '.join(enum_values)}")
        gbnf_type = qualified_name
    elif get_origin(field_type) == list:  # Array
        element_type = get_args(field_type)[0]
        element_rule_name, additional_rules = generate_gbnf_rule_for_type(
            model_name, f"{field_name}-element", element_type, False, processed_models)
        rules.extend(additional_rules)
        rules.append(f'{qualified_name} ::= "[" ws {element_rule_name} ("," ws {element_rule_name})* ws "]" ')
        gbnf_type = qualified_name
    elif get_origin(field_type) is Union:
        alternatives = []
        for union_type in get_args(field_type):
            if union_type is type(None):
                accepts_null = True
                alternatives.append('"null"')
                continue
            union_gbnf_type, union_rules_list = generate_gbnf_rule_for_type(
                model_name, field_name, union_type, False, processed_models)
            alternatives.append(union_gbnf_type)
            rules.extend(union_rules_list)
        # The union is defined as its own rule and referenced by name.
        gbnf_type = f"{qualified_name}-union"
        rules.append(f"{gbnf_type} ::= {' | '.join(alternatives)}")
    elif gbnf_type.startswith("custom-class-"):
        nested_model_rule, _ = get_members_structure(field_type, gbnf_type)
        rules.append(nested_model_rule)
    elif gbnf_type.startswith("custom-dict-"):
        key_type, value_type = get_args(field_type)
        key_rule_name, key_rules = generate_gbnf_rule_for_type(
            model_name, f"{field_name}-key-type", key_type, False, processed_models)
        value_rule_name, value_rules = generate_gbnf_rule_for_type(
            model_name, f"{field_name}-value-type", value_type, False, processed_models)
        rules.extend(key_rules)
        rules.extend(value_rules)
        # Named per model/field, like the enum and array rules, so two dict
        # fields never define the same rule twice.
        rules.append(
            f'{qualified_name} ::= "{{" ws ( {key_rule_name} ":" ws {value_rule_name} '
            f'("," ws {key_rule_name} ":" ws {value_rule_name})* )? "}}" ws')
        gbnf_type = qualified_name
    elif is_plain_class and issubclass(field_type, str):
        raw_pattern = getattr(field_info, 'pattern', None) if field_info else None
        if raw_pattern is not None:
            # Accept either the pattern source or a compiled pattern object.
            regex_source = getattr(raw_pattern, 'pattern', raw_pattern)
            gbnf_type = f"{qualified_name}-pattern"
            # NOTE(review): the translated pattern is emitted without the
            # surrounding JSON quotes, exactly as before; confirm whether the
            # quotes are expected to be part of the pattern itself.
            rules.append(f"{gbnf_type} ::= {regex_to_gbnf(regex_source)}")
        else:
            gbnf_type = PydanticDataType.STRING.value
    # Any other type keeps the name produced by map_pydantic_type_to_gbnf.

    if is_optional and not accepts_null:
        gbnf_type = f'({gbnf_type} | "null")'
    return gbnf_type, rules
def generate_gbnf_grammar(model: Type[BaseModel], processed_models: set) -> list:
    """Generate the GBNF rules for one model and everything nested inside it.

    :param model: A Pydantic model, or a plain class described by its
        annotations or by the annotated parameters of its ``__init__``.
    :param processed_models: Models that were already expanded. ``model`` is
        added to it; a model found in it yields no rules.
    :return: The model's own rule followed by the rules of its fields, or an
        empty list if the model was processed before.

    Fixes over the previous version:

    * ``FieldInfo.is_required`` is a method in Pydantic v2, so
      ``is_required is False`` was never true;
    * ``get_origin(...) is Optional`` was never true either, since
      ``Optional[X]`` has origin ``Union``;
    * Pydantic fields are read from ``model_fields``, so inherited fields are
      included and non-field annotations cannot raise ``KeyError``;
    * ``__init__`` parameters without an annotation are skipped, as
      ``get_members_structure`` already does.
    """
    if model in processed_models:
        return []
    processed_models.add(model)
    model_name = format_model_and_field_name(model.__name__)

    # Normalised as (field_name, field_type, is_optional, field_info).
    fields = []
    if issubclass(model, BaseModel):
        for field_name, field_info in model.model_fields.items():
            field_type = field_info.annotation
            required = field_info.is_required
            if callable(required):
                required = required()
            allows_none = get_origin(field_type) is Union and type(None) in get_args(field_type)
            # Same rule as before: optional means "not required" AND "Optional-typed".
            fields.append((field_name, field_type, (not required) and allows_none, field_info))
    else:
        annotations = getattr(model, '__annotations__', None)
        if annotations:
            # Annotated attributes carry no default information: treat them as required.
            raw_fields = {name: (typ, ...) for name, typ in annotations.items()}
        else:
            parameters = inspect.signature(model.__init__).parameters
            raw_fields = {name: (param.annotation, param.default) for name, param in parameters.items()
                          if name != 'self' and param.annotation is not inspect.Parameter.empty}
        for field_name, field_info in raw_fields.items():
            field_type, default_value = field_info
            has_default = (default_value is not inspect.Parameter.empty) and (default_value is not Ellipsis)
            fields.append((field_name, field_type, has_default, field_info))

    model_rule_parts = []
    nested_rules = []
    for field_name, field_type, is_optional, field_info in fields:
        rule_name, additional_rules = generate_gbnf_rule_for_type(model_name, field_name, field_type, is_optional,
                                                                  processed_models, field_info)
        # The key is emitted as a quoted JSON string literal.
        model_rule_parts.append(f'"\\"{field_name}\\"" ":" ws {rule_name}')
        nested_rules.extend(additional_rules)

    fields_joined = ' ws ", " ws '.join(model_rule_parts)
    model_rule = f'{model_name} ::= "{{" ws {fields_joined} ws "}}"'
    return [model_rule] + nested_rules
def generate_gbnf_grammar_from_pydantic(models: List[Type[BaseModel]], root_rule_class: str = None,
                                        root_rule_content: str = None) -> str:
    """Assemble the complete grammar text for a list of models.

    Without ``root_rule_class`` the root rule is a plain alternation of the
    models. With it, the root is an object whose ``root_rule_class`` key
    selects one model by class name and whose ``root_rule_content`` key holds
    that model's object.

    :param models: The models to generate rules for.
    :param root_rule_class: Name of the wrapping root rule and of its key.
    :param root_rule_content: Name of the key that holds the model object.
    :return: All rules joined with newlines, root rule first.
    """
    processed_models = set()

    def collect_model_rules():
        # Expand every model in order; already-processed models add nothing.
        collected = []
        for model in models:
            collected.extend(generate_gbnf_grammar(model, processed_models))
        return collected

    if root_rule_class is None:
        model_rules = collect_model_rules()
        alternatives = " | ".join([format_model_and_field_name(model.__name__) for model in models])
        return "\n".join(["root ::= " + alternatives] + model_rules)

    wrapper_name = format_model_and_field_name(root_rule_class)
    root_rule = f"root ::= {wrapper_name}\n"
    wrapper_rule = fr'{wrapper_name} ::= "{{" ws "\"{root_rule_class}\"" ":" ws grammar-models ws "}}"'

    selector_names = [fr'{format_model_and_field_name(model.__name__)}-grammar-model' for model in models]
    selector_rules = []
    for model in models:
        rule_name = format_model_and_field_name(model.__name__)
        selector_rules.append(
            fr'{rule_name}-grammar-model ::= "\"{model.__name__}\"" "," "\"{root_rule_content}\"" ":" {rule_name}'
            + '\n')
    grammar_model_rules = f'\ngrammar-models ::= {" | ".join(selector_names)}' + "\n" + "\n".join(selector_rules)

    model_rules = collect_model_rules()
    return "\n".join([root_rule + wrapper_rule + grammar_model_rules] + model_rules)
def get_primitive_grammar():
    """Return the shared rules that every generated grammar relies on.

    The text starts with a newline, followed by one list rule each for
    ``str``, ``bool``, ``float`` and ``int``, followed by the definitions of
    ``boolean``, ``string``, ``float``, ``ws`` and ``integer``.
    """
    list_rules = '\n'.join(generate_list_rule(primitive) for primitive in (str, bool, float, int))
    scalar_rules = r"""
boolean ::= "true" | "false"
string ::= "\"" ( [^"\\] | "\\" (["\\/bfnrt] | "u" [0-9a-fA-F] [0-9a-fA-F] [0-9a-fA-F] [0-9a-fA-F]) )* "\"" ws
float ::= ("-"? ([0-9] | [1-9] [0-9]*)) ("." [0-9]+)? ([eE] [-+]? [0-9]+)? ws
ws ::= ""
integer ::= [0-9]+"""
    return "\n" + list_rules + scalar_rules
def generate_field_markdown(field_name: str, field_type: Type[Any], model: Type[BaseModel], depth=1) -> str:
    """Render one field of a model as a markdown list entry.

    :param field_name: Name of the field.
    :param field_type: The field's type annotation.
    :param model: The model that declares the field; its ``model_fields``
        entry supplies the description.
    :param depth: Nesting level, which controls the indentation.
    :return: The markdown for this field, including the fields of a nested
        ``BaseModel`` type, each entry terminated by a newline.
    """
    indent = ' ' * depth
    # Not every annotation has ``__name__`` (typing constructs on some Python
    # versions, string forward references); fall back to its string form
    # instead of raising AttributeError.
    type_name = getattr(field_type, '__name__', None) or str(field_type)

    # The description comes from the Pydantic field metadata, if present.
    field_info = model.model_fields.get(field_name)
    field_description = field_info.description if field_info and field_info.description else "No description available."
    pieces = [f"{indent}- **{field_name}** (`{type_name}`): {field_description}\n"]

    # Nested models are expanded recursively, two levels deeper.
    if isclass(field_type) and issubclass(field_type, BaseModel):
        pieces.append(f"{indent} - Details:\n")
        pieces.extend(generate_field_markdown(name, type_, field_type, depth + 2)
                      for name, type_ in field_type.__annotations__.items())
    return "".join(pieces)
def generate_markdown_report(pydantic_models: List[Type[BaseModel]]) -> str:
    """Build a markdown document describing each model and its fields.

    Every model gets a ``##`` heading, its docstring (or a placeholder), a
    ``### Fields`` heading, one entry per annotated field when the model is a
    ``BaseModel`` subclass, and a trailing blank line.

    :param pydantic_models: The models to document.
    :return: The markdown text; empty if no models are given.
    """
    chunks = []
    for model in pydantic_models:
        chunks.append(f"## {model.__name__}\n")
        summary = getdoc(model) or "No description available."
        chunks.append(f"{summary}\n\n")
        chunks.append("### Fields\n")
        if isclass(model) and issubclass(model, BaseModel):
            chunks.extend(generate_field_markdown(name, field_type, model)
                          for name, field_type in model.__annotations__.items())
        chunks.append("\n")
    return "".join(chunks)
def save_gbnf_grammar_and_documentation(grammar, documentation, grammar_file_path="./grammar.gbnf",
                                        documentation_file_path="./grammar_documentation.md"):
    """Write the grammar and its documentation to disk.

    The primitive rules from ``get_primitive_grammar`` are appended to
    ``grammar`` before it is written. I/O errors are reported on stdout and
    do not propagate, so a failure to write one file does not prevent the
    attempt to write the other.

    :param grammar: The model-specific grammar text.
    :param documentation: The markdown documentation text.
    :param grammar_file_path: Destination of the grammar file.
    :param documentation_file_path: Destination of the documentation file.
    """
    # Both files are written as UTF-8 explicitly: enum values, field names and
    # descriptions may contain non-ASCII text, and the platform default
    # encoding is not guaranteed to be able to represent it.
    try:
        with open(grammar_file_path, 'w', encoding='utf-8') as file:
            file.write(grammar + get_primitive_grammar())
        print(f"Grammar successfully saved to {grammar_file_path}")
    except IOError as e:
        print(f"An error occurred while saving the grammar file: {e}")

    try:
        with open(documentation_file_path, 'w', encoding='utf-8') as file:
            file.write(documentation)
        print(f"Documentation successfully saved to {documentation_file_path}")
    except IOError as e:
        print(f"An error occurred while saving the documentation file: {e}")
def remove_empty_lines(string):
    """Return *string* without its empty or whitespace-only lines.

    The remaining lines are joined with ``"\\n"``; no trailing newline is kept.
    """
    return "\n".join(line for line in string.splitlines() if line.strip())
def generate_and_save_gbnf_grammar_and_documentation(pydantic_model_list, grammar_file_path="./generated_grammar.gbnf",
                                                     documentation_file_path="./generated_grammar_documentation.md",
                                                     root_rule_class: str = None, root_rule_content: str = None):
    """Generate grammar and documentation for the models and save both.

    The grammar is stripped of empty lines and printed to stdout before the
    two files are written.

    :param pydantic_model_list: The models to process.
    :param grammar_file_path: Destination of the grammar file.
    :param documentation_file_path: Destination of the documentation file.
    :param root_rule_class: Forwarded to ``generate_gbnf_grammar_from_pydantic``.
    :param root_rule_content: Forwarded to ``generate_gbnf_grammar_from_pydantic``.
    """
    documentation = generate_markdown_report(pydantic_model_list)
    grammar = remove_empty_lines(
        generate_gbnf_grammar_from_pydantic(pydantic_model_list, root_rule_class, root_rule_content))
    print(grammar)
    save_gbnf_grammar_and_documentation(
        grammar,
        documentation,
        grammar_file_path=grammar_file_path,
        documentation_file_path=documentation_file_path,
    )
from pydantic import BaseModel, Field
from typing import List, Optional
from enum import Enum
class Department(Enum):
    """Enum for department names."""

    # The values are the human-readable department names.
    HR = 'Human Resources'
    IT = 'Information Technology'
    SALES = 'Sales'
    MARKETING = 'Marketing'
class SkillSet:
    """Skillset of the employee."""

    # Plain class (not a BaseModel): the Field(...) objects are ordinary
    # class attributes here.
    primary_skill: str = Field(
        ...,
        description="Primary skill of the employee.",
    )
    secondary_skills: List[str] = Field(
        ...,
        description="List of secondary skills.",
    )
class ComplexEmployeeModel:
    """Detailed employee model."""

    # Plain class (not a BaseModel) mixing a bare annotation with
    # Field(...) class attributes.
    employee_id: int
    name: str = Field(
        ...,
        description="Name of the employee.",
    )
    department: Department = Field(
        ...,
        description="Department of the employee.",
    )
    skill_set: SkillSet = Field(
        ...,
        description="Skillset of the employee.",
    )
    experience_years: float = Field(
        ...,
        description="Years of experience.",
    )
    is_full_time: bool = Field(
        True,
        description="Is the employee full-time.",
    )
# Cmd Command Model
class CmdCommandModel(BaseModel):
    """
    A model for executing CMD commands within a Large Language Model environment.
    It captures the user's inner thoughts during command formulation and supports
    function chaining through a heartbeat mechanism.
    """

    # All three fields are required (default is ``...``).
    inner_thoughts: str = Field(
        ...,
        description="Your inner thoughts or inner monologue while writing the command.",
    )
    command: str = Field(
        ...,
        description="The CMD command to execute.",
    )
    require_heartbeat: bool = Field(
        ...,
        description="Set this to true to get control back after execution, to chain functions together.",
    )
# Web Browsing Model
class WebBrowsingModel(BaseModel):
    """
    A model designed for handling web browsing operations in a Large Language Model context.
    It accommodates the user's thought process in crafting the URL and includes a mechanism
    for sequential control through a heartbeat feature.
    """

    # All three fields are required (default is ``...``).
    inner_thoughts: str = Field(
        ...,
        description="Your inner thoughts or inner monologue while writing the url.",
    )
    URL: str = Field(
        ...,
        description="The URL you want to access.",
    )
    require_heartbeat: bool = Field(
        ...,
        description="Set this to true to get control back after execution, to chain functions together.",
    )
# Web Download Model
class WebDownloadModel(BaseModel):
    """
    A model for managing web content downloads in a Large Language Model setting.
    It captures the user's considerations in selecting the URL and download path,
    and supports chained execution via a heartbeat mechanism.
    """

    # All four fields are required (default is ``...``).
    inner_thoughts: str = Field(
        ...,
        description="Your inner thoughts or inner monologue while writing the url.",
    )
    URL: str = Field(
        ...,
        description="The URL you want to download.",
    )
    Path: str = Field(
        ...,
        description="The Path you want to download the file to.",
    )
    require_heartbeat: bool = Field(
        ...,
        description="Set this to true to get control back after execution, to chain functions together.",
    )
# Python Interpreter Command Model
class PythonInterpreterCommandModel(BaseModel):
    """
    A model for executing Python commands in a Large Language Model framework.
    It incorporates the user's thought process during command creation and enables
    sequential task execution with a heartbeat mechanism.
    """

    # All three fields are required (default is ``...``).
    inner_thoughts: str = Field(
        ...,
        description="Your inner thoughts or inner monologue while writing the command.",
    )
    command: str = Field(
        ...,
        description="The Python command to execute.",
    )
    require_heartbeat: bool = Field(
        ...,
        description="Set this to true to get control back after execution, to chain functions together.",
    )
# Write File Section Model
class WriteFileSectionModel(BaseModel):
    """
    A model to facilitate writing to specific sections of a file in a Large Language Model.
    It includes detailed reasoning for the writing process and supports chaining of write operations
    with a heartbeat feature.
    """

    # All fields are required (default is ``...``).
    chain_of_thought: str = Field(
        ...,
        description="Detailed, step-by-step reasoning for the actions to be performed, ensuring clarity in the task execution process.",
    )
    folder: str = Field(
        ...,
        description="Path to the folder where the file is located or will be created. It should be a valid directory path.",
    )
    file_name: str = Field(
        ...,
        description="Name of the target file (excluding the file extension) where the section will be written or modified.",
    )
    file_extension: str = Field(
        ...,
        description="File extension indicating the file type, such as '.txt', '.py', '.md', etc.",
    )
    section: str = Field(
        ...,
        description="The specific section within the file to be targeted, such as a class, method, or a uniquely identified section.",
    )
    body: str = Field(
        ...,
        description="The actual content to be written into the specified section. It can be code, text, or data in a format compatible with the file type.",
    )
    request_heartbeat: bool = Field(
        ...,
        description="Set this to true to get control back after execution, to chain functions together.",
    )
# Read File Model
class ReadFileModel(BaseModel):
    """
    A model dedicated to reading file contents in a Large Language Model environment.
    It enables specifying file location and supports sequential reading tasks through a heartbeat mechanism.
    """

    # NOTE(review): ``folder`` defaults to None although it is annotated as
    # ``str``; confirm whether ``Optional[str]`` was intended.
    folder: str = Field(
        None,
        description="Path to the folder containing the file.",
    )
    file_name: str = Field(
        ...,
        description="The name of the file to be read, including its extension (e.g., 'document.txt').",
    )
    request_heartbeat: bool = Field(
        ...,
        description="Set this to true to get control back after execution, to chain functions together.",
    )
# File List Model
class FileListModel(BaseModel):
    """
    A model for listing files within a specified directory in a Large Language Model setup.
    It also allows for chained file listing operations enabled by a heartbeat feature.
    """

    # Both fields are required (default is ``...``).
    folder: str = Field(
        ...,
        description="Path to the directory where files will be listed. This path can include subdirectories to be scanned.",
    )
    request_heartbeat: bool = Field(
        ...,
        description="Set this to true to get control back after execution, to chain functions together.",
    )
class AddCoreMemoryModel(BaseModel):
    """
    A model designed to add entries to the core memory of a Large Language Model.
    It facilitates the storage of key-value pairs and supports sequential memory operations with a heartbeat mechanism.
    """

    # All four fields are required (default is ``...``).
    key: str = Field(
        ...,
        description="The key identifier for the core memory entry.",
    )
    field: str = Field(
        ...,
        description="A secondary key or field within the core memory entry.",
    )
    value: str = Field(
        ...,
        description="The value or data to be stored in the specified core memory entry.",
    )
    request_heartbeat: bool = Field(
        ...,
        description="Set this to true to get control back after execution, to chain functions together.",
    )
# Replace Core Memory Model
class ReplaceCoreMemoryModel(BaseModel):
    """
    A model for replacing specific fields in the core memory of a Large Language Model.
    It allows updating of existing memory entries and includes a heartbeat feature for chained memory operations.
    """

    # All four fields are required (default is ``...``).
    key: str = Field(
        ...,
        description="The key identifier for the core memory entry.",
    )
    field: str = Field(
        ...,
        description="The specific field within the core memory entry to be replaced.",
    )
    new_value: str = Field(
        ...,
        description="The new value to replace the existing data in the specified core memory field.",
    )
    request_heartbeat: bool = Field(
        ...,
        description="Set this to true to get control back after execution, to chain functions together.",
    )
# Remove Core Memory Model
class RemoveCoreMemoryModel(BaseModel):
    """
    A model to remove entries or specific fields from the core memory in a Large Language Model.
    It supports sequential memory modification tasks through a heartbeat mechanism.
    """

    # All three fields are required (default is ``...``).
    key: str = Field(
        ...,
        description="The key identifier for the core memory entry to be removed.",
    )
    field: str = Field(
        ...,
        description="The specific field within the core memory entry to be removed.",
    )
    request_heartbeat: bool = Field(
        ...,
        description="Set this to true to get control back after execution, to chain functions together.",
    )
# Defining the RolesEnum
class RolesEnum(str, Enum):
    # String-valued enum: each member is also a ``str`` instance.
    # Used as the element type of ``SearchEventMemoryModel.event_types``.
    EVENT_MEMORY_SEARCH = "Event-Memory-Search"
    KNOWLEDGE_MEMORY_SEARCH = "Knowledge-Memory-Search"
    MESSAGE_FROM_DEEP_THOUGHT = "Message-From-Deep-Thought"
    MESSAGE_FROM_USER = "Message-From-User"
    SYSTEM_MESSAGE = "System-Message"
# Search Event Memory Model
class SearchEventMemoryModel(BaseModel):
    """
    A model for searching event memories in a Large Language Model.
    It allows filtering by event types, date range, and content keywords, with a heartbeat feature for continuous search operations.
    """

    # All fields are required (default is ``...``).
    event_types: List[RolesEnum] = Field(
        ...,
        description="Array of event types to filter the search.",
    )
    start_date: str = Field(
        ...,
        description="The starting date for the event search range.",
    )
    end_date: str = Field(
        ...,
        description="The ending date for the event search range.",
    )
    content_keywords: List[str] = Field(
        ...,
        description="Array of keywords to search within the event content.",
    )
    request_heartbeat: bool = Field(
        ...,
        description="Set this to true to get control back after execution, to chain functions together.",
    )
# Search Knowledge Model
class SearchKnowledgeModel(BaseModel):
    """
    A model tailored for querying knowledge memories in a Large Language Model framework.
    It supports extended search operations enabled by a heartbeat mechanism.
    """

    # Both fields are required (default is ``...``).
    query: str = Field(
        ...,
        description="The query string to search within the 'Knowledge-Memory'.",
    )
    request_heartbeat: bool = Field(
        ...,
        description="Set this to true to get control back after execution, to chain functions together.",
    )
# Connect Knowledge Memories Model
class ConnectKnowledgeMemoriesModel(BaseModel):
    """
    A model for connecting various knowledge memories in a Large Language Model.
    It enables linking different knowledge bases and supports chained operations through a heartbeat feature.
    """

    # The only field; it is required (default is ``...``).
    request_heartbeat: bool = Field(
        ...,
        description="Set this to true to get control back after execution, to chain functions together.",
    )
# Self Reflect Model
class SelfReflectModel(BaseModel):
    """
    A model to enable self-reflection capabilities in a Large Language Model.
    It supports introspective operations and continuous self-analysis through a heartbeat mechanism.
    """

    # The only field; it is required (default is ``...``).
    request_heartbeat: bool = Field(
        ...,
        description="Set this to true to get control back after execution, to chain functions together.",
    )
# generate_and_save_gbnf_grammar_and_documentation(
# [PythonInterpreterCommandModel, WebBrowsingModel, WebDownloadModel], root_rule_class="function",
# root_rule_content="function")
# Runs at import time: generates the grammar and documentation for the tool
# models below and writes them to the default output paths. Every model is
# wrapped in a root object keyed by "function".
generate_and_save_gbnf_grammar_and_documentation(
    pydantic_model_list=[
        CmdCommandModel,
        WebBrowsingModel,
        PythonInterpreterCommandModel,
        WriteFileSectionModel,
        ReadFileModel,
        FileListModel,
        AddCoreMemoryModel,
        ReplaceCoreMemoryModel,
        RemoveCoreMemoryModel,
        SearchEventMemoryModel,
        SearchKnowledgeModel,
        ConnectKnowledgeMemoriesModel,
        SelfReflectModel,
    ],
    root_rule_class="function",
    root_rule_content="function",
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment