Skip to content

Instantly share code, notes, and snippets.

@shawnfeldman
Created February 13, 2015 23:13
Show Gist options
  • Select an option

  • Save shawnfeldman/18ab1d45d8ea2fac7f27 to your computer and use it in GitHub Desktop.

Select an option

Save shawnfeldman/18ab1d45d8ea2fac7f27 to your computer and use it in GitHub Desktop.
// Stream of every registered migration. It is filtered twice below: once for
// entity-level migrations and once for everything else.
Observable<DataMigration> migrations = Observable.from(migrationsToRun.values()).subscribeOn(Schedulers.io());

// Left as a raw Observable because the declared element type of getAllApplicationIds()
// is not visible here. NOTE(review): the Action1<Id> below suggests Observable<Id> — confirm and tighten.
final Observable appIdObservable = applicationObservable.getAllApplicationIds();

// Entity migrations: for each migration of type Entities, walk the entity groups
// returned by getAllEntitiesInSystem (second argument 1000) and run the migration
// against each emitted group as a side effect.
Observable<ApplicationEntityGroup> entityMigrations = migrations
    .filter(new Func1<DataMigration, Boolean>() {
        @Override
        public Boolean call(DataMigration dataMigration) {
            return dataMigration.getType() == DataMigration.MigrationType.Entities;
        }
    }).flatMap(new Func1<DataMigration, Observable<ApplicationEntityGroup>>() {
        @Override
        public Observable<ApplicationEntityGroup> call(final DataMigration dataMigration) {
            return allEntitiesInSystemObservable
                .getAllEntitiesInSystem(appIdObservable, 1000)
                .doOnNext(new Action1<ApplicationEntityGroup>() {
                    @Override
                    public void call(ApplicationEntityGroup entityGroup) {
                        runMigration(dataMigration, observer, entityGroup);
                    }
                });
        }
    });

//migrations that aren't entities
// These run once per application id, using an ApplicationEntityGroup that carries
// only the application scope (its second constructor argument is null).
Observable otherMigrations = migrations
    .filter(new Func1<DataMigration, Boolean>() {
        @Override
        public Boolean call(DataMigration dataMigration) {
            return dataMigration.getType() != DataMigration.MigrationType.Entities;
        }
    }).flatMap(new Func1<DataMigration, Observable<?>>() {
        @Override
        public Observable call(final DataMigration dataMigration) {
            return appIdObservable.doOnNext(new Action1<Id>() {
                @Override
                public void call(Id id) {
                    ApplicationScope scope = new ApplicationScopeImpl(id);
                    runMigration(dataMigration, observer, new ApplicationEntityGroup(scope, null));
                }
            });
        }
    });

try {
    // Block the calling thread until both pipelines have completed; the emitted
    // value itself is discarded, only completion (or failure) matters.
    Observable
        .merge(entityMigrations, otherMigrations)
        .subscribeOn(Schedulers.io())
        .toBlocking().lastOrDefault(null);

    // Reached only when no error propagated out of the merged stream.
    migrationInfoSerialization.setStatusCode( StatusCode.COMPLETE.status );
} catch (Exception e){
    // Pass the exception to the logger so the root cause and stack trace are kept;
    // previously only the bare message was logged and the cause was lost.
    // NOTE(review): no failure status is persisted here — confirm whether callers
    // rely on the status code simply not being set to COMPLETE.
    LOG.error("Migration Failed", e);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment