Skip to content

Instantly share code, notes, and snippets.

@aaronpolhamus
Created March 19, 2025 18:35
Show Gist options
  • Select an option

  • Save aaronpolhamus/c47efe359f4c0a049ce1180cd569054b to your computer and use it in GitHub Desktop.

Select an option

Save aaronpolhamus/c47efe359f4c0a049ce1180cd569054b to your computer and use it in GitHub Desktop.

Revisions

  1. aaronpolhamus created this gist Mar 19, 2025.
    105 changes: 105 additions & 0 deletions conversation_monitoring_test.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,105 @@
    import os

    import gspread
    import multiprocessing
    import pandas as pd
    import re
    from joblib import Parallel, delayed
    from sqlalchemy import text

    from analytics.airflow.dags import default_args
    from analytics.etl.dbs import get_vest_bi_prod_engine
    from analytics.etl.google_drive.utils import (
    download_sheets_file,
    save_df_to_sheet,
    save_string_to_drive,
    )
    from analytics.etl.intercom.transcript_maker import (
    detect_cx_interaction,
    detect_pure_bot_convo,
    make_convo_transcript,
    )
    from analytics.llms.utils import get_llm_response, get_vestai_token

    DW_REPORT_CREDENTIALS = "credentials/northbound-prod-eed6cf87604b.json"
    COMPLAINT_MAIN_DIRECTORY_ID = "1LKV5RAqMNwP4pYIK8XEkn8vpqkzyV5ps"
    ORIGINAL_TRANSCRIPTS_FOLDER = "1lXKU6xG8JUpg4vIe0-HlKgVamhqXA6xr"
    TRANSLATED_TRANSCRIPTS_FOLDER = "19OgEhsxuuI_MAeF4nnyCfvZXNfD8R6-8"
    CUSTOMER_COMPLAINT_MASTER_REPORT = "1edND6ARfgCctGpvIxJIYRQC1zHJ39x3esiS5qo0ObX0"
    KEYWORDS_FILE_ID = "102gzQvpZI6mgBh8hHFgpY7Y2HYdVP4B-dk-AWBTi-aQ"
    CUSTOMER_ID = "vest_compliance_bot"

    api_host = os.getenv("VEST_API_HOST")
    rest_base_url = f"https://{api_host}"

    token = get_vestai_token(rest_base_url, CUSTOMER_ID)

    def get_updated_conversations(start_date):
    return pd.read_sql(
    text(
    """
    WITH tags_array AS (
    SELECT
    convo_id,
    array_agg(tag->>'name') AS tag_names
    FROM intercom_convos,
    jsonb_array_elements(tags->'tags') AS tag
    GROUP BY convo_id
    )
    SELECT
    ic.convo_id,
    ta.tag_names,
    ic.state,
    ic.updated_at,
    ic.conversation_parts->>'total_count' AS total_interactions,
    ic.conversation_parts,
    ic.tags,
    ai.account_number,
    u.first_name || ' ' || u.last_name AS user_name,
    u.user_email_address,
    u.user_telephone_number,
    u.country_abbv
    FROM intercom_convos ic
    JOIN tags_array ta ON ic.convo_id = ta.convo_id
    LEFT JOIN link_table lt ON lt.user_id = ic.contacts->'contacts'->0->>'external_id'
    LEFT JOIN account_info ai ON lt.account_number = ai.account_number
    LEFT JOIN users u ON ai.user_id = u.user_id
    WHERE ic.updated_at >= :start_date;
    """,
    ),
    get_vest_bi_prod_engine(),
    params={"start_date": start_date},
    )

    # Get last update date from master report
    master_df = download_sheets_file(
    DW_REPORT_CREDENTIALS,
    CUSTOMER_COMPLAINT_MASTER_REPORT,
    )
    master_df["updated_at"] = pd.to_datetime(master_df["updated_at"])
    last_update = master_df["updated_at"].max()

    # Get new conversations since last update
    df = get_updated_conversations(last_update)

    # Filter for live CX interactions
    df["bots_only"] = df["conversation_parts"].apply(
    lambda x: detect_pure_bot_convo(x["conversation_parts"]),
    )
    df["live_cx_interaction"] = df["conversation_parts"].apply(
    lambda x: detect_cx_interaction(x["conversation_parts"]),
    )
    df = df[df["live_cx_interaction"]]

    # Generate transcripts
    df["transcript"] = df.apply(make_convo_transcript, axis=1)

    _transcript = df.iloc[0]["transcript"]

    _message = {"role": "user", "content": _transcript}

    payload = {
    "service_bot_type": "conversation_monitoring",
    "message": _message
    }
    json_response = get_llm_response(base_url=rest_base_url, payload=payload, token=token)