Skip to content

Instantly share code, notes, and snippets.

@dmazzer
Forked from visualskyrim/DataTransfer.java
Created October 6, 2017 01:02
Show Gist options
  • Select an option

  • Save dmazzer/502df7cc2fd1b07605adf0f139f9696a to your computer and use it in GitHub Desktop.

Select an option

Save dmazzer/502df7cc2fd1b07605adf0f139f9696a to your computer and use it in GitHub Desktop.

Revisions

  1. Kong Mu created this gist Jun 13, 2017.
    59 changes: 59 additions & 0 deletions DataTransfer.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,59 @@
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    import java.util.Properties;


    public class DataTransfer {


    public static void main(String[] args) throws Exception {
    // Properties for Kafka
    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("topic", "flink-test");
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
    kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
    kafkaProps.setProperty("group.id", "flink-test");

    // Flink environment setup
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableSysoutLogging();
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));

    // Flink check/save point setting
    env.enableCheckpointing(30000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
    env.getCheckpointConfig().setCheckpointTimeout(10000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

    env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
    );

    // Init the stream
    DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer08<String>(
    "flink-test",
    new SimpleStringSchema(),
    kafkaProps));

    // Path of the output
    String basePath = "<some-place-in-your-machine>"; // Here is you output path

    BucketingSink<String> hdfsSink = new BucketingSink<>(basePath);
    hdfsSink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH-mm"));
    stream.print();
    stream.addSink(hdfsSink);

    env.execute();
    }
    }