Skip to content

Instantly share code, notes, and snippets.

@mbossenbroek
Created April 21, 2014 04:19
Show Gist options
  • Select an option

  • Save mbossenbroek/11132120 to your computer and use it in GitHub Desktop.

Select an option

Save mbossenbroek/11132120 to your computer and use it in GitHub Desktop.

Revisions

  1. mbossenbroek created this gist Apr 21, 2014.
    115 changes: 115 additions & 0 deletions loader.clj
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,115 @@
    (ns app.loader
    (:require [pigpen.raw :as raw]
    [pigpen.core :as pig]
    ;[pigpen.parquet.local :as parquet]
    ;[pigpen.util :refer [pigsym-zero pigsym-inc]]
    )
    (:import (parquet.hadoop.ParquetReader)
    (parquet.tools.read SimpleReadSupport SimpleRecord SimpleRecord$NameValue)
    (parquet.hadoop ParquetReader)
    (org.apache.hadoop.fs Path)))

    ;project.clj: should use [com.twitter/parquet-tools "1.4.2-SNAPSHOT"]
    ;at least this revision: 41df190 - (HEAD, origin/master, origin/HEAD, master) Merge pull request #349 from Parquet/null_header
    ;you can build your own

    (set! *warn-on-reflection* true)

    (defn parquet-value->edn
    "Returns an array with the key and value from a NameValue record"
    [^SimpleRecord$NameValue nv]
    [(-> nv .getName symbol)
    (-> nv .getValue)])

    (defn parquet-row->edn
    "Maps a List of NameValue records to an hashmap with the specified keys"
    ([^SimpleRecord row]
    (->> row
    (.getValues)
    (map parquet-value->edn)
    (into {}))))

    (defn parquet-reader ^ParquetReader [^String location]
    (ParquetReader.
    (Path. location)
    (SimpleReadSupport.)))

    (defmulti type->pig-type type)
    (defmethod type->pig-type :default [_] "bytearray")
    (defmethod type->pig-type String [_] "chararray")
    (defmethod type->pig-type Float [_] "float")
    (defmethod type->pig-type Double [_] "double")
    (defmethod type->pig-type Integer [_] "int")
    (defmethod type->pig-type Boolean [_] "boolean")

    (defn load-parquet-schema
    "Loads a parquet file, reads the first value, and returns the schema. The
    schema is a map of field name (a symbol) to the type (the pig type - a string)."
    [location]
    (->> location
    parquet-reader
    (.read)
    parquet-row->edn
    (map (fn [[k v]] [k (type->pig-type v)]))
    (into {})))

    ; enhance PigPen to use parquet locally
    (defmethod pigpen.local/load "parquet.pig.ParquetLoader" [{:keys [location]}]
    (pigpen.local/load* (str "Parquet:" location)
    (fn [on-next cancel?]
    (with-open [reader (parquet-reader location)]
    (doseq [parquet-value (take-while (complement (some-fn nil? cancel?)) (repeatedly #(.read reader)))]
    (let [pigpen-value (parquet-row->edn parquet-value)]
    (on-next pigpen-value)))))))

    (defn augment-schema
    "If schema is not a map, get the extra schema information"
    [location schema]
    (if (map? schema)
    schema
    (select-keys (load-parquet-schema location)
    (map (comp symbol name) schema))))

    (defn load-pq
    "Schema can be either a map, where it is used literally, or a sequence, where
    it is used to select fields. Either strings, keywords, or symbols can be used
    for fields."
    ([location] (load-pq location (load-parquet-schema location)))
    ([location schema]
    (let [schema (augment-schema location schema)
    fields (->> schema keys (mapv (comp symbol name)))
    pig-schema (->> schema
    (map (fn [[field type]] (str (name field) ":" type)))
    (clojure.string/join ","))
    storage (raw/storage$
    ;; built the jar from parquet-mr project, it's a submodule
    ["lib/parquet-pig-bundle-1.4.0.jar"] ;; the jar to reference
    "parquet.pig.ParquetLoader" ;; your loader class
    [pig-schema])]
    (-> location
    (raw/load$ fields storage {:implicit-schema true})
    (raw/bind$ [] '(pigpen.pig/map->bind (pigpen.pig/args->map pigpen.pig/native->clojure))
    {:args (clojure.core/mapcat (juxt str identity) fields), :field-type-in :native})))))

    (comment
    (->>
    (load-pq "parquet/part-m-00000.parquet")
    (pig/dump)))

    (comment
    (->>
    (load-pq "parquet/part-m-00000.parquet"
    {:maker "chararray"
    :year "int"})
    (pig/dump)))

    (comment
    (->>
    (load-pq "parquet/part-m-00000.parquet" [:maker :year])
    (pig/dump)))

    (comment
    (->>
    (load-pq "parquet/part-m-00000.parquet")
    (pig/store-clj (str (gensym "out")))
    (pig/write-script "temp.pig")))