Skip to content

Instantly share code, notes, and snippets.

@kristiyanto
Last active May 28, 2021 08:00
Show Gist options
  • Select an option

  • Save kristiyanto/52974d3d7e88dac1303fedc63132dbcb to your computer and use it in GitHub Desktop.

Select an option

Save kristiyanto/52974d3d7e88dac1303fedc63132dbcb to your computer and use it in GitHub Desktop.
import itertools

from pyspark.ml import Pipeline

from fintech.features import activity, demographic, location, recency
def generateFeatures(transaction, users):
    """Build the per-user feature matrix by fitting and applying every
    feature transformer as one pyspark.ml Pipeline.

    Parameters
    ----------
    transaction : pyspark.sql.DataFrame
        Historical transaction records; fed to each feature builder as
        its ``transc_table``.
    users : pyspark.sql.DataFrame
        The base table the fitted pipeline transforms (one row per user
        — presumably; verify against caller).

    Returns
    -------
    pyspark.sql.DataFrame
        ``user_id`` plus one column per generated feature.
    """
    feature_list = [
        # HISTORICAL ACTIVITIES
        activity.f_user_transcAmount_min(transc_table=transaction),
        activity.f_user_transcAmount_max(transc_table=transaction),
        activity.f_user_transcAmount_total(transc_table=transaction),
        activity.f_user_transcAmount_avg(transc_table=transaction),
        # TIME AND RECENCY
        recency.f_user_daysSinceLastTransc(transc_table=transaction),
        recency.f_user_transcCurrMonth_count(transc_table=transaction),
        recency.f_user_transcPrevMonth_count(transc_table=transaction),
        recency.f_transcByDayOfWeek_ratio(transc_table=transaction),
        recency.f_transcByTimeOfDay_ratio(transc_table=transaction),
        recency.f_user_merchantTranscLastMonth_count(transc_table=transaction),
        recency.f_user_nonMerchantTranscCurrMonth_count(transc_table=transaction),
        recency.f_user_transcAmountCurrMonth_total(transc_table=transaction),
        recency.f_user_transcAmountPrevMonth_total(transc_table=transaction),
        recency.f_user_transcByDirectionCurrMonth_count(transc_table=transaction),
        recency.f_user_transcByDirectionPrevMonth_count(transc_table=transaction),
        recency.f_user_transcByStatusCurrMonth_count(transc_table=transaction),
        recency.f_user_transcByStatusPrevMonth_count(transc_table=transaction),
        recency.f_user_transcByTypeCurrMonth_count(transc_table=transaction),
        recency.f_user_transcByTypePrevMonth_count(transc_table=transaction),
        # LOCATION
        location.f_user_localTranscAmount_avg(transc_table=transaction),
        location.f_user_intlTranscAmount_avg(transc_table=transaction),
        location.f_user_freqIntlTransc_ratio(transc_table=transaction),
        location.f_user_freqLocalTransc_ratio(transc_table=transaction),
        location.f_user_topForeign_country(transc_table=transaction),
        location.f_user_topMerchant_city(transc_table=transaction),
        # PREFERENCES AND DEMOGRAPHIC
        demographic.f_user_ageBucket(transc_table=transaction),
        demographic.f_user_plan(),
        demographic.f_user_daysSinceJoined(transc_table=transaction),
        demographic.f_user_daysSinceLastTransc_quantiles(transc_table=transaction),
        demographic.f_user_contact_count(),
        demographic.f_user_device(),
        demographic.f_user_home_city(),
        demographic.f_user_home_country(),
    ]
    # Each transformer advertises its output column(s) via
    # ``feature_names`` — either a single name (str) or a list of names.
    # Normalise every entry to a list so they flatten uniformly.
    _feat_cols = [
        [feat.feature_names] if isinstance(feat.feature_names, str)
        else feat.feature_names
        for feat in feature_list
    ]
    feature_columns = ["user_id"] + list(itertools.chain.from_iterable(_feat_cols))
    # Fit the whole transformer chain on the users table, apply it, and
    # keep only the declared feature columns.
    feature_pipeline = Pipeline(stages=feature_list)
    features = (feature_pipeline.fit(users)
                .transform(users)
                .select(feature_columns))
    return features
# Batch is scheduled to run on the 1st day of the next month: for every
# distinct month in the data, compute the first day of the following
# month and use it as that batch's cut-off date.
monthly_batches = (
    transaction
    .select(f.trunc(f.add_months("created_date", 1), "month").alias("month"))
    .distinct()
    # .where(f.col('month') > f.lit('2019-02-01').cast('date'))
    .orderBy("month")
    .toPandas()
    .month
    .to_list()
)

if RECOMPUTE:
    for current_batch in monthly_batches:
        # Batch id, e.g. "2019-02" — also stamped onto the output rows.
        batch_id = current_batch.strftime("%Y-%m")

        # -- Filter: keep only rows strictly before the batch cut-off so
        # each batch sees data up to (not including) its run date.
        user_cols = ['user_id', 'birth_year', 'home_country', 'home_city',
                     'device', 'num_contacts', 'plan',
                     'num_successful_referrals', 'joined_date']
        batch_transaction = transaction.filter(
            col("created_date").cast("date") < lit(current_batch).cast("date"))
        # NOTE(review): user attributes are selected from the transaction
        # table itself — presumably it is denormalised so that distinct()
        # yields one row per user; confirm against the data schema.
        batch_users = (transaction
                       .filter(col("joined_date").cast("date")
                               < lit(current_batch).cast("date"))
                       .select(*user_cols)
                       .distinct())

        # -- Materialise intermediates to parquet and re-read them to
        # work around out-of-memory issues when running on a laptop.
        batch_transaction.write.mode('overwrite').parquet(
            'data/tmp/batch_transaction.parquet')
        batch_transaction = spark.read.parquet(
            'data/tmp/batch_transaction.parquet')
        batch_users.write.mode('overwrite').parquet(
            'data/tmp/batch_user.parquet')
        batch_users = spark.read.parquet('data/tmp/batch_user.parquet')

        # -- Feature extraction: tag every row with this batch's id and
        # append to the cumulative feature store.
        features_data = generateFeatures(
            transaction=batch_transaction, users=batch_users)
        (features_data
         .select(lit(batch_id).alias("batch_id"), '*')
         .write.mode("append")
         .parquet("data/features/features.parquet"))
        print(batch_id, " completed")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment