Skip to content

Instantly share code, notes, and snippets.

@parisni
Last active April 3, 2023 08:05
Show Gist options
  • Select an option

  • Save parisni/7855c59d43e586260167ccf9ba29fe10 to your computer and use it in GitHub Desktop.

Select an option

Save parisni/7855c59d43e586260167ccf9ba29fe10 to your computer and use it in GitHub Desktop.

Revisions

  1. parisni renamed this gist Apr 3, 2023. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. parisni created this gist Apr 3, 2023.
    45 changes: 45 additions & 0 deletions java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,45 @@
    /**
     * Example {@link HoodieRecordMerger} for Spark-native records.
     *
     * <p>The merge keeps every column value of the newer record, except that the column at
     * position {@link #MAP_COLUMN_INDEX} is overwritten with a hand-built Catalyst map holding a
     * single entry ({@code "bar"} mapped to an array with one long value).
     *
     * <p>NOTE(review): {@code HoodieRecordMerger} may declare additional abstract methods (for
     * example a record-type or merging-strategy accessor) that this class does not implement;
     * confirm against the Hudi version in use.
     */
    public class CustomHoodieSparkRecordMergert implements HoodieRecordMerger {

      /** Position, in the newer record's schema, of the column replaced by the hand-built map. */
      private static final int MAP_COLUMN_INDEX = 2;

      /**
       * Merges two Spark records by taking the newer record's values and replacing one column with
       * a fixed map value.
       *
       * @param older the previously stored record; only its record type is checked
       * @param oldSchema schema of {@code older}; not used
       * @param newer the incoming record whose column values are carried over
       * @param newSchema schema of {@code newer}; also used as the schema of the merged record
       * @param props merge properties; not used
       * @return the merged record paired with {@code newSchema}
       * @throws IOException declared by the merger contract; not thrown directly by this code
       */
      @Override
      public Option<Pair<HoodieRecord, Schema>> merge(
          HoodieRecord older,
          Schema oldSchema,
          HoodieRecord newer,
          Schema newSchema,
          TypedProperties props)
          throws IOException {
        ValidationUtils.checkArgument(older.getRecordType() == HoodieRecord.HoodieRecordType.SPARK);
        ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecord.HoodieRecordType.SPARK);

        Object[] returnValues = extractInternalRow(newer, newSchema);
        ValidationUtils.checkArgument(
            returnValues.length > MAP_COLUMN_INDEX,
            "Schema must have at least " + (MAP_COLUMN_INDEX + 1) + " fields");

        // Build a Catalyst Map<String, Array<Timestamp>>.
        // Assumes the target column is map<string, array<timestamp>> and that timestamps are
        // represented as long values in Catalyst - TODO confirm against the table schema.
        ArrayData timestamps = ArrayData.toArrayData(new long[] {103079215152L});
        ArrayData keys = ArrayData.toArrayData(new UTF8String[] {UTF8String.fromString("bar")});
        // The map's value array needs one element per key, and each element is itself an array.
        // Wrap the nested ArrayData in an Object[] instead of passing it to toArrayData directly,
        // which only accepts arrays/sequences and cannot wrap an existing ArrayData.
        ArrayData values = ArrayData.toArrayData(new Object[] {timestamps});

        returnValues[MAP_COLUMN_INDEX] = new ArrayBasedMapData(keys, values);
        return forgeResult(returnValues, newSchema);
      }

      /*
       * UTILS METHODS
       */

      /**
       * Reads the value of every field of {@code schema} from {@code record}, in schema order.
       *
       * @param record record to read from
       * @param schema schema listing the fields to extract
       * @return one value per schema field, as returned by {@code getColumnValues}
       */
      private static Object[] extractInternalRow(HoodieRecord record, Schema schema) {
        String[] fields = schema.getFields().stream().map(f -> f.name()).toArray(String[]::new);
        return record.getColumnValues(schema, fields, true);
      }

      /**
       * Wraps the given column values into a new Spark record.
       *
       * @param returnValues column values of the merged row, in schema order
       * @param newSchema schema describing {@code returnValues}
       * @return the merged record paired with {@code newSchema}
       */
      private static Option<Pair<HoodieRecord, Schema>> forgeResult(
          Object[] returnValues, Schema newSchema) {
        // Use the cached schema to avoid useless computations.
        StructType targetSparkSchema = HoodieInternalRowUtils.getCachedSchema(newSchema);
        Row row = RowFactory.create(returnValues);
        HoodieRecord mergedRecord = new HoodieSparkRecord(toUnsafeRow(row, targetSparkSchema));
        return Option.of(Pair.of(mergedRecord, newSchema));
      }

      /**
       * Converts a {@link Row} into an {@link UnsafeRow} laid out according to {@code schema}.
       *
       * @param row row to convert
       * @param schema Spark schema used to build the projection
       * @return the projected unsafe row
       */
      public static UnsafeRow toUnsafeRow(Row row, StructType schema) {
        UnsafeProjection proj = UnsafeProjection.create(schema);
        return proj.apply((InternalRow) CatalystTypeConverters.convertToCatalyst(row));
      }
    }