Last active
February 2, 2019 19:15
-
-
Save amita-shukla/38588e7321e0375fbfd7612586db199a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.io.IOException; | |
| import java.util.ArrayList; | |
| import java.util.List; | |
| import org.apache.hadoop.conf.Configuration; | |
| import org.apache.hadoop.hive.ql.io.orc.OrcSerde; | |
| import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; | |
| import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; | |
| import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; | |
| import org.apache.hadoop.io.LongWritable; | |
| import org.apache.hadoop.io.NullWritable; | |
| import org.apache.hadoop.io.Text; | |
| import org.apache.hadoop.io.Writable; | |
| import org.apache.hadoop.mapreduce.Mapper; | |
| public class FileMapper extends Mapper<LongWritable, Text, NullWritable, Writable>{ | |
| private OrcSerde serde; | |
| private String types; | |
| private TypeInfo typeInfo; | |
| private ObjectInspector objectInspector; | |
| private List<Object> struct; | |
| private ArrayList<ColumnDatatypeMapping> mapping; | |
| @Override | |
| protected void setup(Mapper<LongWritable, Text, NullWritable, Writable>.Context context){ | |
| serde = new OrcSerde(); | |
| types = "struct<order_id:int,order_date:date,order_customer_id:int,order_status:string,order_product_id:int,order_quantity:int,order_subtotal:double,order_product_price:double,sub_category:string,category:string>"; | |
| typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(types); | |
| objectInspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfo); | |
| mapping = new ArrayList<ColumnDatatypeMapping>(); | |
| //create mapping, in the same order as in the text file | |
| mapping.add(new ColumnDatatypeMapping("order_id", "int")); | |
| mapping.add(new ColumnDatatypeMapping("order_date", "date")); | |
| mapping.add(new ColumnDatatypeMapping("order_customer_id", "int")); | |
| mapping.add(new ColumnDatatypeMapping("order_status", "string")); | |
| mapping.add(new ColumnDatatypeMapping("order_product_id", "int")); | |
| mapping.add(new ColumnDatatypeMapping("order_quantity", "int")); | |
| mapping.add(new ColumnDatatypeMapping("order_subtotal", "double")); | |
| mapping.add(new ColumnDatatypeMapping("order_product_price", "double")); | |
| mapping.add(new ColumnDatatypeMapping("sub_category", "string")); | |
| mapping.add(new ColumnDatatypeMapping("category", "string")); | |
| } | |
| @Override | |
| protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ | |
| String delimiter = ","; | |
| String[] data = value.toString().split(delimiter); | |
| struct = TextParser.buildList(data, mapping); | |
| Writable row = serde.serialize(struct, objectInspector); | |
| context.write(NullWritable.get(),row); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment