Skip to content

Instantly share code, notes, and snippets.

@shawnfeldman
Created February 14, 2015 00:14
Show Gist options
  • Select an option

  • Save shawnfeldman/ea4ca7e079cbf5cc86c3 to your computer and use it in GitHub Desktop.

Select an option

Save shawnfeldman/ea4ca7e079cbf5cc86c3 to your computer and use it in GitHub Desktop.
final Observable<DataMigration> migrations = Observable.from(migrationsToRun.values()).subscribeOn(Schedulers.io());
final Observable<Id> appIdObservable = applicationObservable.getAllApplicationIds();
final Observable<ApplicationScope> appScopeObservable = appIdObservable.map(new Func1<Id, ApplicationScope>() {
@Override
public ApplicationScope call(Id id) {
ApplicationScope scope = new ApplicationScopeImpl(id);
return scope;
}
});
Observable<ApplicationEntityGroup> entitiesObservable = appScopeObservable.flatMap(new Func1<ApplicationScope, Observable<ApplicationEntityGroup>>() {
@Override
public Observable<ApplicationEntityGroup> call(ApplicationScope applicationScope) {
return allEntitiesInSystemObservable.getAllEntitiesInSystem(appIdObservable, 1000);
}
});
Observable applications = appScopeObservable.flatMap(new Func1<ApplicationScope, Observable<?>>() {
@Override
public Observable<?> call(final ApplicationScope applicationScope) {
return getMigrationObservables(migrations,DataMigration.MigrationType.Applications)
.doOnNext(new Action1<DataMigration>() {
@Override
public void call(DataMigration dataMigration) {
runMigration(dataMigration, observer, new ApplicationEntityGroup(applicationScope, null));
}
});
}
});
Observable entities = entitiesObservable.flatMap(new Func1<ApplicationEntityGroup, Observable<?>>() {
@Override
public Observable<?> call(final ApplicationEntityGroup applicationEntityGroup) {
return getMigrationObservables(migrations,DataMigration.MigrationType.Entities)
.doOnNext(new Action1<DataMigration>() {
@Override
public void call(DataMigration dataMigration) {
runMigration(dataMigration, observer, applicationEntityGroup);
}
});
}
});
try {
Observable
.merge(applications, entities)
.subscribeOn(Schedulers.io())
.toBlocking().lastOrDefault(null);
migrationInfoSerialization.setStatusCode( StatusCode.COMPLETE.status );
} catch (Exception e){
LOG.error("Migration Failed",e);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment