Created
June 3, 2019 11:35
-
-
Save stettix/8fd8ba1496f8653dcb369871b85d1351 to your computer and use it in GitHub Desktop.
Revisions
-
stettix created this gist
Jun 3, 2019 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,111 @@ package backfiller import aws.AWS import cats.effect.{ExitCode, IO, IOApp} import com.amazonaws.AmazonServiceException import com.amazonaws.retry.RetryUtils import com.gu.emr.ClusterManager.ClusterID import com.gu.emr.EmrClusterManager import com.gu.emr.model.{ClusterDefinition, EmrStep, RunConfiguration} import fs2.{Chunk, Pure, Stream} import newworld.syntax.ClusterManagerSyntax import org.joda.time.LocalDate import scheduler.actions.ActionSet import scala.concurrent.ExecutionContext object Backfiller extends IOApp with ClusterManagerSyntax { val MaxStepsPerCluster = 250 private val defaultRunConfiguration = RunConfiguration.default("s3://ophan-temp/emr/logs") def run(args: List[String]): IO[ExitCode] = { if (args.length < 2 || args.length > 3) { System.err.println("Usage: Backfiller <Job ID> <start date> <end date>") System.exit(1) } val jobId = args(0) val startDate = LocalDate.parse(args(1)) val endDate = LocalDate.parse(args(2)) val job = ActionSet.prod.findById(jobId).getOrElse(throw new Error(s"Couldn't find action with ID: $jobId")) println(s"Running job '${job.id}' (${job.description}) from date $startDate to $endDate") implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.global val dates: Stream[Pure, LocalDate] = Stream .range(0, Int.MaxValue) .map(startDate.plusDays) .takeThrough(date => endDate.isAfter(date)) val jobSteps: Stream[Pure, Chunk[StepForDate]] = dates .flatMap(date => Stream.emits(job.steps(date).map(step => StepForDate(date, step)))) .chunkN(MaxStepsPerCluster) val emr = AWS.emr val clusterManager = new EmrClusterManager("backfiller", emr)(ec) val program: Stream[IO, ExitCode] = for { jobStepChunk <- jobSteps.covary[IO] clusterIdForChunk <- Stream.eval( getOrCreateEmrCluster(clusterManager, job.clusterDefinition, backfillClusterName(job.clusterDefinition.name, jobStepChunk.toList.map(_.date)))) _ = println(s"Adding ${jobStepChunk.size} steps to cluster $clusterIdForChunk") _ <- Stream.eval(clusterManager.submitStepsIO(clusterIdForChunk, jobStepChunk.toList.map(_.step))) _ = println("All done") } yield ExitCode.Success program.compile.last.map(_.getOrElse(ExitCode.Error)).guarantee(IO(emr.shutdown())) } def getOrCreateEmrCluster( clusterManager: EmrClusterManager, clusterDefinition: ClusterDefinition, backfillClusterName: String): IO[ClusterID] = { val renamedCluster = clusterDefinition.copy(name = backfillClusterName) for { maybeExistingClusterId <- clusterManager.findClusterIdIO(renamedCluster) clusterId <- getOrElse(maybeExistingClusterId, clusterManager.launchClusterIO(renamedCluster, defaultRunConfiguration)) } yield clusterId } def backfillClusterName(originalName: String, dates: List[LocalDate]): String = s"Backfill $originalName ${dates.head} to ${dates.last}" import scala.concurrent.duration._ def asRetriableStream[T](op: IO[T], operationDescription: String): Stream[IO, T] = fs2.Stream .retry(op, 10.seconds, nextDelay = identity, maxAttempts = 6 * 10, isRetriable(operationDescription)) case class StepForDate(date: LocalDate, step: EmrStep) case object Waiting extends Exception def isRetriable(operation: String): Throwable => Boolean = { case Waiting => System.err.println(s"Retrying $operation") true case e: AmazonServiceException if RetryUtils.isRetryableServiceException(e) => System.err.println(s"Retriable AWS error (${e.getErrorCode}) while performing $operation", e) true case e => System.err.println(s"Error while performing $operation", e) false } // Implement here as the old version of cats-effect that's currently on classpath doesn't have it. def getOrElse[A](maybeA: Option[A], alt: => IO[A]): IO[A] = maybeA match { case Some(a) => IO.pure(a) case None => alt } }