Skip to content

Instantly share code, notes, and snippets.

@aaronj1335
Last active November 24, 2021 22:53
Show Gist options
  • Select an option

  • Save aaronj1335/da1c1795c7678dfc7ef5de96a3ae0a59 to your computer and use it in GitHub Desktop.

Select an option

Save aaronj1335/da1c1795c7678dfc7ef5de96a3ae0a59 to your computer and use it in GitHub Desktop.

Revisions

  1. aaronj1335 revised this gist Nov 24, 2021. 1 changed file with 47 additions and 9 deletions.
    56 changes: 47 additions & 9 deletions ServerForTest.kt
    Original file line number Diff line number Diff line change
    @@ -1,16 +1,41 @@
    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    import java.io.PrintWriter
    import java.net.HttpURLConnection
    import java.net.ServerSocket
    import java.net.URL
    import java.nio.CharBuffer
    import java.util.concurrent.Executors

    data class Request(val line: String, val headers: List<String>, val body: String)
    data class Response(val body: String, val code: Int = 200) {
    val message = if (code == 200) "ok" else "uh oh"
    }

    fun CoroutineScope.launchServer(handler: (Request) -> Response): Flow<Pair<Request, Response>> = flow {
    val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    launch(dispatcher) {
    ServerSocket(8080).use { serverSocket ->
    while (true) {
    val client = serverSocket.accept()
    launch(Executors.newFixedThreadPool(4).asCoroutineDispatcher()) {
    val request = // ... read request line, headers, and body, and create Request ...
    val input = client.getInputStream().bufferedReader()
    val output = PrintWriter(client.getOutputStream())
    val requestLine = input.readLine()
    val headers = input.lineSequence().takeWhile { it.isNotBlank() }.toList()
    val buffer = CharBuffer.allocate(8192)
    input.read(buffer)
    buffer.flip()
    val body = buffer.toString()
    val request = Request(requestLine, headers, body)
    val response = handler(request)
    output.print("HTTP/1.0 ${response.code} ${response.message}\n\n${response.body}")
    output.flush()
    client.close()

    launch(dispatcher) {
    emit(Pair(request, response))
    }
    // write output and close socket
    }
    }
    }
    @@ -21,11 +46,17 @@ fun CoroutineScope.launchAsyncRequest(body: String, wait: Long = 1000) = launch(
    delay(wait)
    try {
    (URL("http://localhost:8080").openConnection() as HttpURLConnection).apply {
    // ... Write request body, read result code and body ...
    requestMethod = "GET"
    doOutput = true
    val writer = outputStream.bufferedWriter()
    writer.write(body)
    writer.flush()
    val code = responseCode
    val responseBody = inputStream.bufferedReader().readText()
    println("$body result [$responseCode]: $responseBody")
    }
    } catch (t: Throwable) {
    // ... swallow exception ...
    println("failed to make <$body> request: $t")
    }
    }

    @@ -36,7 +67,7 @@ fun main(args: Array<String>) = runBlocking {
    } else {
    Response("Tid${Thread.currentThread().id}(${it.body})")
    }
    }
    }.shareIn(this, SharingStarted.Eagerly)

    launchAsyncRequest("abc", 1000)
    launchAsyncRequest("def", 2000)
    @@ -45,8 +76,15 @@ fun main(args: Array<String>) = runBlocking {
    launchAsyncRequest("mno", 4000)

    // I want this to:
    // 1. Block until it sees "ghi", then continue
    // 2. After continuing cancel the job associated with the server (thereby closing the socket)
    requests.takeWhile { (_, response) -> !response.body.contains("ghi") }.collect()
    Unit
    //
    // 1. Suspend while collecting each emitted value.
    // 2. Continue to collect until seeing a request with "ghi".
    // 3. At that point continue.
    // 4. Cancel the flow (thereby closing the server's socket).
    //
    // But instead it suspends, I don't see any of the "collected" logs, and it never resumes or cancels the flow.
    requests
    .takeWhile { (_, response) -> !response.body.contains("ghi") }
    .onEach { System.err.println("collected $it") }
    .collect()
    }
  2. aaronj1335 revised this gist Nov 24, 2021. 1 changed file with 8 additions and 35 deletions.
    43 changes: 8 additions & 35 deletions ServerForTest.kt
    Original file line number Diff line number Diff line change
    @@ -1,40 +1,16 @@
    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    import java.io.PrintWriter
    import java.net.HttpURLConnection
    import java.net.ServerSocket
    import java.net.URL
    import java.nio.CharBuffer
    import java.util.concurrent.Executors

    data class Request(val line: String, val headers: List<String>, val body: String)
    data class Response(val body: String, val code: Int = 200) {
    val message = if (code == 200) "ok" else "uh oh"
    }

    fun CoroutineScope.launchServer(handler: (Request) -> Response): Flow<Pair<Request, Response>> = flow {
    val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    launch(dispatcher) {
    ServerSocket(8080).use { serverSocket ->
    while (true) {
    val client = serverSocket.accept()
    launch(Executors.newFixedThreadPool(4).asCoroutineDispatcher()) {
    val input = client.getInputStream().bufferedReader()
    val output = PrintWriter(client.getOutputStream())
    val requestLine = input.readLine()
    val headers = input.lineSequence().takeWhile { it.isNotBlank() }.toList()
    val buffer = CharBuffer.allocate(8192)
    input.read(buffer)
    buffer.flip()
    val body = buffer.toString()
    val request = Request(requestLine, headers, body)
    val request = // ... read request line, headers, and body, and create Request ...
    val response = handler(request)
    launch(dispatcher) {
    emit(Pair(request, response))
    }
    output.print("HTTP/1.0 ${response.code} ${response.message}\n\n${response.body}")
    output.flush()
    client.close()
    // write output and close socket
    }
    }
    }
    @@ -45,17 +21,11 @@ fun CoroutineScope.launchAsyncRequest(body: String, wait: Long = 1000) = launch(
    delay(wait)
    try {
    (URL("http://localhost:8080").openConnection() as HttpURLConnection).apply {
    requestMethod = "GET"
    doOutput = true
    val writer = outputStream.bufferedWriter()
    writer.write(body)
    writer.flush()
    val code = responseCode
    val responseBody = inputStream.bufferedReader().readText()
    // ... Write request body, read result code and body ...
    println("$body result [$responseCode]: $responseBody")
    }
    } catch (t: Throwable) {
    println("failed to make <$body> request: $t")
    // ... swallow exception ...
    }
    }

    @@ -66,14 +36,17 @@ fun main(args: Array<String>) = runBlocking {
    } else {
    Response("Tid${Thread.currentThread().id}(${it.body})")
    }
    }.shareIn(this, SharingStarted.Eagerly)
    }

    launchAsyncRequest("abc", 1000)
    launchAsyncRequest("def", 2000)
    launchAsyncRequest("ghi", 3000)
    launchAsyncRequest("jkl", 3000)
    launchAsyncRequest("mno", 4000)

    // I want this to:
    // 1. Block until it sees "ghi", then continue
    // 2. After continuing cancel the job associated with the server (thereby closing the socket)
    requests.takeWhile { (_, response) -> !response.body.contains("ghi") }.collect()
    Unit
    }
  3. aaronj1335 created this gist Nov 24, 2021.
    79 changes: 79 additions & 0 deletions ServerForTest.kt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,79 @@
    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    import java.io.PrintWriter
    import java.net.HttpURLConnection
    import java.net.ServerSocket
    import java.net.URL
    import java.nio.CharBuffer
    import java.util.concurrent.Executors

    data class Request(val line: String, val headers: List<String>, val body: String)
    data class Response(val body: String, val code: Int = 200) {
    val message = if (code == 200) "ok" else "uh oh"
    }

    fun CoroutineScope.launchServer(handler: (Request) -> Response): Flow<Pair<Request, Response>> = flow {
    val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    launch(dispatcher) {
    ServerSocket(8080).use { serverSocket ->
    while (true) {
    val client = serverSocket.accept()
    launch(Executors.newFixedThreadPool(4).asCoroutineDispatcher()) {
    val input = client.getInputStream().bufferedReader()
    val output = PrintWriter(client.getOutputStream())
    val requestLine = input.readLine()
    val headers = input.lineSequence().takeWhile { it.isNotBlank() }.toList()
    val buffer = CharBuffer.allocate(8192)
    input.read(buffer)
    buffer.flip()
    val body = buffer.toString()
    val request = Request(requestLine, headers, body)
    val response = handler(request)
    launch(dispatcher) {
    emit(Pair(request, response))
    }
    output.print("HTTP/1.0 ${response.code} ${response.message}\n\n${response.body}")
    output.flush()
    client.close()
    }
    }
    }
    }
    }

    fun CoroutineScope.launchAsyncRequest(body: String, wait: Long = 1000) = launch(Dispatchers.IO) {
    delay(wait)
    try {
    (URL("http://localhost:8080").openConnection() as HttpURLConnection).apply {
    requestMethod = "GET"
    doOutput = true
    val writer = outputStream.bufferedWriter()
    writer.write(body)
    writer.flush()
    val code = responseCode
    val responseBody = inputStream.bufferedReader().readText()
    println("$body result [$responseCode]: $responseBody")
    }
    } catch (t: Throwable) {
    println("failed to make <$body> request: $t")
    }
    }

    fun main(args: Array<String>) = runBlocking {
    val requests = launchServer {
    if (it.body.contains("ghi")) {
    Response("Tid${Thread.currentThread().id}(${it.body}) GOT IT")
    } else {
    Response("Tid${Thread.currentThread().id}(${it.body})")
    }
    }.shareIn(this, SharingStarted.Eagerly)

    launchAsyncRequest("abc", 1000)
    launchAsyncRequest("def", 2000)
    launchAsyncRequest("ghi", 3000)
    launchAsyncRequest("jkl", 3000)
    launchAsyncRequest("mno", 4000)

    requests.takeWhile { (_, response) -> !response.body.contains("ghi") }.collect()
    Unit
    }