""" Main ETL script Formato table final Nome do Banco CNPJ Classificação do Banco Quantidade de Clientes do Bancos Índice de reclamações Quantidade de reclamações Índice de satisfação dos funcionários dos bancos Índice de satisfação com salários dos funcionários dos bancos. """ import io import polars as pl import boto3 s3 = boto3.client("s3") bucket_input_name = "edb011-input" bancos_path_filename = "Bancos/EnquadramentoInicia_v2.tsv" empregados_path_filename = "Empregados/glassdoor_consolidado_join_match_v2.csv" class Extract: def __init__(self, file_name=None): self.bucket = bucket_input_name self.file_name = file_name def read_file(self, separator=None): obj = s3.get_object(Bucket=self.bucket, Key=self.file_name) df = pl.read_csv(obj["Body"], separator=separator) return df def _transform(): """Minimal transformation to be able to extract data""" COLUMN_TYPES = {"Quantidade total de clientes \x96 CCS e SCR": pl.Int64(), "Quantidade de clientes \x96 CCS": pl.Int64(), "Quantidade de clientes \x96 SCR": pl.Int64()} COLUMN_RENAME = {"Quantidade total de clientes \x96 CCS e SCR": "Quantidade total de clientes CCS e SCR", "Quantidade de clientes \x96 CCS": "Quantidade de clientes CCS", "Quantidade de clientes \x96 SCR": "Quantidade de clientes SCR"} return COLUMN_TYPES, COLUMN_RENAME def read_multi_files(self): COLUMN_TYPES, COLUMN_RENAME = Extract._transform() bucket_input_obj = boto3.resource('s3').Bucket(bucket_input_name) prefix_objs = bucket_input_obj.objects.filter(Prefix="Reclamacoes/") df = pl.concat([(pl.read_csv(io.BytesIO(obj.get()['Body'].read()), encoding="latin1", separator=";", dtypes=COLUMN_TYPES) .rename(COLUMN_RENAME)) for obj in prefix_objs]) return df def transform(): """Join needed tables, transform and export it""" BANK_NAME = { "BANCO TOKYO": "BANCO TOKYO-MITSUBISHI BM S.A.", "MERCEDES": "MERCEDES-BENZ" } bancos = Extract(bancos_path_filename).read_file("\t") bancos = ( bancos.with_columns(pl.col("Nome").str.split_exact("-", 0)).unnest("Nome").rename({"field_0": "Nome"}) .with_columns(pl.col("Nome").str.strip(" ")) .with_columns(pl.col("Nome").str.replace("BANCO TOKYO", "BANCO TOKYO-MITSUBISHI BM S.A.")) .with_columns(pl.col("Nome").str.replace("MERCEDES", "MERCEDES-BENZ")) ) empregados = Extract(empregados_path_filename).read_file("|") bancos_empregados = bancos.join(empregados, on="Nome", how="left") bancos_empregados = bancos_empregados.with_columns(pl.col("CNPJ").cast(pl.Utf8)) bancos_empregados = bancos_empregados.groupby(["CNPJ", "Nome"]).agg([pl.mean("Remuneração e benefícios") , pl.mean("Perspectiva positiva da empresa(%)")]) reclamacoes = Extract().read_multi_files() reclamacoes = reclamacoes.groupby(["CNPJ IF", "Segmento"]).agg([pl.sum("Quantidade total de reclamações") , pl.sum("Quantidade total de clientes CCS e SCR")]) df = bancos_empregados.join(reclamacoes, left_on="CNPJ", right_on="CNPJ IF") df = df.rename({"Nome": "nome", "CNPJ": "cnpj", "Segmento": "classificacao_banco", "Quantidade total de clientes CCS e SCR": "quantidade_total_clientes", "Quantidade total de reclamações": "quantidade_total_reclamacoes", "Remuneração e benefícios": "idx_satisfacao_salario"}) df = df.with_columns((pl.col("quantidade_total_reclamacoes") / pl.col("quantidade_total_clientes")).alias("idx_reclamacoes")) df.select(pl.col(["nome", "cnpj", "classificacao_banco", "quantidade_total_clientes", "idx_reclamacoes", "quantidade_total_reclamacoes", "idx_satisfacao_salario"])) return df def export(): """Exports data as csv to S3 Bucket""" df = transform() df.write_csv("final_table.csv", separator=",") if __name__ == "__main__": export()