Last active: May 28, 2021 08:00
Save kristiyanto/52974d3d7e88dac1303fedc63132dbcb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import itertools

from pyspark.ml import Pipeline
from pyspark.sql import functions as f
from pyspark.sql.functions import col, lit

from fintech.features import activity, demographic, location, recency
def generateFeatures(transaction, users):
    """Build the per-user feature table for one batch.

    Every ``f_*`` factory from ``fintech.features`` returns a pipeline stage
    that exposes a ``feature_names`` attribute (either a single column name
    or a list of column names). The stages are chained in a
    ``pyspark.ml.Pipeline``, which is fitted on and then applied to
    ``users``.

    Args:
        transaction: Spark DataFrame of transactions. It is handed to every
            stage that takes a ``transc_table`` argument.
        users: Spark DataFrame of users that the pipeline is fitted on and
            transformed over. It must contain a ``user_id`` column.

    Returns:
        A DataFrame with ``user_id`` followed by every feature column, in
        the same order as the stages below.
    """
    feature_list = [
        # HISTORICAL ACTIVITIES
        activity.f_user_transcAmount_min(transc_table=transaction),
        activity.f_user_transcAmount_max(transc_table=transaction),
        activity.f_user_transcAmount_total(transc_table=transaction),
        activity.f_user_transcAmount_avg(transc_table=transaction),
        # TIME AND RECENCY
        recency.f_user_daysSinceLastTransc(transc_table=transaction),
        recency.f_user_transcCurrMonth_count(transc_table=transaction),
        recency.f_user_transcPrevMonth_count(transc_table=transaction),
        recency.f_transcByDayOfWeek_ratio(transc_table=transaction),
        recency.f_transcByTimeOfDay_ratio(transc_table=transaction),
        recency.f_user_merchantTranscLastMonth_count(transc_table=transaction),
        recency.f_user_nonMerchantTranscCurrMonth_count(
            transc_table=transaction),
        recency.f_user_transcAmountCurrMonth_total(transc_table=transaction),
        recency.f_user_transcAmountPrevMonth_total(transc_table=transaction),
        recency.f_user_transcByDirectionCurrMonth_count(
            transc_table=transaction),
        recency.f_user_transcByDirectionPrevMonth_count(
            transc_table=transaction),
        recency.f_user_transcByStatusCurrMonth_count(transc_table=transaction),
        recency.f_user_transcByStatusPrevMonth_count(transc_table=transaction),
        recency.f_user_transcByTypeCurrMonth_count(transc_table=transaction),
        recency.f_user_transcByTypePrevMonth_count(transc_table=transaction),
        # LOCATION
        location.f_user_localTranscAmount_avg(transc_table=transaction),
        location.f_user_intlTranscAmount_avg(transc_table=transaction),
        location.f_user_freqIntlTransc_ratio(transc_table=transaction),
        location.f_user_freqLocalTransc_ratio(transc_table=transaction),
        location.f_user_topForeign_country(transc_table=transaction),
        location.f_user_topMerchant_city(transc_table=transaction),
        # PREFERENCES AND DEMOGRAPHIC
        demographic.f_user_ageBucket(transc_table=transaction),
        demographic.f_user_plan(),
        demographic.f_user_daysSinceJoined(transc_table=transaction),
        demographic.f_user_daysSinceLastTransc_quantiles(
            transc_table=transaction),
        demographic.f_user_contact_count(),
        demographic.f_user_device(),
        demographic.f_user_home_city(),
        demographic.f_user_home_country(),
    ]

    # ``feature_names`` is either a bare string or a list of strings.
    # Normalise each one to a list so all names flatten into a single
    # select list.
    feature_columns = ["user_id"] + list(
        itertools.chain.from_iterable(
            [stage.feature_names]
            if isinstance(stage.feature_names, str)
            else stage.feature_names
            for stage in feature_list
        )
    )

    feature_pipeline = Pipeline(stages=feature_list)
    # Keep only the key and the generated features. Any other input
    # columns of ``users`` are dropped.
    features = (feature_pipeline.fit(users)
                .transform(users)
                .select(feature_columns))
    return features
# Batches are scheduled to run on the 1st day of the month after the
# transactions they cover. Each batch date is therefore the first day of the
# month following a month that contains transactions.
# To restrict processing to later batches, add a filter such as
# `.where(f.col("month") > f.lit("2019-02-01").cast("date"))` after `.distinct()`.
monthly_batches = (
    transaction
    .select(f.trunc(f.add_months("created_date", 1), "month").alias("month"))
    .distinct()
    .orderBy("month")
    .toPandas()
    .month.to_list()
)

# User attribute columns carried on each transaction row; selected (distinct)
# to form the per-batch user table.
user_cols = [
    "user_id", "birth_year", "home_country", "home_city", "device",
    "num_contacts", "plan", "num_successful_referrals", "joined_date",
]

if RECOMPUTE:
    # A recompute rebuilds every batch. Overwrite the output on the first
    # batch so rows from a previous run are not duplicated, then append the
    # remaining batches.
    write_mode = "overwrite"
    for current_batch in monthly_batches:
        batch_id = current_batch.strftime("%Y-%m")
        cutoff = f.lit(current_batch).cast("date")

        ## Filter: only data strictly before the batch date is used
        batch_transaction = transaction.filter(
            f.col("created_date").cast("date") < cutoff)
        batch_users = (
            transaction
            .filter(f.col("joined_date").cast("date") < cutoff)
            .select(*user_cols)
            .distinct()
        )

        ## Temp files to address Out of Memory Issues for Laptop:
        # materialise the batch inputs and read them back, so downstream
        # stages scan these files instead of re-running the filters.
        batch_transaction.write.mode('overwrite').parquet('data/tmp/batch_transaction.parquet')
        batch_transaction = spark.read.parquet('data/tmp/batch_transaction.parquet')
        batch_users.write.mode('overwrite').parquet('data/tmp/batch_user.parquet')
        batch_users = spark.read.parquet('data/tmp/batch_user.parquet')

        ## Feature extraction
        features_data = generateFeatures(
            transaction=batch_transaction, users=batch_users)
        (features_data
            .select(f.lit(batch_id).alias("batch_id"), '*')
            .write.mode(write_mode)
            .parquet("data/features/features.parquet"))
        write_mode = "append"
        print(batch_id, " completed")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.