Created
May 21, 2020 00:57
-
-
Save daniellavoie/8736e040f2f2e4dfa1738264a6d60dab to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package dev.daniellavoie.bosh.client.webflux; | |
| import java.nio.file.Path; | |
| import java.time.Duration; | |
| import java.util.List; | |
| import javax.net.ssl.SSLException; | |
| import org.slf4j.Logger; | |
| import org.slf4j.LoggerFactory; | |
| import org.springframework.http.HttpStatus; | |
| import org.springframework.http.MediaType; | |
| import org.springframework.http.client.reactive.ReactorClientHttpConnector; | |
| import org.springframework.http.codec.ClientCodecConfigurer; | |
| import org.springframework.http.codec.json.Jackson2JsonDecoder; | |
| import org.springframework.http.codec.json.Jackson2JsonEncoder; | |
| import org.springframework.web.reactive.function.client.ClientResponse; | |
| import org.springframework.web.reactive.function.client.ExchangeStrategies; | |
| import org.springframework.web.reactive.function.client.WebClient; | |
| import com.fasterxml.jackson.annotation.JsonInclude.Include; | |
| import com.fasterxml.jackson.core.type.TypeReference; | |
| import com.fasterxml.jackson.databind.ObjectMapper; | |
| import com.fasterxml.jackson.databind.PropertyNamingStrategy; | |
| import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; | |
| import dev.daniellavoie.bosh.client.api.UpdateConfigRequest; | |
| import dev.daniellavoie.bosh.client.api.UploadReleaseRequest; | |
| import dev.daniellavoie.bosh.client.api.UploadStemcellRequest; | |
| import dev.daniellavoie.bosh.client.model.Config; | |
| import dev.daniellavoie.bosh.client.model.Deployment; | |
| import dev.daniellavoie.bosh.client.model.Release; | |
| import dev.daniellavoie.bosh.client.model.Stemcell; | |
| import dev.daniellavoie.bosh.client.model.Task; | |
| import dev.daniellavoie.bosh.client.model.Task.State; | |
| import dev.daniellavoie.bosh.client.model.TaskEvent; | |
| import dev.daniellavoie.bosh.client.webflux.oauth2.ClientCredentialTokenRefreshFilter; | |
| import dev.daniellavoie.bosh.client.webflux.tls.DynamicX509TrustManager; | |
| import dev.daniellavoie.bosh.client.webflux.util.JacksonUtil; | |
| import io.netty.handler.ssl.SslContext; | |
| import io.netty.handler.ssl.SslContextBuilder; | |
| import reactor.core.publisher.Flux; | |
| import reactor.core.publisher.Mono; | |
| import reactor.netty.ByteBufFlux; | |
| import reactor.netty.http.client.HttpClient; | |
| public class BoshWebFluxClient { | |
| private static final Logger LOGGER = LoggerFactory.getLogger(BoshWebFluxClient.class); | |
| private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() | |
| .setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE).setSerializationInclusion(Include.NON_NULL) | |
| .findAndRegisterModules(); | |
| private static final ObjectMapper YAML_MAPPER = new ObjectMapper(new YAMLFactory()) | |
| .setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE).setSerializationInclusion(Include.NON_NULL) | |
| .findAndRegisterModules(); | |
| private final WebClient webClient; | |
| public BoshWebFluxClient(String environment, String clientId, String clientSecret, String tokenUri, byte[] boshCA) { | |
| var httpClientConnector = new ReactorClientHttpConnector( | |
| HttpClient.create().secure(t -> t.sslContext(createSSLContext(boshCA)))); | |
| webClient = WebClient.builder() | |
| .baseUrl(environment) | |
| .clientConnector(httpClientConnector) | |
| .filter(new ClientCredentialTokenRefreshFilter(clientId, clientSecret, tokenUri, httpClientConnector)) | |
| .exchangeStrategies(ExchangeStrategies.builder().codecs(this::acceptedCodecs).build()) | |
| .build(); | |
| } | |
| private void acceptedCodecs(ClientCodecConfigurer clientCodecConfigurer) { | |
| clientCodecConfigurer.customCodecs().register(new Jackson2JsonDecoder(OBJECT_MAPPER, MediaType.TEXT_HTML)); | |
| clientCodecConfigurer.customCodecs() | |
| .register(new Jackson2JsonDecoder(OBJECT_MAPPER, MediaType.APPLICATION_JSON)); | |
| clientCodecConfigurer.customCodecs() | |
| .register(new Jackson2JsonEncoder(OBJECT_MAPPER, MediaType.APPLICATION_JSON)); | |
| clientCodecConfigurer.customCodecs() | |
| .register(new Jackson2JsonEncoder(YAML_MAPPER, new MediaType("text", "yaml"))); | |
| } | |
| private SslContext createSSLContext(byte[] boshCA) { | |
| try { | |
| return SslContextBuilder.forClient().trustManager(new DynamicX509TrustManager(boshCA)).build(); | |
| } catch (SSLException e) { | |
| throw new RuntimeException(e); | |
| } | |
| } | |
| public Mono<Integer> deleteDeployment(String deployment) { | |
| return webClient.delete().uri("/deployments/{deployment}", deployment).exchange() | |
| .flatMap(response -> handleTaskResponse(response)); | |
| } | |
| public Mono<Integer> deploy(String manifest) { | |
| return webClient.post().uri("/deployments").contentType(new MediaType("text", "yaml")).bodyValue(manifest) | |
| .exchange().flatMap(response -> handleTaskResponse(response)); | |
| } | |
| public Mono<List<Config>> getConfigs() { | |
| return webClient.get().uri("/configs?latest=true").accept(MediaType.TEXT_HTML).exchange() | |
| .flatMap(response -> handleMonoResponse(response, String.class) | |
| .map(value -> JacksonUtil.read(value, new TypeReference<List<Config>>() { | |
| }))); | |
| } | |
| public Mono<Deployment> getDeployment(String name) { | |
| return webClient.get().uri("/deployments/{name}", name).exchange() | |
| .flatMap(response -> handleMonoResponse(response, Deployment.class)); | |
| } | |
| public Flux<Release> getReleases() { | |
| return webClient.get().uri("/releases").exchange().flatMapMany(response -> response.bodyToFlux(Release.class)); | |
| } | |
| public Flux<Stemcell> getStemcells() { | |
| return webClient.get().uri("/stemcells").exchange() | |
| .flatMapMany(response -> response.bodyToFlux(Stemcell.class)); | |
| } | |
| public Mono<Task> getTask(int id) { | |
| return webClient.get().uri("/tasks/{id}", id).exchange().flatMap(response -> response.bodyToMono(Task.class)); | |
| } | |
| public Flux<TaskEvent> getTaskEvents(int taskId) { | |
| var context = new TaskEventsContext(); | |
| return Flux.range(0, Integer.MAX_VALUE) | |
| .delayUntil(index -> index == 0 ? Mono.empty() : Mono.delay(Duration.ofSeconds(2))) | |
| .flatMap(index -> getTask(taskId)) | |
| .doOnNext(task -> context | |
| .setCompleted(task.getState().equals(State.done) || task.getState().equals(State.cancelled) | |
| || task.getState().equals(State.error) || task.getState().equals(State.timeout))) | |
| .flatMap(task -> getTaskEventsOutput(taskId)) | |
| .flatMap(eventsLines -> Flux.fromStream(eventsLines.lines())) | |
| .map(eventLine -> JacksonUtil.read(eventLine, TaskEvent.class)) | |
| .filter(taskEvent -> !context.contains(taskEvent)) | |
| .doOnNext(taskEvent -> context.add(taskEvent)) | |
| .takeUntil(taskEvent -> context.isCompleted()); | |
| } | |
| private Mono<String> getTaskEventsOutput(int id) { | |
| return webClient.get().uri("/tasks/{id}/output?type=event", id).exchange() | |
| .flatMap(response -> response.bodyToMono(String.class)); | |
| } | |
| public Mono<String> getTaskOutputDebug(int id) { | |
| return webClient.get().uri("/tasks/{id}/output?type=debug", id).exchange() | |
| .flatMap(response -> response.bodyToMono(String.class)); | |
| } | |
| public Flux<Task> getTaskUpdates(int taskId) { | |
| TaskUpdateContext taskUpdateContext = new TaskUpdateContext(); | |
| return Flux.range(0, Integer.MAX_VALUE) | |
| .delayUntil(index -> index == 0 ? Mono.empty() : Mono.delay(Duration.ofSeconds(5))) | |
| .flatMap(index -> getTask(taskId)) | |
| .filter(task -> !task.equals(taskUpdateContext.getLatestTask())) | |
| .doOnNext(task -> taskUpdateContext.setLatestTask(task)) | |
| .doOnNext(task -> taskUpdateContext | |
| .setCompleted(task.getState().equals(State.done) || task.getState().equals(State.cancelled) | |
| || task.getState().equals(State.error) || task.getState().equals(State.timeout))) | |
| .takeUntil(task -> taskUpdateContext.isCompleted()); | |
| } | |
| private <T> Mono<T> handleMonoResponse(ClientResponse response, Class<T> bodyType) { | |
| if (response.statusCode().equals(HttpStatus.NOT_FOUND)) { | |
| return response.releaseBody().then(Mono.empty()); | |
| } | |
| return response.bodyToMono(bodyType); | |
| } | |
| private Mono<Integer> handleTaskResponse(ClientResponse response) { | |
| String[] paths = response.headers().asHttpHeaders().getLocation().getPath().split("/"); | |
| return response.releaseBody().thenReturn(Integer.parseInt(paths[paths.length - 1])); | |
| } | |
| public Mono<Config> updateConfig(UpdateConfigRequest request) { | |
| return webClient.post().uri("/configs").bodyValue(request).exchange() | |
| .flatMap(response -> handleMonoResponse(response, String.class) | |
| .map(value -> JacksonUtil.read(value, Config.class))); | |
| } | |
| public Mono<Integer> uploadRelease(UploadReleaseRequest uploadReleaseRequest) { | |
| return webClient.post().uri("/releases") | |
| .bodyValue(uploadReleaseRequest).exchange() | |
| .doOnSubscribe(subscription -> LOGGER.info("Uploading {} to bosh director.", | |
| uploadReleaseRequest.getLocation())) | |
| .flatMap(response -> handleTaskResponse(response)) | |
| .doOnSuccess(task -> LOGGER.info("Successfully uploaded {} to bosh director.", uploadReleaseRequest)); | |
| } | |
| public Mono<Integer> uploadRelease(Path release) { | |
| return webClient.post().uri("/releases").body(ByteBufFlux.fromPath(release), ByteBufFlux.class).exchange() | |
| .flatMap(response -> handleTaskResponse(response)); | |
| } | |
| public Mono<Integer> uploadStemcell(UploadStemcellRequest uploadStemcellRequest) { | |
| return webClient.post().uri("/stemcells").bodyValue(uploadStemcellRequest).exchange() | |
| .doOnSubscribe(subscription -> LOGGER.info("Uploading {} to bosh director.", | |
| uploadStemcellRequest.getLocation())) | |
| .flatMap(response -> handleTaskResponse(response)) | |
| .doOnSuccess(task -> LOGGER.info("Successfully uploaded {} to bosh director.", uploadStemcellRequest)); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package dev.daniellavoie.bosh.client.webflux.tls; | |
| import java.io.ByteArrayInputStream; | |
| import java.io.IOException; | |
| import java.net.Socket; | |
| import java.security.KeyStore; | |
| import java.security.KeyStoreException; | |
| import java.security.NoSuchAlgorithmException; | |
| import java.security.cert.CertificateException; | |
| import java.security.cert.CertificateFactory; | |
| import java.security.cert.X509Certificate; | |
| import java.util.Arrays; | |
| import java.util.stream.Stream; | |
| import javax.net.ssl.SSLEngine; | |
| import javax.net.ssl.TrustManagerFactory; | |
| import javax.net.ssl.X509ExtendedTrustManager; | |
| import javax.net.ssl.X509TrustManager; | |
| public class DynamicX509TrustManager extends X509ExtendedTrustManager { | |
| private X509Certificate[] acceptedIssuers = null; | |
| private X509Certificate boshCA; | |
| private final X509TrustManager delegate; | |
| public DynamicX509TrustManager(byte[] boshCA) { | |
| try { | |
| var trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); | |
| X509Certificate ca = (X509Certificate) CertificateFactory.getInstance("X.509") | |
| .generateCertificate(new ByteArrayInputStream(boshCA)); | |
| KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType()); | |
| keystore.load(null, null); | |
| keystore.setCertificateEntry(Integer.toString(1), ca); | |
| trustManagerFactory.init(keystore); | |
| this.delegate = Arrays.stream(trustManagerFactory.getTrustManagers()) | |
| .filter(trustManager -> trustManager instanceof X509TrustManager).findFirst() | |
| .map(trustManager -> (X509TrustManager) trustManager) | |
| .orElseThrow(() -> new IllegalArgumentException("Could not find an X509TrustManager.")); | |
| } catch (NoSuchAlgorithmException | KeyStoreException | CertificateException | IOException e) { | |
| throw new RuntimeException(e); | |
| } | |
| } | |
| @Override | |
| public void checkClientTrusted(X509Certificate[] chain, String s) throws CertificateException { | |
| delegate.checkClientTrusted(chain, s); | |
| } | |
| @Override | |
| public void checkClientTrusted(X509Certificate[] chain, String s, Socket socket) throws CertificateException { | |
| delegate.checkClientTrusted(chain, s); | |
| } | |
| @Override | |
| public void checkClientTrusted(X509Certificate[] chain, String s, SSLEngine sslEngine) throws CertificateException { | |
| delegate.checkClientTrusted(chain, s); | |
| } | |
| @Override | |
| public void checkServerTrusted(X509Certificate[] chain, String s) throws CertificateException { | |
| delegate.checkServerTrusted(chain, s); | |
| } | |
| @Override | |
| public void checkServerTrusted(X509Certificate[] chain, String s, Socket socket) throws CertificateException { | |
| delegate.checkServerTrusted(chain, s); | |
| } | |
| @Override | |
| public void checkServerTrusted(X509Certificate[] chain, String s, SSLEngine sslEngine) throws CertificateException { | |
| delegate.checkServerTrusted(chain, s); | |
| } | |
| @Override | |
| public X509Certificate[] getAcceptedIssuers() { | |
| if (acceptedIssuers == null) { | |
| acceptedIssuers = Stream.concat(Arrays.stream(delegate.getAcceptedIssuers()), Stream.of(boshCA)) | |
| .toArray(X509Certificate[]::new); | |
| } | |
| return acceptedIssuers; | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment