Created
April 21, 2014 04:19
-
-
Save mbossenbroek/11132120 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns app.loader
  (:require [clojure.string]
            [pigpen.core :as pig]
            [pigpen.local]
            [pigpen.raw :as raw]
            ;[pigpen.parquet.local :as parquet]
            ;[pigpen.util :refer [pigsym-zero pigsym-inc]]
            )
  (:import (parquet.hadoop.ParquetReader)
           (parquet.tools.read SimpleReadSupport SimpleRecord SimpleRecord$NameValue)
           (parquet.hadoop ParquetReader)
           (org.apache.hadoop.fs Path)))
;; project.clj must depend on [com.twitter/parquet-tools "1.4.2-SNAPSHOT"],
;; built from at least revision 41df190 (HEAD, origin/master, origin/HEAD, master):
;; "Merge pull request #349 from Parquet/null_header".
;; You can build the snapshot yourself.
; Ask the Clojure compiler to warn whenever a Java interop call in this file
; cannot be resolved at compile time and would fall back to reflection.
| (set! *warn-on-reflection* true) | |
(defn parquet-value->edn
  "Converts one parquet NameValue entry into a two-element vector
  [field-name value]. field-name is the entry's name turned into a symbol;
  value is whatever the entry's .getValue returns, unchanged."
  [^SimpleRecord$NameValue nv]
  (let [field-name  (symbol (.getName nv))
        field-value (.getValue nv)]
    [field-name field-value]))
(defn parquet-row->edn
  "Builds a map from a SimpleRecord. Every NameValue entry returned by the
  row's .getValues contributes one field-name symbol -> value pair (see
  parquet-value->edn). If a name occurs more than once, the last entry wins."
  [^SimpleRecord row]
  (reduce (fn [acc nv]
            (conj acc (parquet-value->edn nv)))
          {}
          (.getValues row)))
(defn parquet-reader
  "Creates a ParquetReader over the file at location (a path string), using
  SimpleReadSupport so that each .read yields a SimpleRecord. The caller is
  responsible for closing the returned reader."
  ^ParquetReader [^String location]
  (let [path         (Path. location)
        read-support (SimpleReadSupport.)]
    (ParquetReader. path read-support)))
;; Maps a sample value to the name of the Pig type used to declare its field.
;; Dispatch is on (type value); anything without a dedicated method, including
;; nil and byte arrays, is declared as "bytearray".
(defmulti type->pig-type
  "Returns the Pig type name (a string) for the runtime type of the given value."
  type)
(defmethod type->pig-type :default [_] "bytearray")
(defmethod type->pig-type String [_] "chararray")
(defmethod type->pig-type Float [_] "float")
(defmethod type->pig-type Double [_] "double")
(defmethod type->pig-type Integer [_] "int")
;; 64-bit integers arrive as java.lang.Long; without this method they would
;; fall through to :default and be declared as bytearray.
(defmethod type->pig-type Long [_] "long")
(defmethod type->pig-type Boolean [_] "boolean")
(defn load-parquet-schema
  "Opens the parquet file at location, reads its first record, and infers a
  schema from it. The schema is a map of field name (a symbol) to Pig type
  name (a string, see type->pig-type).

  The reader is always closed before returning. If the file contains no
  records, an empty map is returned because there is nothing to infer from."
  [location]
  ;; with-open guarantees the underlying file handle is released even when
  ;; reading or converting the record throws.
  (with-open [reader (parquet-reader location)]
    (if-let [row (.read reader)]
      (into {}
            (map (fn [[k v]] [k (type->pig-type v)])
                 (parquet-row->edn row)))
      ;; .read returns nil at end of input, i.e. the file has no records.
      {})))
;; Enhance PigPen so that the "parquet.pig.ParquetLoader" storage can be read
;; in local mode: records are pulled one at a time from a ParquetReader,
;; converted with parquet-row->edn, and handed to on-next.
(defmethod pigpen.local/load "parquet.pig.ParquetLoader" [{:keys [location]}]
  (pigpen.local/load*
    (str "Parquet:" location)
    (fn [on-next cancel?]
      (with-open [reader (parquet-reader location)]
        (loop []
          (let [record (.read reader)]
            ;; Stop at end of input (nil) or as soon as cancel? returns a
            ;; truthy value for the record that was just read.
            (when-not (or (nil? record) (cancel? record))
              (on-next (parquet-row->edn record))
              (recur))))))))
(defn augment-schema
  "Normalizes schema into a map of field -> Pig type. A map is returned
  untouched. Anything else is treated as a sequence of field names (each
  passed through name and symbol) and is used to select those fields from
  the schema inferred from the file at location."
  [location schema]
  (cond
    (map? schema) schema
    :else (let [inferred (load-parquet-schema location)
                wanted   (map (comp symbol name) schema)]
            (select-keys inferred wanted))))
(defn load-pq
  "Builds a PigPen load command for the parquet file at location.

  With one argument, the schema is inferred from the file itself. With two,
  schema is either a map of field -> Pig type name, which is used literally,
  or a sequence of field names, which selects fields from the inferred
  schema. Field names may be strings, keywords, or symbols."
  ([location]
   (load-pq location (load-parquet-schema location)))
  ([location schema]
   (let [full-schema (augment-schema location schema)
         fields      (mapv (comp symbol name) (keys full-schema))
         ;; Pig schema string of the form "field:type,field:type,..."
         pig-schema  (clojure.string/join
                       ","
                       (for [[field pig-type] full-schema]
                         (str (name field) ":" pig-type)))
         storage     (raw/storage$
                       ;; jar built from the parquet-mr project (a submodule)
                       ["lib/parquet-pig-bundle-1.4.0.jar"] ;; the jar to reference
                       "parquet.pig.ParquetLoader"          ;; the loader class
                       [pig-schema])
         loaded      (raw/load$ location fields storage {:implicit-schema true})]
     (raw/bind$ loaded
                []
                '(pigpen.pig/map->bind (pigpen.pig/args->map pigpen.pig/native->clojure))
                ;; args alternate the field name as a string with the field symbol
                {:args          (mapcat (juxt str identity) fields)
                 :field-type-in :native}))))
;; Usage examples. `comment` forms are never evaluated; run them from a REPL.

(comment
  ;; No schema given: infer it from the file and dump the rows.
  (pig/dump (load-pq "parquet/part-m-00000.parquet")))

(comment
  ;; Schema given as a map of field -> Pig type: used literally.
  (pig/dump
    (load-pq "parquet/part-m-00000.parquet"
             {:maker "chararray"
              :year  "int"})))

(comment
  ;; Schema given as a sequence of field names: selects those fields.
  (pig/dump (load-pq "parquet/part-m-00000.parquet" [:maker :year])))

(comment
  ;; Store to a generated output name and write the script to temp.pig.
  (->> (load-pq "parquet/part-m-00000.parquet")
       (pig/store-clj (str (gensym "out")))
       (pig/write-script "temp.pig")))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment