Created
February 11, 2025 17:13
-
-
Save victoire-hergan/04110c88bfbf7c41dc5ec12a9a436969 to your computer and use it in GitHub Desktop.
Stream response from a deployed flow
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Separator between messages in the streamed response body: a blank line
/// (two consecutive newlines). `_processStream` cuts the decoded response
/// text at each occurrence of this delimiter.
| static const String _flowStreamDelimiter = '\n\n'; | |
/// Calls the flow deployed at [url] and exposes its streamed response.
///
/// The request is started immediately and is not awaited here; the returned
/// [StreamFlowResponse] is how the caller observes it:
///
/// * `stream` emits every streamed message, cast to [S].
/// * `output` completes with the final result, cast to [O].
///
/// On failure the error is added to `stream` and also completes `output`
/// with that error, then the stream is closed.
///
/// [input] is forwarded to the request unchanged. [headers] are merged on
/// top of the defaults (`Accept: text/event-stream`,
/// `Content-Type: application/json`), so a caller-supplied entry with the
/// same key replaces the default.
static StreamFlowResponse<O, S> streamFlow<O, S>({
  required String url,
  dynamic input,
  Map<String, String>? headers,
}) {
  // Carries the intermediate messages to the caller.
  final controller = StreamController<S>();
  // Carries the single final output to the caller.
  final completer = Completer<O>();

  // Defaults first so that caller-provided headers win on key collisions.
  final requestHeaders = {
    'Accept': 'text/event-stream',
    'Content-Type': 'application/json',
    if (headers != null) ...headers,
  };

  // The returned future is intentionally dropped: every outcome is reported
  // through the callbacks below. Because nothing observes that future, the
  // terminal callbacks must never throw. They are therefore idempotent: a
  // second terminal event is ignored instead of raising StateError from
  // `addError` on a closed controller or from completing the completer twice.
  _processStream(
    url: url,
    input: input,
    headers: requestHeaders,
    onMessage: (message) => controller.add(message as S),
    onResult: (result) {
      if (completer.isCompleted) return;
      // A failing cast throws before anything is completed, so the caller of
      // this callback can still route the failure to onError.
      completer.complete(result as O);
      controller.close();
    },
    onError: (error) {
      if (!controller.isClosed) {
        controller.addError(error);
      }
      if (!completer.isCompleted) {
        completer.completeError(error);
      }
      // Closing an already-closed controller is harmless.
      controller.close();
    },
  );

  return StreamFlowResponse(
    output: completer.future,
    stream: controller.stream,
  );
}
/// POSTs `{'data': input}` as JSON to [url] and dispatches the streamed reply.
///
/// The response body is decoded as UTF-8 and split on
/// [_flowStreamDelimiter]. Each non-empty message has an optional leading
/// `data: ` prefix removed and is then parsed as JSON:
///
/// * a `message` key  -> [onMessage] receives its value, reading continues;
/// * a `result` key   -> [onResult] receives its value, reading stops;
/// * an `error` key   -> [onError] receives an [Exception] built from the
///   error's `status`, `message` and `details`, reading stops;
/// * anything else    -> [onError] is called, reading stops.
///
/// [onError] is also called when the request fails, when the status code is
/// not 200, when a callback throws, and when the stream ends before a
/// `result` chunk arrived (with or without leftover partial data). Exactly
/// one of [onResult] / [onError] is therefore invoked per call, unless
/// [onError] itself throws.
static Future<void> _processStream({
  required String url,
  required dynamic input,
  required Map<String, String> headers,
  required Function(dynamic) onMessage,
  required Function(dynamic) onResult,
  required Function(dynamic) onError,
}) async {
  // The client is created for this call only, so it has to be closed here;
  // otherwise its connection is never released.
  final client = http.Client();
  try {
    final request = http.Request('POST', Uri.parse(url));
    request.headers.addAll(headers);
    request.body = jsonEncode({'data': input});

    final streamedResponse = await client.send(request);
    if (streamedResponse.statusCode != 200) {
      throw Exception('Server returned: ${streamedResponse.statusCode}');
    }

    // Holds text received so far that does not yet form a complete message.
    String buffer = '';
    await for (final text
        in streamedResponse.stream.transform(utf8.decoder)) {
      buffer += text;

      // A single network chunk may carry several complete messages.
      var messageEnd = buffer.indexOf(_flowStreamDelimiter);
      while (messageEnd != -1) {
        final message = buffer.substring(0, messageEnd);
        buffer = buffer.substring(messageEnd + _flowStreamDelimiter.length);
        messageEnd = buffer.indexOf(_flowStreamDelimiter);
        if (message.isEmpty) continue;

        final data =
            message.startsWith('data: ') ? message.substring(6) : message;
        try {
          final decoded = jsonDecode(data);
          // Valid JSON that is not an object (list, string, number, ...)
          // cannot be one of the known chunk kinds.
          if (decoded is! Map) {
            throw Exception('Unknown chunk format: $decoded');
          }
          if (decoded.containsKey('message')) {
            onMessage(decoded['message']);
          } else if (decoded.containsKey('result')) {
            onResult(decoded['result']);
            return;
          } else if (decoded.containsKey('error')) {
            final error = decoded['error'];
            throw Exception(
              '${error['status']}: ${error['message']}\n${error['details']}',
            );
          } else {
            throw Exception('Unknown chunk format: $decoded');
          }
        } catch (e) {
          // Covers malformed JSON, error chunks and exceptions thrown by the
          // callbacks themselves.
          onError(e);
          return;
        }
      }
    }

    // Reaching this point means the server closed the stream before a
    // `result` chunk was seen. Report it either way so the caller is never
    // left waiting for a terminal event.
    if (buffer.isNotEmpty) {
      onError(Exception('Stream ended with partial data: $buffer'));
    } else {
      onError(Exception('Stream ended without a result'));
    }
  } catch (e) {
    onError(e);
  } finally {
    client.close();
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment