Skip to content

Instantly share code, notes, and snippets.

@sscdotopen
Created November 12, 2013 15:10
Show Gist options
  • Select an option

  • Save sscdotopen/7432435 to your computer and use it in GitHub Desktop.

Select an option

Save sscdotopen/7432435 to your computer and use it in GitHub Desktop.

Revisions

  1. sscdotopen created this gist Nov 12, 2013.
    32 changes: 32 additions & 0 deletions gistfile1.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,32 @@
    package eu.stratosphere.scala.examples.wordcount

    import eu.stratosphere.scala.{ScalaPlan, TextFile}
    import eu.stratosphere.pact.common.plan.PlanAssembler


    import eu.stratosphere.scala._
    import eu.stratosphere.scala.operators._

    /**
     * An author record.
     *
     * @param id   numeric author id; this is the key `Book.authorId` is matched against in the join
     * @param name the author's name
     */
    case class Author(
      id: Int,
      name: String
    )
    /**
     * A book record.
     *
     * @param authorId id of the book's author; matched against `Author.id` in the join
     * @param year     year associated with the book
     * @param title    the book's title
     */
    case class Book(
      authorId: Int,
      year: Int,
      title: String
    )

    /**
     * Plan assembler that joins a CSV file of authors with a CSV file of books on the author id
     * and writes one `(author name, book title, year)` record per matching author/book pair.
     */
    class BookAuthorJoin extends PlanAssembler with Serializable {

      /**
       * Builds the plan from positional arguments.
       *
       * @param args `args(0)` books file, `args(1)` authors file, `args(2)` output path,
       *             `args(3)` number of sub-tasks (parsed with `toInt`). Passing fewer than
       *             four arguments, or a non-numeric fourth argument, makes this method throw.
       * @return the plan produced by [[getScalaPlan]]
       */
      override def getPlan(args: String*) = {
        getScalaPlan(args(0), args(1), args(2), args(3).toInt)
      }

      /**
       * Assembles the join plan.
       *
       * @param booksFile   path of the CSV input read as [[Book]] records
       * @param authorsFile path of the CSV input read as [[Author]] records
       * @param output      path the joined `(name, title, year)` records are written to as CSV
       * @param numSubTasks value passed through to `ScalaPlan` as its third argument
       * @return a plan with a single sink named "Book-Author-Join"
       */
      def getScalaPlan(booksFile: String, authorsFile: String, output: String, numSubTasks: Int) = {

        val authors = DataSource(authorsFile, CsvInputFormat[Author]())
        val books = DataSource(booksFile, CsvInputFormat[Book]())

        // `map` has to start on the same line as the closing brace of `isEqualTo { ... }`:
        // a line break before it would end the statement there (semicolon inference), leaving
        // the join unmapped and `map { ... }` as a separate, unresolved expression.
        val booksByAuthor = authors join books where { _.id } isEqualTo { _.authorId } map {
          case (author, book) => (author.name, book.title, book.year)
        }

        // Deliberately not named `output`: a local `val output` would shadow the `output`
        // parameter inside this block, turning the argument below into a self-reference
        // ("recursive value output needs type").
        val sink = booksByAuthor.write(output, CsvOutputFormat[(String, String, Int)]())

        // NOTE(review): kept as in the original; confirm that the ScalaPlan companion accepts
        // (sinks, jobName, parallelism) in the library version this is compiled against.
        ScalaPlan(Seq(sink), "Book-Author-Join", numSubTasks)
      }

    }