Skip to content

Instantly share code, notes, and snippets.

@amita-shukla
Last active February 2, 2019 19:15
Show Gist options
  • Select an option

  • Save amita-shukla/38588e7321e0375fbfd7612586db199a to your computer and use it in GitHub Desktop.

Select an option

Save amita-shukla/38588e7321e0375fbfd7612586db199a to your computer and use it in GitHub Desktop.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper that converts each comma-separated input line into a row serialized through
 * {@link OrcSerde}.
 *
 * <p>The row layout is described by {@link #SCHEMA}. {@link #COLUMNS} lists the same columns, in
 * the order they appear in the text input, and is turned into the mapping list that is handed to
 * {@code TextParser.buildList} together with the split line.
 */
public class FileMapper extends Mapper<LongWritable, Text, NullWritable, Writable> {

    /** Hive type string used to build the object inspector for the output rows. */
    private static final String SCHEMA =
            "struct<order_id:int,order_date:date,order_customer_id:int,order_status:string,order_product_id:int,order_quantity:int,order_subtotal:double,order_product_price:double,sub_category:string,category:string>";

    /** Separator between column values on an input line. */
    private static final String DELIMITER = ",";

    /** Column name / datatype pairs, in the same order as in the text file. */
    private static final String[][] COLUMNS = {
        {"order_id", "int"},
        {"order_date", "date"},
        {"order_customer_id", "int"},
        {"order_status", "string"},
        {"order_product_id", "int"},
        {"order_quantity", "int"},
        {"order_subtotal", "double"},
        {"order_product_price", "double"},
        {"sub_category", "string"},
        {"category", "string"},
    };

    private OrcSerde serde;
    private ObjectInspector objectInspector;
    private ArrayList<ColumnDatatypeMapping> mapping;

    /**
     * Creates the serde, the object inspector derived from {@link #SCHEMA}, and the column
     * mapping list. Everything built here is reused by every {@code map} call on this instance.
     *
     * @param context the task context (not read by this method)
     */
    @Override
    protected void setup(Mapper<LongWritable, Text, NullWritable, Writable>.Context context) {
        serde = new OrcSerde();

        TypeInfo rowType = TypeInfoUtils.getTypeInfoFromTypeString(SCHEMA);
        objectInspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowType);

        mapping = new ArrayList<ColumnDatatypeMapping>();
        for (String[] column : COLUMNS) {
            mapping.add(new ColumnDatatypeMapping(column[0], column[1]));
        }
    }

    /**
     * Splits one input line on {@link #DELIMITER}, converts the pieces into a list of column
     * values via {@code TextParser.buildList}, serializes that list with the serde and writes it
     * out under a {@link NullWritable} key.
     *
     * @param key     the input key (unused)
     * @param value   one line of delimited text
     * @param context the task context that receives the serialized row
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // NOTE(review): String.split(String) discards trailing empty strings, so a line whose
        // last columns are empty yields fewer values than COLUMNS has entries — confirm that
        // TextParser.buildList tolerates a shorter array.
        String[] fields = value.toString().split(DELIMITER);
        List<Object> rowValues = TextParser.buildList(fields, mapping);
        context.write(NullWritable.get(), serde.serialize(rowValues, objectInspector));
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment