Created
April 21, 2014 04:19
-
-
Save mbossenbroek/11132120 to your computer and use it in GitHub Desktop.
Revisions
-
mbossenbroek created this gist
Apr 21, 2014. There are no files selected for viewing.
(ns app.loader
  "Loads Parquet files into PigPen: `load-pq` builds the PigPen load command
  (backed by parquet.pig.ParquetLoader in the generated script), and a
  `pigpen.local/load` method lets the same command run in PigPen's local mode."
  (:require [clojure.string :as string]
            [pigpen.raw :as raw]
            [pigpen.core :as pig]
            ;; Required explicitly: this file extends pigpen.local/load and
            ;; calls pigpen.local/load*, so the namespace must be loaded first.
            [pigpen.local]
            ;[pigpen.parquet.local :as parquet]
            ;[pigpen.util :refer [pigsym-zero pigsym-inc]]
            )
  (:import (parquet.tools.read SimpleReadSupport SimpleRecord SimpleRecord$NameValue)
           (parquet.hadoop ParquetReader)
           (org.apache.hadoop.fs Path)))

;project.clj: should use [com.twitter/parquet-tools "1.4.2-SNAPSHOT"]
;at least this revision: 41df190 - (HEAD, origin/master, origin/HEAD, master) Merge pull request #349 from Parquet/null_header
;you can build your own

(set! *warn-on-reflection* true)

(defn parquet-value->edn
  "Converts one NameValue field of a parquet record into a
  [name value] vector, where name is a symbol."
  [^SimpleRecord$NameValue nv]
  [(symbol (.getName nv)) (.getValue nv)])

(defn parquet-row->edn
  "Converts a SimpleRecord into a map of field name (a symbol) to value."
  [^SimpleRecord row]
  (into {} (map parquet-value->edn (.getValues row))))

(defn parquet-reader
  "Opens a ParquetReader over the file at `location` (a path string) that
  yields SimpleRecord rows. The caller is responsible for closing it,
  e.g. with `with-open`."
  ^ParquetReader [^String location]
  (ParquetReader. (Path. location) (SimpleReadSupport.)))

(defmulti type->pig-type
  "Returns the Pig type name (a string) to declare for a value read from a
  parquet file. Dispatches on the value's type; anything without a specific
  mapping is declared as \"bytearray\"."
  type)

(defmethod type->pig-type :default [_] "bytearray")
(defmethod type->pig-type String   [_] "chararray")
(defmethod type->pig-type Float    [_] "float")
(defmethod type->pig-type Double   [_] "double")
(defmethod type->pig-type Integer  [_] "int")
;; Without this, 64-bit integer values fell through to :default and were
;; declared as bytearray in the Pig schema.
(defmethod type->pig-type Long     [_] "long")
(defmethod type->pig-type Boolean  [_] "boolean")

(defn load-parquet-schema
  "Loads a parquet file, reads the first value, and returns the schema.
  The schema is a map of field name (a symbol) to the type (the pig type -
  a string).

  The reader is closed before returning. Throws an ex-info (with :location
  in its data) if the file contains no records, since the schema is inferred
  from the first record's values."
  [location]
  (with-open [reader (parquet-reader location)]
    (if-let [first-row (.read reader)]
      ;; `into` is eager, so the row is fully consumed before the reader closes.
      (into {}
            (for [[field value] (parquet-row->edn first-row)]
              [field (type->pig-type value)]))
      (throw (ex-info (str "Cannot infer a schema from an empty parquet file: " location)
                      {:location location})))))

; enhance PigPen to use parquet locally
(defmethod pigpen.local/load "parquet.pig.ParquetLoader" [{:keys [location]}]
  (pigpen.local/load* (str "Parquet:" location)
    (fn [on-next cancel?]
      (with-open [reader (parquet-reader location)]
        ;; Keep reading until the reader returns nil or cancel? returns
        ;; truthy for the record just read.
        (doseq [parquet-value (take-while (complement (some-fn nil? cancel?))
                                          (repeatedly #(.read reader)))]
          (on-next (parquet-row->edn parquet-value)))))))

(defn augment-schema
  "If schema is a map, returns it unchanged. Otherwise schema is treated as a
  sequence of field names (strings, keywords, or symbols); the file's schema
  is loaded from `location` and narrowed to those fields. Names that are not
  present in the file's schema are dropped from the result."
  [location schema]
  (if (map? schema)
    schema
    (select-keys (load-parquet-schema location)
                 (map (comp symbol name) schema))))

(defn load-pq
  "Returns a PigPen command that loads the parquet file at `location`.

  Schema can be either a map, where it is used literally (field name to pig
  type string), or a sequence, where it is used to select fields. Either
  strings, keywords, or symbols can be used for fields. With no schema, the
  schema is read from the first record of the file."
  ([location]
    (load-pq location (load-parquet-schema location)))
  ([location schema]
    (let [schema     (augment-schema location schema)
          fields     (->> schema keys (mapv (comp symbol name)))
          ;; Pig schema string, e.g. "maker:chararray,year:int"
          pig-schema (->> schema
                       (map (fn [[field type]] (str (name field) ":" type)))
                       (string/join ","))
          storage    (raw/storage$
                       ;; built the jar from parquet-mr project, it's a submodule
                       ["lib/parquet-pig-bundle-1.4.0.jar"] ;; the jar to reference
                       "parquet.pig.ParquetLoader"          ;; your loader class
                       [pig-schema])]
      (-> location
        (raw/load$ fields storage {:implicit-schema true})
        (raw/bind$ []
          '(pigpen.pig/map->bind (pigpen.pig/args->map pigpen.pig/native->clojure))
          {:args          (clojure.core/mapcat (juxt str identity) fields),
           :field-type-in :native})))))

(comment
  (->> (load-pq "parquet/part-m-00000.parquet")
    (pig/dump)))

(comment
  (->> (load-pq "parquet/part-m-00000.parquet" {:maker "chararray" :year "int"})
    (pig/dump)))

(comment
  (->> (load-pq "parquet/part-m-00000.parquet" [:maker :year])
    (pig/dump)))

(comment
  (->> (load-pq "parquet/part-m-00000.parquet")
    (pig/store-clj (str (gensym "out")))
    (pig/write-script "temp.pig")))