2024-07-21 04:09:17 +02:00
#!/usr/bin/env python3
""" Function calling example using pydantic models. """
2024-07-07 21:04:39 +02:00
from __future__ import annotations
2024-07-21 04:09:17 +02:00
import argparse
2024-01-16 18:41:42 +01:00
import datetime
2024-01-12 20:46:45 +01:00
import json
2024-07-21 04:09:17 +02:00
import logging
import textwrap
import sys
2024-01-12 20:46:45 +01:00
from enum import Enum
2024-01-25 20:51:24 +01:00
from typing import Optional , Union
2024-01-12 20:46:45 +01:00
import requests
from pydantic import BaseModel , Field
2024-01-25 20:51:24 +01:00
from pydantic_models_to_grammar import ( add_run_method_to_dynamic_model , convert_dictionary_to_pydantic_model ,
create_dynamic_model_from_function , generate_gbnf_grammar_and_documentation )
2024-01-16 18:41:42 +01:00
2024-01-12 20:46:45 +01:00
2024-07-21 04:09:17 +02:00
def create_completion ( host , prompt , gbnf_grammar ) :
""" Calls the /completion API on llama-server.
2024-01-12 20:46:45 +01:00
2024-07-21 04:09:17 +02:00
See
https : / / github . com / ggerganov / llama . cpp / tree / HEAD / examples / server #api-endpoints
"""
print ( f " Request: \n Grammar: \n { textwrap . indent ( gbnf_grammar , ' ' ) } \n Prompt: \n { textwrap . indent ( prompt . rstrip ( ) , ' ' ) } " )
headers = { " Content-Type " : " application/json " }
data = { " prompt " : prompt , " grammar " : gbnf_grammar }
result = requests . post ( f " http:// { host } /completion " , headers = headers , json = data ) . json ( )
2024-07-15 01:51:21 +02:00
assert data . get ( " error " ) is None , data
2024-07-21 04:09:17 +02:00
logging . info ( " Result: %s " , result )
content = result [ " content " ]
print ( f " Model: { result [ ' model ' ] } " )
print ( f " Result: \n { textwrap . indent ( json . dumps ( json . loads ( content ) , indent = 2 ) , ' ' ) } " )
return content
2024-01-12 20:46:45 +01:00
# A function for the agent to send a message to the user.
class SendMessageToUser ( BaseModel ) :
2024-07-21 04:09:17 +02:00
""" Send a message to the User. """
2024-01-12 20:46:45 +01:00
chain_of_thought : str = Field ( . . . , description = " Your chain of thought while sending the message. " )
message : str = Field ( . . . , description = " Message you want to send to the user. " )
def run ( self ) :
2024-07-21 04:09:17 +02:00
print ( f " SendMessageToUser: { self . message } " )
def example_rce ( host ) :
""" Minimal test case where the LLM call an arbitrary python function. """
print ( " - example_rce " )
tools = [ SendMessageToUser ]
gbnf_grammar , documentation = generate_gbnf_grammar_and_documentation (
pydantic_model_list = tools , outer_object_name = " function " ,
outer_object_content = " function_parameters " , model_prefix = " Function " , fields_prefix = " Parameters " )
system_message = " You are an advanced AI, tasked to assist the user by calling functions in JSON format. The following are the available functions and their parameters and types: \n \n " + documentation
user_message = " What is 42 * 42? "
prompt = f " <|im_start|>system \n { system_message } <|im_end|> \n <|im_start|>user \n { user_message } <|im_end|> \n <|im_start|>assistant "
text = create_completion ( host , prompt , gbnf_grammar )
json_data = json . loads ( text )
tools_map = { tool . __name__ : tool for tool in tools }
# This finds "SendMessageToUser":
tool = tools_map . get ( json_data [ " function " ] )
if not tool :
print ( f " Error: unknown tool { json_data [ ' function ' ] } " )
return 1
tool ( * * json_data [ " function_parameters " ] ) . run ( )
return 0
2024-01-12 20:46:45 +01:00
2024-01-25 20:51:24 +01:00
# Enum for the calculator tool.
2024-01-12 20:46:45 +01:00
class MathOperation ( Enum ) :
ADD = " add "
SUBTRACT = " subtract "
MULTIPLY = " multiply "
DIVIDE = " divide "
2024-07-21 04:09:17 +02:00
# Simple pydantic calculator tool for the agent that can add, subtract,
# multiply, and divide. Docstring and description of fields will be used in
# system prompt.
2024-01-12 20:46:45 +01:00
class Calculator ( BaseModel ) :
2024-07-21 04:09:17 +02:00
""" Perform a math operation on two numbers. """
2024-01-12 20:46:45 +01:00
number_one : Union [ int , float ] = Field ( . . . , description = " First number. " )
operation : MathOperation = Field ( . . . , description = " Math operation to perform. " )
number_two : Union [ int , float ] = Field ( . . . , description = " Second number. " )
def run ( self ) :
if self . operation == MathOperation . ADD :
return self . number_one + self . number_two
elif self . operation == MathOperation . SUBTRACT :
return self . number_one - self . number_two
elif self . operation == MathOperation . MULTIPLY :
return self . number_one * self . number_two
elif self . operation == MathOperation . DIVIDE :
return self . number_one / self . number_two
else :
raise ValueError ( " Unknown operation. " )
2024-07-21 04:09:17 +02:00
def example_calculator ( host ) :
""" Have the LLM ask to get a calculation done.
2024-01-12 20:46:45 +01:00
2024-07-21 04:09:17 +02:00
Here the grammar gets generated by passing the available function models to
generate_gbnf_grammar_and_documentation function . This also generates a
documentation usable by the LLM .
2024-01-12 20:46:45 +01:00
2024-07-21 04:09:17 +02:00
pydantic_model_list is the list of pydantic models outer_object_name is an
optional name for an outer object around the actual model object . Like a
" function " object with " function_parameters " which contains the actual model
object . If None , no outer object will be generated outer_object_content is
the name of outer object content .
2024-01-12 20:46:45 +01:00
2024-07-21 04:09:17 +02:00
model_prefix is the optional prefix for models in the documentation . ( Default = " Output Model " )
fields_prefix is the prefix for the model fields in the documentation . ( Default = " Output Fields " )
"""
print ( " - example_calculator " )
tools = [ SendMessageToUser , Calculator ]
gbnf_grammar , documentation = generate_gbnf_grammar_and_documentation (
pydantic_model_list = tools , outer_object_name = " function " ,
outer_object_content = " function_parameters " , model_prefix = " Function " , fields_prefix = " Parameters " )
system_message = " You are an advanced AI, tasked to assist the user by calling functions in JSON format. The following are the available functions and their parameters and types: \n \n " + documentation
user_message1 = " What is 42 * 42? "
prompt = f " <|im_start|>system \n { system_message } <|im_end|> \n <|im_start|>user \n { user_message1 } <|im_end|> \n <|im_start|>assistant "
text = create_completion ( host , prompt , gbnf_grammar )
json_data = json . loads ( text )
expected = {
" function " : " Calculator " ,
" function_parameters " : {
" number_one " : 42 ,
" operation " : " multiply " ,
" number_two " : 42
}
}
if json_data != expected :
print ( " Result is not as expected! " )
tools_map = { tool . __name__ : tool for tool in tools }
# This finds "Calculator":
tool = tools_map . get ( json_data [ " function " ] )
if not tool :
print ( f " Error: unknown tool { json_data [ ' function ' ] } " )
return 1
result = tool ( * * json_data [ " function_parameters " ] ) . run ( )
print ( f " Call { json_data [ ' function ' ] } gave result { result } " )
return 0
2024-01-12 20:46:45 +01:00
class Category ( Enum ) :
2024-07-21 04:09:17 +02:00
""" The category of the book. """
2024-01-12 20:46:45 +01:00
Fiction = " Fiction "
NonFiction = " Non-Fiction "
class Book ( BaseModel ) :
2024-07-21 04:09:17 +02:00
""" Represents an entry about a book. """
2024-01-12 20:46:45 +01:00
title : str = Field ( . . . , description = " Title of the book. " )
author : str = Field ( . . . , description = " Author of the book. " )
published_year : Optional [ int ] = Field ( . . . , description = " Publishing year of the book. " )
keywords : list [ str ] = Field ( . . . , description = " A list of keywords. " )
category : Category = Field ( . . . , description = " Category of the book. " )
summary : str = Field ( . . . , description = " Summary of the book. " )
2024-07-21 04:09:17 +02:00
def example_struct ( host ) :
""" A example structured output based on pydantic models.
2024-01-12 20:46:45 +01:00
2024-07-21 04:09:17 +02:00
The LLM will create an entry for a Book database out of an unstructured
text . We need no additional parameters other than our list of pydantic
models .
"""
print ( " - example_struct " )
tools = [ Book ]
gbnf_grammar , documentation = generate_gbnf_grammar_and_documentation ( pydantic_model_list = tools )
system_message = " You are an advanced AI, tasked to create a dataset entry in JSON for a Book. The following is the expected output model: \n \n " + documentation
text = """ The Feynman Lectures on Physics is a physics textbook based on some lectures by Richard Feynman, a Nobel laureate who has sometimes been called " The Great Explainer " . The lectures were presented before undergraduate students at the California Institute of Technology (Caltech), during 1961– 1963. The book ' s co-authors are Feynman, Robert B. Leighton, and Matthew Sands. """
prompt = f " <|im_start|>system \n { system_message } <|im_end|> \n <|im_start|>user \n { text } <|im_end|> \n <|im_start|>assistant "
text = create_completion ( host , prompt , gbnf_grammar )
json_data = json . loads ( text )
# In this case, there's no function nor function_parameters.
# Here the result will vary based on the LLM used.
keys = sorted ( [ " title " , " author " , " published_year " , " keywords " , " category " , " summary " ] )
if keys != sorted ( json_data . keys ( ) ) :
print ( f " Unexpected result: { sorted ( json_data . keys ( ) ) } " )
return 1
book = Book ( * * json_data )
print ( f " As a Book object: %s " % book )
return 0
2024-01-12 20:46:45 +01:00
2024-01-16 18:41:42 +01:00
def get_current_datetime ( output_format : Optional [ str ] = None ) :
2024-07-21 04:09:17 +02:00
""" Get the current date and time in the given format.
2024-01-16 18:41:42 +01:00
Args :
output_format : formatting string for the date and time , defaults to ' % Y- % m- %d % H: % M: % S '
"""
2024-07-21 04:09:17 +02:00
return datetime . datetime . now ( ) . strftime ( output_format or " % Y- % m- %d % H: % M: % S " )
2024-01-16 18:41:42 +01:00
2024-07-21 04:09:17 +02:00
# Example function to get the weather.
2024-01-16 18:41:42 +01:00
def get_current_weather ( location , unit ) :
""" Get the current weather in a given location """
if " London " in location :
return json . dumps ( { " location " : " London " , " temperature " : " 42 " , " unit " : unit . value } )
elif " New York " in location :
return json . dumps ( { " location " : " New York " , " temperature " : " 24 " , " unit " : unit . value } )
elif " North Pole " in location :
return json . dumps ( { " location " : " North Pole " , " temperature " : " -42 " , " unit " : unit . value } )
2024-07-21 04:09:17 +02:00
return json . dumps ( { " location " : location , " temperature " : " unknown " } )
def example_concurrent ( host ) :
""" An example for parallel function calling with a Python function, a pydantic
function model and an OpenAI like function definition .
"""
print ( " - example_concurrent " )
# Function definition in OpenAI style.
current_weather_tool = {
" type " : " function " ,
" function " : {
" name " : " get_current_weather " ,
" description " : " Get the current weather in a given location " ,
" parameters " : {
" type " : " object " ,
" properties " : {
" location " : {
" type " : " string " ,
" description " : " The city and state, e.g. San Francisco, CA " ,
} ,
" unit " : { " type " : " string " , " enum " : [ " celsius " , " fahrenheit " ] } ,
2024-01-16 18:41:42 +01:00
} ,
2024-07-21 04:09:17 +02:00
" required " : [ " location " ] ,
2024-01-16 18:41:42 +01:00
} ,
} ,
2024-07-21 04:09:17 +02:00
}
# Convert OpenAI function definition into pydantic model.
current_weather_tool_model = convert_dictionary_to_pydantic_model ( current_weather_tool )
# Add the actual function to a pydantic model.
current_weather_tool_model = add_run_method_to_dynamic_model ( current_weather_tool_model , get_current_weather )
# Convert normal Python function to a pydantic model.
current_datetime_model = create_dynamic_model_from_function ( get_current_datetime )
tools = [ SendMessageToUser , Calculator , current_datetime_model , current_weather_tool_model ]
gbnf_grammar , documentation = generate_gbnf_grammar_and_documentation (
pydantic_model_list = tools , outer_object_name = " function " ,
outer_object_content = " params " , model_prefix = " Function " , fields_prefix = " Parameters " , list_of_outputs = True )
system_message = " You are an advanced AI assistant. You are interacting with the user and with your environment by calling functions. You call functions by writing JSON objects, which represent specific function calls. \n Below is a list of your available function calls: \n \n " + documentation
text = """ Get the date and time, get the current weather in celsius in London and solve the following calculation: 42 * 42 """
prompt = f " <|im_start|>system \n { system_message } <|im_end|> \n <|im_start|>user \n { text } <|im_end|> \n <|im_start|>assistant "
text = create_completion ( host , prompt , gbnf_grammar )
json_data = json . loads ( text )
expected = [
{
" function " : " get_current_datetime " ,
" params " : {
" output_format " : " % Y- % m- %d % H: % M: % S "
}
} ,
{
" function " : " get_current_weather " ,
" params " : {
" location " : " London " ,
" unit " : " celsius "
}
} ,
{
" function " : " Calculator " ,
" params " : {
" number_one " : 42 ,
" operation " : " multiply " ,
" number_two " : 42
}
}
]
res = 0
if json_data != expected :
print ( " Result is not as expected! " )
print ( " This can happen on highly quantized models " )
res = 1
tools_map = { tool . __name__ : tool for tool in tools }
for call in json_data :
tool = tools_map . get ( call [ " function " ] )
if not tool :
print ( f " Error: unknown tool { call [ ' function ' ] } " )
return 1
result = tool ( * * call [ " params " ] ) . run ( )
print ( f " Call { call [ ' function ' ] } returned { result } " )
# Should output something like this:
# Call get_current_datetime returned 2024-07-15 09:50:38
# Call get_current_weather returned {"location": "London", "temperature": "42", "unit": "celsius"}
# Call Calculator returned 1764
return res
def main ( ) :
parser = argparse . ArgumentParser ( description = sys . modules [ __name__ ] . __doc__ )
parser . add_argument ( " --host " , default = " localhost:8080 " , help = " llama.cpp server " )
parser . add_argument ( " -v " , " --verbose " , action = " store_true " , help = " enables logging " )
args = parser . parse_args ( )
logging . basicConfig ( level = logging . INFO if args . verbose else logging . ERROR )
ret = 0
# Comment out below to only run the example you want.
ret = ret or example_rce ( args . host )
ret = ret or example_calculator ( args . host )
ret = ret or example_struct ( args . host )
ret = ret or example_concurrent ( args . host )
return ret
if __name__ == " __main__ " :
sys . exit ( main ( ) )