@@ -0,0 +1,59 @@
import org .apache .flink .api .common .restartstrategy .RestartStrategies ;
import org .apache .flink .api .java .utils .ParameterTool ;
import org .apache .flink .streaming .api .CheckpointingMode ;
import org .apache .flink .streaming .api .datastream .DataStream ;
import org .apache .flink .streaming .api .environment .CheckpointConfig ;
import org .apache .flink .streaming .api .environment .StreamExecutionEnvironment ;
import org .apache .flink .streaming .connectors .fs .bucketing .BucketingSink ;
import org .apache .flink .streaming .connectors .fs .bucketing .DateTimeBucketer ;
import org .apache .flink .streaming .connectors .kafka .FlinkKafkaConsumer08 ;
import org .apache .flink .streaming .util .serialization .SimpleStringSchema ;
import java .util .Properties ;
public class DataTransfer {
public static void main (String [] args ) throws Exception {
// Properties for Kafka
Properties kafkaProps = new Properties ();
kafkaProps .setProperty ("topic" , "flink-test" );
kafkaProps .setProperty ("bootstrap.servers" , "localhost:9092" );
kafkaProps .setProperty ("zookeeper.connect" , "localhost:2181" );
kafkaProps .setProperty ("group.id" , "flink-test" );
// Flink environment setup
StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment ();
env .getConfig ().disableSysoutLogging ();
env .getConfig ().setRestartStrategy (RestartStrategies .fixedDelayRestart (4 , 10000 ));
// Flink check/save point setting
env .enableCheckpointing (30000 );
env .getCheckpointConfig ().setCheckpointingMode (CheckpointingMode .EXACTLY_ONCE );
env .getCheckpointConfig ().setMinPauseBetweenCheckpoints (10000 );
env .getCheckpointConfig ().setCheckpointTimeout (10000 );
env .getCheckpointConfig ().setMaxConcurrentCheckpoints (1 );
env .getCheckpointConfig ().enableExternalizedCheckpoints (
CheckpointConfig .ExternalizedCheckpointCleanup .RETAIN_ON_CANCELLATION
);
// Init the stream
DataStream <String > stream = env
.addSource (new FlinkKafkaConsumer08 <String >(
"flink-test" ,
new SimpleStringSchema (),
kafkaProps ));
// Path of the output
String basePath = "<some-place-in-your-machine>" ; // Here is you output path
BucketingSink <String > hdfsSink = new BucketingSink <>(basePath );
hdfsSink .setBucketer (new DateTimeBucketer <>("yyyy-MM-dd--HH-mm" ));
stream .print ();
stream .addSink (hdfsSink );
env .execute ();
}
}