Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save Thuruv/6b81540d96ff49d8dec87bfaff0e2096 to your computer and use it in GitHub Desktop.

Select an option

Save Thuruv/6b81540d96ff49d8dec87bfaff0e2096 to your computer and use it in GitHub Desktop.

Revisions

  1. @oldmonkABA oldmonkABA renamed this gist Apr 6, 2020. 1 changed file with 0 additions and 0 deletions.
  2. @oldmonkABA oldmonkABA created this gist Apr 6, 2020.
    261 changes: 261 additions & 0 deletions automatic-login-kiteconnect.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,261 @@
    # -*- coding: utf-8 -*-
    import json
    import os
    import logmatic
    import logging
    from logging.config import dictConfig
    import requests
    from http import HTTPStatus
    import re
    from kiteconnect import KiteConnect
    import os
    import datetime

    __author__ = "oldmonkABA"

    # TODO: These can be passed in as command line arguments. Make this change later!

    # Name of file containing configuration information for the logging framework
    LOG_FILE_NAME = "logging.json"

    # Name of file containing KITE connection information
    CONNECTION_INFO_FILE_NAME = "connection_info.json"

    # Logger for this module
    LOGGER = logging.getLogger(__name__)


    def init_logging():
    """
    Initialize the logging framework with the configuration
    :return: Nothing
    :rtype: None
    """
    here = os.path.dirname(__file__)

    log_fp = os.path.join(here, LOG_FILE_NAME)
    if not os.path.exists(log_fp):
    raise FileNotFoundError("Logging configuration file not found!")

    with open(log_fp, mode="r") as log_config:
    dictConfig(config=json.load(log_config))

    LOGGER.info("Logging framework initialized!")


    def load_kite_config():
    """
    Load the kite configuration information from the json file
    :return: A python dictionary
    :rtype: dict
    """
    # Get the directory where this python script is being executed
    here = os.path.dirname(__file__)

    connect_fp = os.path.join(here, CONNECTION_INFO_FILE_NAME)

    if not os.path.exists(connect_fp):
    raise FileNotFoundError("Connection information file not found!")

    with open(connect_fp, mode="r") as connect_file:
    config = json.load(connect_file)

    LOGGER.info("Kite connection information loaded successfully from file!")

    return config

    def need_to_generate_token(file):
    flag = False
    here = os.path.dirname(__file__)

    fp = os.path.join(here, file)
    login_time=''
    if os.path.isfile(fp):
    print(fp+" present in pwd")
    with open(fp, 'r') as f:
    data = json.load(f)
    LOGGER.info("Previous login time for "+data["user_name"]+" is "+str(data["login_time"]))
    login_time = datetime.datetime.strptime(data["login_time"],"%Y-%m-%d %H:%M:%S")
    today = datetime.datetime.now()
    cut_off_time = datetime.datetime(today.year,today.month,today.day,7,00,00)
    if login_time > cut_off_time:
    LOGGER.info("Acces token is fresh")
    else:
    os.remove(fp)
    flag=True
    LOGGER.info(file+" has been deleted.")
    else:
    LOGGER.info(file+" does not exist in pwd")
    flag = True
    return flag,login_time

    def kite_prelogin(config, http_session):
    """
    Perform pre-login into kite
    :param config: The python dictionary containing kite configuration information
    :type config: dict
    :param http_session: A http session
    :type http_session: :class:`requests.Session`
    :return: The response referer url
    :rtype: str
    """
    url = config["LOGIN_REFERER"] + config["KITE_API_KEY"]

    response = http_session.get(url=url)

    return response.url


    def login_kite(config, http_session):
    """
    Perform a login
    :param config: The python dictionary containing kite configuration information
    :type config: dict
    :param http_session: A http session
    :type http_session: :class:`requests.Session`
    :return: The response payload as a python dictionary
    :rtype: dict
    """

    url = config["LOGIN_URL"]
    data = dict()
    data["user_id"] = config["USER"]
    data["password"] = config["PASSWORD"]
    response = http_session.post(url=url, data=data)
    print(response.content)
    # Deserialize the response content
    resp_dict = json.loads(response.content)

    if "message" in resp_dict.keys():
    # Since logging framework already expects message as a parameter change the key
    resp_dict["err_message"] = resp_dict["message"]
    del resp_dict["message"]

    if response.status_code != HTTPStatus.OK:
    LOGGER.error("Login failure", extra=resp_dict)
    raise ConnectionError("Login failed!")

    return resp_dict


    def kite_twofa(login_resp, config, http_session):
    """
    Perform kite-two-factor authentication
    :param login_resp: The response payload from the primary user login
    :type login_resp: dict
    :param config: The python dictionary containing kite configuration information
    :type config: dict
    :param http_session: A http session
    :type http_session: :class:`requests.Session`
    :return: The response payload as a python dictionary
    :rtype: dict
    """
    url = config["TWOFA_URL"]
    data = dict()
    data["user_id"] = config["USER"]
    data["request_id"] = login_resp["data"]["request_id"]
    data["twofa_value"] = str(config["PIN"])

    response = http_session.post(url=url, data=data)

    # Deserialize the response content
    resp_dict = json.loads(response.content)


    if "message" in resp_dict.keys():
    # Since logging framework already expects message as a parameter change the key
    resp_dict["err_message"] = resp_dict["message"]
    del resp_dict["message"]

    if response.status_code != HTTPStatus.OK:
    LOGGER.error("Two-factor authentication failure", extra=resp_dict)
    raise ConnectionError("Two-factor authentication failed!")

    return resp_dict

    def kite_post_twofa(url, http_session):
    """
    Perform action after kite-two-factor authentication
    :param login_resp: The response payload from the primary user login
    :type login_resp: dict
    :param http_session: A http session
    :type http_session: :class:`requests.Session`
    :return: The response payload as a python dictionary
    :rtype: dict
    """
    url = url+"&skip_session=true"
    #data = dict()
    #data["user_id"] = config["USER"]
    #data["request_id"] = login_resp["data"]["request_id"]
    #data["twofa_value"] = str(config["PIN"])

    response = http_session.get(url=url, allow_redirects=False)
    if response.status_code == 302:
    reply = response.headers["Location"]
    request_token = re.findall(r'request_token=(.*)&action',reply)[0]

    # Deserialize the response content
    return request_token

    def generate_access_token(file,config,request_token):
    here = os.path.dirname(__file__)
    fp = os.path.join(here, file)
    kite = KiteConnect(api_key=config["KITE_API_KEY"])
    data = kite.generate_session(request_token, api_secret=config["KITE_API_SECRET"])
    user_data = json.dumps(data, indent=4, sort_keys=True,default=str)
    #print(data["user_name"],data["login_time"],data["access_token"])

    with open(fp, "w") as outfile:
    outfile.write(user_data)
    #time.sleep(5)
    LOGGER.info("Automatic login for "+data["user_name"]+" is done. "+file+" has been written to disk")



    if __name__ == "__main__":
    # Initialize logging framework
    init_logging()
    # Load the kite configuration information
    kite_config = load_kite_config()
    file = 'access_credentials.json'
    generate_token,login_time = need_to_generate_token(file)

    if generate_token :
    sess = requests.Session()

    # Attempt pre-login
    ref_url = kite_prelogin(config=kite_config, http_session=sess)

    # Attempt a login and get the response as a dictionary
    user_pass_login_resp = login_kite(config=kite_config, http_session=sess)
    LOGGER.info("Login successful!")

    # Attempt two-factor auth
    two_fa_resp = kite_twofa(login_resp=user_pass_login_resp, config=kite_config, http_session=sess)

    LOGGER.info("Two-factor authentication passed!", extra=two_fa_resp)

    request_token = kite_post_twofa(url=ref_url,http_session=sess)
    LOGGER.info("Generated request token = %s",str(request_token))

    generate_access_token(file,kite_config,request_token)
    else:
    LOGGER.info("Access token is valid till next day 7 am from "+str(login_time))



    11 changes: 11 additions & 0 deletions connection_info.json
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,11 @@
    {
    "KITE_API_KEY": "",
    "USER": "",
    "PASSWORD": "",
    "PIN": "",
    "KITE_API_SECRET":"",
    "LOGIN_REFERER": "https://kite.trade/connect/login?v=3&api_key=",
    "LOGIN_URL": "https://kite.zerodha.com/api/login",
    "TWOFA_URL": "https://kite.zerodha.com/api/twofa"

    }
    27 changes: 27 additions & 0 deletions logging.json
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,27 @@
    {
    "version": 1,
    "disable_existing_loggers": false,
    "formatters": {
    "package_formatter": {
    "format": "[%(asctime)s] - [%(levelname)s] - [%(name)s] : %(message)s"
    },
    "json": {
    "()": "logmatic.JsonFormatter"
    }
    },
    "handlers": {
    "console": {
    "class": "logging.StreamHandler",
    "level": "DEBUG",
    "formatter": "json"
    }
    },
    "loggers": {
    },
    "root": {
    "level": "INFO",
    "handlers": [
    "console"
    ]
    }
    }
    7 changes: 7 additions & 0 deletions requirements.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,7 @@
    certifi==2020.4.5.1
    chardet==3.0.4
    idna==2.9
    logmatic-python==0.1.7
    python-json-logger==0.1.11
    requests==2.23.0
    urllib3==1.25.8