Skip to content

Instantly share code, notes, and snippets.

@mbossenbroek
Created April 21, 2014 04:19
Show Gist options
  • Select an option

  • Save mbossenbroek/11132120 to your computer and use it in GitHub Desktop.

Select an option

Save mbossenbroek/11132120 to your computer and use it in GitHub Desktop.
(ns app.loader
(:require [pigpen.raw :as raw]
[pigpen.core :as pig]
;[pigpen.parquet.local :as parquet]
;[pigpen.util :refer [pigsym-zero pigsym-inc]]
)
(:import (parquet.hadoop.ParquetReader)
(parquet.tools.read SimpleReadSupport SimpleRecord SimpleRecord$NameValue)
(parquet.hadoop ParquetReader)
(org.apache.hadoop.fs Path)))
;project.clj: should depend on [com.twitter/parquet-tools "1.4.2-SNAPSHOT"],
;built from at least this revision: 41df190 - (HEAD, origin/master, origin/HEAD, master) Merge pull request #349 from Parquet/null_header
;(you can build that snapshot yourself)
; Ask the compiler to warn whenever a Java interop call in this file cannot be
; resolved from type hints and would fall back to reflection.
(set! *warn-on-reflection* true)
(defn parquet-value->edn
  "Turns one NameValue entry into a two-element vector [name value].
  The name is converted to a symbol; the value is returned as-is."
  [^SimpleRecord$NameValue nv]
  (let [field-name  (symbol (.getName nv))
        field-value (.getValue nv)]
    [field-name field-value]))
(defn parquet-row->edn
  "Converts a SimpleRecord into a map. Every NameValue entry returned by the
  record's getValues becomes one entry: field name (a symbol) -> value."
  [^SimpleRecord row]
  (into {} (map parquet-value->edn (.getValues row))))
(defn parquet-reader
  "Creates a ParquetReader for the file at `location`, using SimpleReadSupport
  to materialize records. The reader is not closed here; the caller is
  expected to close it (e.g. with with-open)."
  ^ParquetReader [^String location]
  (let [path         (Path. location)
        read-support (SimpleReadSupport.)]
    (ParquetReader. path read-support)))
;; Maps a sample value to the name of the Pig type used for it in a loader
;; schema string. Dispatch is on (type value), i.e. the value's class (or its
;; :type metadata, if present).
(defmulti type->pig-type type)

;; Anything without a more specific mapping (including nil and byte arrays)
;; is declared as a raw bytearray.
(defmethod type->pig-type :default [_] "bytearray")
(defmethod type->pig-type String   [_] "chararray")
(defmethod type->pig-type Float    [_] "float")
(defmethod type->pig-type Double   [_] "double")
(defmethod type->pig-type Integer  [_] "int")
;; 64-bit integers must be declared as Pig's long; without this method they
;; fell through to :default and were mis-declared as bytearray.
(defmethod type->pig-type Long     [_] "long")
(defmethod type->pig-type Boolean  [_] "boolean")
(defn load-parquet-schema
  "Opens the parquet file at `location`, reads its first record, and derives a
  schema from it. The schema is a map of field name (a symbol) to the pig type
  (a string), where the pig type is chosen from the runtime type of each value
  in that first record.

  The reader is closed before returning, whether or not reading succeeds."
  [location]
  ;; with-open guarantees the underlying reader is released; the original
  ;; implementation left it open. `into` is eager, so the whole schema is
  ;; built before the reader is closed.
  (with-open [reader (parquet-reader location)]
    (->> (.read reader)
         parquet-row->edn
         (map (fn [[k v]] [k (type->pig-type v)]))
         (into {}))))
; enhance PigPen to use parquet locally
(defmethod pigpen.local/load "parquet.pig.ParquetLoader" [{:keys [location]}]
  ;; Local-mode loader for commands whose storage is parquet.pig.ParquetLoader:
  ;; reads records one at a time and hands each, converted to a map, to on-next.
  (pigpen.local/load*
    (str "Parquet:" location)
    (fn [on-next cancel?]
      (with-open [reader (parquet-reader location)]
        (loop []
          (let [row (.read reader)]
            ;; Stop at the first nil record, or as soon as cancel? returns
            ;; truthy for the record that was just read.
            (when-not (or (nil? row) (cancel? row))
              (on-next (parquet-row->edn row))
              (recur))))))))
(defn augment-schema
  "Returns a complete schema map for `location`.
  A map `schema` is returned unchanged. Otherwise `schema` is treated as a
  collection of field names (anything `name` accepts); the schema is read from
  the file and narrowed to those fields, keyed by symbol."
  [location schema]
  (if-not (map? schema)
    (let [file-schema (load-parquet-schema location)
          wanted      (map #(symbol (name %)) schema)]
      (select-keys file-schema wanted))
    schema))
(defn load-pq
  "Builds a PigPen load command for the parquet file at `location`.
  With one argument, the schema is read from the file itself. With two,
  `schema` is either a map (used literally as field -> pig type) or a sequence
  of field names (used to select fields from the file's schema). Field names
  may be strings, keywords, or symbols."
  ([location]
    (load-pq location (load-parquet-schema location)))
  ([location schema]
    (let [full-schema (augment-schema location schema)
          ;; Field names as symbols, in the schema map's iteration order.
          fields      (mapv (fn [k] (symbol (name k))) (keys full-schema))
          ;; Schema string handed to the loader, e.g. name:type,name:type
          pig-schema  (clojure.string/join
                        ","
                        (for [[field pig-type] full-schema]
                          (str (name field) ":" pig-type)))
          storage     (raw/storage$
                        ;; built the jar from parquet-mr project, it's a submodule
                        ["lib/parquet-pig-bundle-1.4.0.jar"] ;; the jar to reference
                        "parquet.pig.ParquetLoader"          ;; your loader class
                        [pig-schema])
          loaded      (raw/load$ location fields storage {:implicit-schema true})]
      ;; Bind step: :args alternates each field's name as a string with the
      ;; field symbol itself, for the quoted pigpen.pig function to consume.
      (raw/bind$ loaded
                 []
                 '(pigpen.pig/map->bind (pigpen.pig/args->map pigpen.pig/native->clojure))
                 {:args          (clojure.core/mapcat (fn [f] [(str f) f]) fields)
                  :field-type-in :native}))))
;; Usage examples. Each is wrapped in `comment`, so nothing here is evaluated
;; when the file is loaded; evaluate the inner forms by hand at a REPL.

;; Example: load with the schema read from the file itself, then dump.
(comment
(->>
(load-pq "parquet/part-m-00000.parquet")
(pig/dump)))
;; Example: load with an explicit schema map (field -> pig type), then dump.
(comment
(->>
(load-pq "parquet/part-m-00000.parquet"
{:maker "chararray"
:year "int"})
(pig/dump)))
;; Example: pass a sequence of field names to select only those fields; their
;; types are looked up from the file.
(comment
(->>
(load-pq "parquet/part-m-00000.parquet" [:maker :year])
(pig/dump)))
;; Example: instead of dumping, store to a generated output name and write the
;; result out as a script named temp.pig.
(comment
(->>
(load-pq "parquet/part-m-00000.parquet")
(pig/store-clj (str (gensym "out")))
(pig/write-script "temp.pig")))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment