Skip to content

Instantly share code, notes, and snippets.

@maulikmadhavi
Created February 1, 2025 02:07
Show Gist options
  • Select an option

  • Save maulikmadhavi/f229c87e3b5c29664d7255ab67857c76 to your computer and use it in GitHub Desktop.

Select an option

Save maulikmadhavi/f229c87e3b5c29664d7255ab67857c76 to your computer and use it in GitHub Desktop.
Calls the Groq API, sending multiple queries concurrently via a thread pool
import json
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import requests
from transformers import AutoTokenizer
# Groq API key, read from the environment (never hard-code secrets).
api_key = os.getenv("GROQ_API_KEY")
# Groq's OpenAI-compatible chat-completions endpoint.
url = "https://api.groq.com/openai/v1/chat/completions"
# Tokenizer used only to count tokens in generated answers.
# NOTE(review): this is the Pythia tokenizer, not the tokenizer of the
# Llama model being queried, so token counts (and tokens/sec) are
# approximate relative to the served model -- confirm if exact accounting matters.
tokenizer = AutoTokenizer.from_pretrained("EleutherAI/pythia-160m")
# Fixed list of test queries; each one is sent as a separate request.
queries = [
"What is the capital of France?",
"What is 2 + 2?",
"What is the boiling point of water in Celsius?",
"Who wrote 'Romeo and Juliet'?",
"What is the chemical symbol for gold?",
"How many continents are there?",
"What is the largest planet in our solar system?",
"What is the square root of 64?",
"Who painted the 'Mona Lisa'?",
"What is the fastest land animal?",
]
def fetch_response(query):
    """Send one chat-completion request to the Groq API and return metrics.

    Parameters
    ----------
    query : str
        The user prompt, sent as a single-turn chat message.

    Returns
    -------
    dict
        Keys: ``query``, ``answer``, ``send_timestamp``,
        ``receive_timestamp``, ``total_time`` (seconds),
        ``token_count``, ``token_per_second``.
    """
    headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}
    data = {
        "model": "llama-3.3-70b-versatile",
        "messages": [{"role": "user", "content": query}],
    }
    # Record the wall-clock time just before sending the request.
    # BUGFIX: time.strftime does not support %f (microseconds) -- it raises
    # on some platforms and emits a literal "%f" on others -- so use
    # datetime.strftime, which does.
    send_time = datetime.now().strftime("%d-%m-%Y %H-%M-%S-%f")
    start_time = time.perf_counter()
    # Send the request to the Groq API. The timeout keeps one stuck
    # request from blocking a pool thread indefinitely.
    response = requests.post(url, headers=headers, json=data, timeout=60)
    end_time = time.perf_counter()
    # Record the wall-clock time immediately after receiving the response.
    receive_time = datetime.now().strftime("%d-%m-%Y %H-%M-%S-%f")
    # Extract the response text; keep the error in-band so a single failed
    # query does not abort the whole batch.
    if response.status_code == 200:
        answer = response.json()["choices"][0]["message"]["content"].strip()
    else:
        answer = f"Error: {response.status_code} - {response.text}"
    # Tokenize the generated text to count the number of tokens.
    token_count = len(tokenizer.tokenize(answer))
    total_time = end_time - start_time
    # Guard against a (pathological) zero elapsed time.
    token_per_second = token_count / total_time if total_time > 0 else 0.0
    # Log the per-query metrics as they complete.
    print(f"Prompt: {query}")
    print(f"Generated Text: {answer}")
    print(f"Number of Tokens: {token_count}")
    print(f"Total Time Taken: {total_time:.2f} seconds")
    print(f"Tokens per Second: {token_per_second:.2f}")
    # Bundle everything relevant for the JSON report.
    result = {
        "query": query,
        "answer": answer,
        "send_timestamp": send_time,
        "receive_timestamp": receive_time,
        "total_time": total_time,
        "token_count": token_count,
        "token_per_second": token_per_second,
    }
    return result
def main():
    """Fan out all queries to the Groq API concurrently and print the
    collected metrics as a JSON array, in the original query order."""
    # Precompute query -> position once; the original sort key called
    # list.index per comparison, which is O(n) each time (O(n^2) overall).
    order = {query: idx for idx, query in enumerate(queries)}
    results = []
    # Use ThreadPoolExecutor to send requests concurrently (I/O-bound work,
    # so threads overlap the network waits).
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(fetch_response, query) for query in queries]
        # Collect each result as it finishes.
        for future in as_completed(futures):
            results.append(future.result())
    # Restore the original submission order before reporting.
    results.sort(key=lambda item: order[item["query"]])
    # Emit the full report as pretty-printed JSON.
    print(json.dumps(results, indent=2))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment