Skip to content

Instantly share code, notes, and snippets.

@capableguptadotcom
Last active May 23, 2024 07:46
Show Gist options
  • Select an option

  • Save capableguptadotcom/b393bb718a833687fed6df2dc14a4769 to your computer and use it in GitHub Desktop.

Select an option

Save capableguptadotcom/b393bb718a833687fed6df2dc14a4769 to your computer and use it in GitHub Desktop.
Audio File Processing
# Define Node Class
class Node:
    """A single key/value entry of a binary tree.

    Attributes are plain and public:
    ``key`` and ``value`` hold the payload passed to the constructor;
    ``left`` and ``right`` reference the child nodes and are ``None``
    until a child is attached.
    """

    def __init__(self, key, value):
        self.key = key
        self.value = value
        # A freshly created node is always a leaf.
        self.left = None
        self.right = None

    def __repr__(self):
        # Children are left out on purpose so printing one node does not
        # dump the whole subtree.
        return f"{type(self).__name__}(key={self.key!r}, value={self.value!r})"
# Define Binary Search Tree Class
class BinaryTree:
    """Unbalanced binary search tree that maps ordered keys to values.

    Smaller keys go to the left subtree, larger keys to the right.  The
    tree also remembers the result of the most recent lookup in
    ``last_selection``.

    Insertion and lookup walk the tree with a loop rather than recursion,
    so a heavily skewed tree (e.g. keys inserted in sorted order) cannot
    exhaust the interpreter's recursion limit.
    """

    def __init__(self):
        self.root = None
        # Node found by the latest ``traverse`` call, or None.
        self.last_selection = None

    def insert(self, key, value):
        """Add ``key`` -> ``value`` to the tree.

        If ``key`` is already present the tree is left untouched: the
        first value stored for a key wins and later ones are ignored.
        """
        if self.root is None:
            self.root = Node(key, value)
            return

        current = self.root
        while True:
            if key < current.key:
                if current.left is None:
                    current.left = Node(key, value)
                    return
                current = current.left
            elif key > current.key:
                if current.right is None:
                    current.right = Node(key, value)
                    return
                current = current.right
            else:
                # Neither smaller nor larger: the key already exists.
                return

    def traverse(self, user_input):
        """Look up ``user_input`` and record the outcome.

        The matching node (or ``None`` when the key is absent) is stored
        in ``last_selection``; nothing is returned.
        """
        self.last_selection = self._find(user_input)

    def _find(self, user_input):
        """Return the node whose key equals ``user_input``, else ``None``."""
        current = self.root
        while current is not None:
            if current.key == user_input:
                return current
            current = current.left if user_input < current.key else current.right
        return None

    def get_last_selection(self):
        """Return the node recorded by the latest ``traverse`` call, or ``None``."""
        return self.last_selection
# Populate the Tree
tree = BinaryTree()
list_of_dicts = [{'a': 'value1', 'b': 'content1'}, {'a': 'value2', 'c': 'content2'}]
# Every key/value pair of every dict becomes one insert call.  The key 'a'
# appears in both dicts, so it is inserted twice.
for d in list_of_dicts:
    for key, value in d.items():
        tree.insert(key, value)
# traverse based on user input
user_input = 'a'
tree.traverse(user_input)
last_selection = tree.get_last_selection()
# Compare against None explicitly: a missing key is reported as None, and
# the check should not depend on the truthiness of the node object.
if last_selection is not None:
    print(f"Last selection for key '{user_input}' is '{last_selection.value}'")
else:
    print(f"No selection found for key '{user_input}'")
from concurrent.futures import ThreadPoolExecutor
import logging
# Number of documents sent per upload batch.
BATCH_SIZE = 1000
# Maximum number of worker threads in the upload thread pool.
NUM_WORKERS = 10
def create_search_client(index_name):
    """Return a ``SearchClient`` bound to the index called ``index_name``.

    The service endpoint and the credential are taken from the
    module-level names ``SEARCH_SERVICE_ENDPOINT`` and ``credential``,
    which are defined outside this section of the file.
    """
    return SearchClient(
        endpoint=SEARCH_SERVICE_ENDPOINT,
        index_name=index_name,
        credential=credential,
    )
def upload_documents_to_index(client, documents, batch_size=None, num_workers=None):
    """Upload ``documents`` to a search index in concurrent batches.

    The documents are cut into consecutive slices of ``batch_size`` items
    and every slice is sent with ``client.merge_or_upload_documents`` from
    a thread pool.  The call returns once all batches have been processed.

    Uploading is best effort: a batch that raises is logged and skipped,
    and per-document failures are logged individually; nothing is raised
    to the caller for upload errors.

    Args:
        client: Object exposing ``merge_or_upload_documents(documents=...)``
            that returns an iterable of per-document results with
            ``key``, ``succeeded`` and ``error_message`` attributes.
            NOTE(review): this matches the list of ``IndexingResult``
            returned by the Azure ``SearchClient``; confirm against the
            SDK version in use.
        documents: Sliceable sequence of documents to upload.
        batch_size: Documents per request; defaults to ``BATCH_SIZE``.
        num_workers: Thread pool size; defaults to ``NUM_WORKERS``.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size is None:
        batch_size = BATCH_SIZE
    if num_workers is None:
        num_workers = NUM_WORKERS
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    def upload_batch(batch):
        # Everything stays inside the try block: an exception escaping a
        # worker would be stored on a future nobody reads and vanish.
        try:
            results = client.merge_or_upload_documents(documents=batch)
            failures = [item for item in results if not item.succeeded]
            for failure in failures:
                logging.error(
                    f"Failed to upload document with ID: {failure.key}, error: {failure.error_message}"
                )
            if not failures:
                logging.info("Successfully uploaded batch")
        except Exception as e:
            logging.error(f"Failed to upload batch: {e}")

    # Leaving the ``with`` block waits for every submitted batch to finish.
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        for start in range(0, len(documents), batch_size):
            executor.submit(upload_batch, documents[start : start + batch_size])
index_names = [
    "dbpedia-1m-baseline",
    "dbpedia-1m-stored",
    "dbpedia-1m-scalar-quantization",
    "dbpedia-1m-both",
]
# Push the same document set to every index, one index at a time.
# ``transformed_documents`` is defined outside this section of the file.
for index_name in index_names:
    client = create_search_client(index_name)
    upload_documents_to_index(client, transformed_documents)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment