Created
February 14, 2015 00:14
-
-
Save shawnfeldman/ea4ca7e079cbf5cc86c3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| final Observable<DataMigration> migrations = Observable.from(migrationsToRun.values()).subscribeOn(Schedulers.io()); | |
| final Observable<Id> appIdObservable = applicationObservable.getAllApplicationIds(); | |
| final Observable<ApplicationScope> appScopeObservable = appIdObservable.map(new Func1<Id, ApplicationScope>() { | |
| @Override | |
| public ApplicationScope call(Id id) { | |
| ApplicationScope scope = new ApplicationScopeImpl(id); | |
| return scope; | |
| } | |
| }); | |
| Observable<ApplicationEntityGroup> entitiesObservable = appScopeObservable.flatMap(new Func1<ApplicationScope, Observable<ApplicationEntityGroup>>() { | |
| @Override | |
| public Observable<ApplicationEntityGroup> call(ApplicationScope applicationScope) { | |
| return allEntitiesInSystemObservable.getAllEntitiesInSystem(appIdObservable, 1000); | |
| } | |
| }); | |
| Observable applications = appScopeObservable.flatMap(new Func1<ApplicationScope, Observable<?>>() { | |
| @Override | |
| public Observable<?> call(final ApplicationScope applicationScope) { | |
| return getMigrationObservables(migrations,DataMigration.MigrationType.Applications) | |
| .doOnNext(new Action1<DataMigration>() { | |
| @Override | |
| public void call(DataMigration dataMigration) { | |
| runMigration(dataMigration, observer, new ApplicationEntityGroup(applicationScope, null)); | |
| } | |
| }); | |
| } | |
| }); | |
| Observable entities = entitiesObservable.flatMap(new Func1<ApplicationEntityGroup, Observable<?>>() { | |
| @Override | |
| public Observable<?> call(final ApplicationEntityGroup applicationEntityGroup) { | |
| return getMigrationObservables(migrations,DataMigration.MigrationType.Entities) | |
| .doOnNext(new Action1<DataMigration>() { | |
| @Override | |
| public void call(DataMigration dataMigration) { | |
| runMigration(dataMigration, observer, applicationEntityGroup); | |
| } | |
| }); | |
| } | |
| }); | |
| try { | |
| Observable | |
| .merge(applications, entities) | |
| .subscribeOn(Schedulers.io()) | |
| .toBlocking().lastOrDefault(null); | |
| migrationInfoSerialization.setStatusCode( StatusCode.COMPLETE.status ); | |
| } catch (Exception e){ | |
| LOG.error("Migration Failed",e); | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment