Skip to content

Instantly share code, notes, and snippets.

@stettix
Created June 3, 2019 11:35
Show Gist options
  • Select an option

  • Save stettix/8fd8ba1496f8653dcb369871b85d1351 to your computer and use it in GitHub Desktop.

Select an option

Save stettix/8fd8ba1496f8653dcb369871b85d1351 to your computer and use it in GitHub Desktop.

Revisions

  1. stettix created this gist Jun 3, 2019.
    111 changes: 111 additions & 0 deletions Backfiller.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,111 @@
    package backfiller

    import aws.AWS
    import cats.effect.{ExitCode, IO, IOApp}
    import com.amazonaws.AmazonServiceException
    import com.amazonaws.retry.RetryUtils
    import com.gu.emr.ClusterManager.ClusterID
    import com.gu.emr.EmrClusterManager
    import com.gu.emr.model.{ClusterDefinition, EmrStep, RunConfiguration}
    import fs2.{Chunk, Pure, Stream}
    import newworld.syntax.ClusterManagerSyntax
    import org.joda.time.LocalDate
    import scheduler.actions.ActionSet

    import scala.concurrent.ExecutionContext

    object Backfiller extends IOApp with ClusterManagerSyntax {

    val MaxStepsPerCluster = 250

    private val defaultRunConfiguration = RunConfiguration.default("s3://ophan-temp/emr/logs")

    def run(args: List[String]): IO[ExitCode] = {

    if (args.length < 2 || args.length > 3) {
    System.err.println("Usage: Backfiller <Job ID> <start date> <end date>")
    System.exit(1)
    }

    val jobId = args(0)
    val startDate = LocalDate.parse(args(1))
    val endDate = LocalDate.parse(args(2))

    val job = ActionSet.prod.findById(jobId).getOrElse(throw new Error(s"Couldn't find action with ID: $jobId"))

    println(s"Running job '${job.id}' (${job.description}) from date $startDate to $endDate")

    implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.global

    val dates: Stream[Pure, LocalDate] = Stream
    .range(0, Int.MaxValue)
    .map(startDate.plusDays)
    .takeThrough(date => endDate.isAfter(date))

    val jobSteps: Stream[Pure, Chunk[StepForDate]] =
    dates
    .flatMap(date => Stream.emits(job.steps(date).map(step => StepForDate(date, step))))
    .chunkN(MaxStepsPerCluster)

    val emr = AWS.emr
    val clusterManager = new EmrClusterManager("backfiller", emr)(ec)

    val program: Stream[IO, ExitCode] = for {
    jobStepChunk <- jobSteps.covary[IO]
    clusterIdForChunk <- Stream.eval(
    getOrCreateEmrCluster(clusterManager,
    job.clusterDefinition,
    backfillClusterName(job.clusterDefinition.name, jobStepChunk.toList.map(_.date))))
    _ = println(s"Adding ${jobStepChunk.size} steps to cluster $clusterIdForChunk")
    _ <- Stream.eval(clusterManager.submitStepsIO(clusterIdForChunk, jobStepChunk.toList.map(_.step)))
    _ = println("All done")
    } yield ExitCode.Success

    program.compile.last.map(_.getOrElse(ExitCode.Error)).guarantee(IO(emr.shutdown()))
    }

    def getOrCreateEmrCluster(
    clusterManager: EmrClusterManager,
    clusterDefinition: ClusterDefinition,
    backfillClusterName: String): IO[ClusterID] = {

    val renamedCluster = clusterDefinition.copy(name = backfillClusterName)
    for {
    maybeExistingClusterId <- clusterManager.findClusterIdIO(renamedCluster)
    clusterId <- getOrElse(maybeExistingClusterId,
    clusterManager.launchClusterIO(renamedCluster, defaultRunConfiguration))
    } yield clusterId
    }

    def backfillClusterName(originalName: String, dates: List[LocalDate]): String =
    s"Backfill $originalName ${dates.head} to ${dates.last}"

    import scala.concurrent.duration._

    def asRetriableStream[T](op: IO[T], operationDescription: String): Stream[IO, T] =
    fs2.Stream
    .retry(op, 10.seconds, nextDelay = identity, maxAttempts = 6 * 10, isRetriable(operationDescription))

    case class StepForDate(date: LocalDate, step: EmrStep)

    case object Waiting extends Exception

    def isRetriable(operation: String): Throwable => Boolean = {
    case Waiting =>
    System.err.println(s"Retrying $operation")
    true
    case e: AmazonServiceException if RetryUtils.isRetryableServiceException(e) =>
    System.err.println(s"Retriable AWS error (${e.getErrorCode}) while performing $operation", e)
    true
    case e =>
    System.err.println(s"Error while performing $operation", e)
    false
    }

    // Implement here as the old version of cats-effect that's currently on classpath doesn't have it.
    def getOrElse[A](maybeA: Option[A], alt: => IO[A]): IO[A] = maybeA match {
    case Some(a) => IO.pure(a)
    case None => alt
    }

    }