Skip to content

Instantly share code, notes, and snippets.

@ikhoon
Created February 5, 2021 07:39
Show Gist options
  • Select an option

  • Save ikhoon/cfbf9cb3fe7a7f6e651ffde02f8707c2 to your computer and use it in GitHub Desktop.

Select an option

Save ikhoon/cfbf9cb3fe7a7f6e651ffde02f8707c2 to your computer and use it in GitHub Desktop.

Revisions

  1. ikhoon revised this gist Feb 5, 2021. 1 changed file with 1 addition and 17 deletions.
    18 changes: 1 addition & 17 deletions BidiStream.java
    Original file line number Diff line number Diff line change
    @@ -1,20 +1,4 @@
    /*
    * Copyright 2021 LINE Corporation
    *
    * LINE Corporation licenses this file to you under the Apache License,
    * version 2.0 (the "License"); you may not use this file except in compliance
    * with the License. You may obtain a copy of the License at:
    *
    * https://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    * License for the specific language governing permissions and limitations
    * under the License.
    */

    package example.armeria.server.annotated;
    package example.armeria;

    import java.nio.charset.StandardCharsets;
    import java.time.Duration;
  2. ikhoon created this gist Feb 5, 2021.
    106 changes: 106 additions & 0 deletions BidiStream.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,106 @@
    /*
    * Copyright 2021 LINE Corporation
    *
    * LINE Corporation licenses this file to you under the Apache License,
    * version 2.0 (the "License"); you may not use this file except in compliance
    * with the License. You may obtain a copy of the License at:
    *
    * https://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    * License for the specific language governing permissions and limitations
    * under the License.
    */

    package example.armeria.server.annotated;

    import java.nio.charset.StandardCharsets;
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.List;

    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.RegisterExtension;
    import org.reactivestreams.Publisher;

    import com.linecorp.armeria.client.WebClient;
    import com.linecorp.armeria.common.HttpData;
    import com.linecorp.armeria.common.HttpMethod;
    import com.linecorp.armeria.common.HttpRequest;
    import com.linecorp.armeria.common.HttpResponse;
    import com.linecorp.armeria.common.RequestHeaders;
    import com.linecorp.armeria.common.stream.HttpDecoder;
    import com.linecorp.armeria.common.stream.HttpDecoderInput;
    import com.linecorp.armeria.common.stream.HttpDecoderOutput;
    import com.linecorp.armeria.common.stream.StreamMessage;
    import com.linecorp.armeria.server.ServerBuilder;
    import com.linecorp.armeria.server.annotation.Post;
    import com.linecorp.armeria.server.annotation.ProducesOctetStream;
    import com.linecorp.armeria.testing.junit5.server.ServerExtension;

    import io.netty.buffer.ByteBuf;
    import reactor.core.publisher.Flux;

    class BidiStream {

    @RegisterExtension
    static ServerExtension server = new ServerExtension() {
    @Override
    protected void configure(ServerBuilder sb) throws Exception {
    sb.annotatedService(new Object() {

    @ProducesOctetStream
    @Post("/bidi")
    public Publisher<HttpData> bidi(HttpRequest req) {
    final StreamMessage<String> decode = req.decode(new FixedLengthDecoder(7));
    return Flux.from(decode)
    .map(message -> HttpData.ofUtf8("Hello, " + message));
    }
    });
    }
    };

    @Test
    void bidiTest() throws InterruptedException {
    final WebClient client = WebClient.of(server.httpUri());
    final Flux<HttpData> source = Flux.range(1, 100)
    .delayElements(Duration.ofSeconds(1))
    .map(i -> HttpData.ofUtf8("Armeria"));

    final HttpRequest request = HttpRequest.of(RequestHeaders.of(HttpMethod.POST, "/bidi"), source);
    final HttpResponse response = client.execute(request);
    Flux.from(response.decode(new FixedLengthDecoder(14)))
    .subscribe(System.out::println);

    // Wait for fully receiving a response stream
    Thread.sleep(20000);
    }

    private static final class FixedLengthDecoder implements HttpDecoder<String> {

    private final int length;
    private final List<ByteBuf> byteBufs = new ArrayList<>();

    private FixedLengthDecoder(int length) {
    this.length = length;
    }

    @Override
    public void process(HttpDecoderInput in, HttpDecoderOutput<String> out) {
    int remaining = in.readableBytes();
    if (remaining < length) {
    return;
    }

    while (remaining >= length) {
    final ByteBuf buf = in.readBytes(length);
    out.add(buf.toString(StandardCharsets.UTF_8));
    byteBufs.add(buf);
    buf.release();
    remaining -= length;
    }
    }
    }
    }