The createTypedPipeline utility allows you to construct and execute a sequence of stages, where each stage performs a transformation or operation on input data, potentially resulting in a new output type. It supports both synchronous and asynchronous stages, making it suitable for a wide range of data processing tasks.
- Type Safety: Ensures the inputs and outputs of stages are correctly typed.
- Context Sharing: A shared context (`ctx`) object is passed through stages, allowing for state sharing and error collection.
- Flexible Error Handling: Configurable behavior to either halt execution on errors or continue, logging errors to the context for later handling.
To create a pipeline, simply call createTypedPipeline, optionally passing in configuration options:
Note: The input and output type parameters refer to the argument types of the first stage and the return type of the last stage, respectively. The input types should be defined as a tuple. For example, a pipeline that takes a single string input and returns a number would be declared as `createTypedPipeline<[string], number>`.
```typescript
const myPipeline = createTypedPipeline<[InputTypes], OutputType>({
  name: "MyCustomPipeline",
  throwOnError: false
});
```

By default, `name` is set to `"TypedPipeline"` and `throwOnError` is set to `true`; both options can be omitted.
Stages are added to the pipeline using the addStage method. Each stage is a function that takes input and optionally a context object, returning an output:
```typescript
myPipeline.addStage("stageName", (input, ctx) => {
  // Stage logic here
  return transformedInput;
});
```

Execute the pipeline by calling it as a function with the initial input. Optionally, provide an initial context:
```typescript
const finalResult = await myPipeline(initialInput, { customState: "start" });
```

- Use Case: When stages need to share data or state (e.g., configuration, cumulative results).
- Pattern: Read from and write to the `ctx` object to pass data between stages.
```typescript
myPipeline.addStage("collectData", (data, ctx) => {
  ctx.collectedData = aggregateData(data);
  return data;
});
```

- Use Case: Determining whether the pipeline should halt on error or continue executing remaining stages.
- Pattern: Use the `throwOnError` option when creating the pipeline.
```typescript
const myPipeline = createTypedPipeline<[number, number], number>({ throwOnError: false });
```

- Use Case: When subsequent stages need to react to errors from previous stages.
- Pattern: Check the `ctx.errors` array at the beginning of stages to decide on error handling logic.
```typescript
myPipeline.addStage("errorAwareStage", (input, ctx) => {
  const error = ctx.errors.find(err => err.stage === "collectData");
  if (error && error.error === "Data collection failed") {
    // Use ctx.abort to halt the pipeline with a custom message
    ctx.abort("Data collection failed, aborting pipeline.");
  }
  // Proceed with stage logic
});
```

- Use Case: Pre-populating the shared context with necessary state before execution.
- Pattern: Provide an initial context object when executing the pipeline.
```typescript
const result = await myPipeline(inputData, { initialValue: 100 });
```

- Use Case: When stages require asynchronous operations (e.g., I/O, network requests).
- Pattern: Use `async` functions and `await` for asynchronous operations within stages.
```typescript
myPipeline.addStage("asyncStage", async (input, ctx) => {
  const result = await someAsyncOperation(input);
  return result;
});
```

- Explicitly Manage State: Use the context (`ctx`) judiciously to share state and errors across stages, keeping stages decoupled.
- Anticipate Errors: Design stages with error scenarios in mind, especially when opting to continue execution despite errors.
- Document Stage Contracts: Clearly document the expected inputs, outputs, and side effects (context modifications) of each stage to maintain clarity and ease of maintenance.
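To make the patterns above concrete, here is a minimal, self-contained sketch of how such a pipeline could work internally. Everything in it (`createSketchPipeline`, `PipelineContext`, `StageFn`) is an illustrative stand-in invented for this example, not the real `createTypedPipeline` implementation; it shows named stages, shared context, error collection with `throwOnError: false`, `ctx.abort`, and mixed sync/async stages:

```typescript
// Illustrative sketch only: these names are assumptions, not the real API.
interface PipelineContext {
  errors: { stage: string; error: unknown }[];
  abort: (message: string) => never;
  [key: string]: unknown;
}

type StageFn = (input: unknown, ctx: PipelineContext) => unknown;

function createSketchPipeline(options: { name?: string; throwOnError?: boolean } = {}) {
  const throwOnError = options.throwOnError ?? true;
  const stages: { name: string; fn: StageFn }[] = [];

  // The pipeline is a callable that runs every stage in order,
  // threading each stage's output into the next stage's input.
  async function run(
    input: unknown,
    initialCtx: Record<string, unknown> = {}
  ): Promise<unknown> {
    let aborted = false;
    const ctx: PipelineContext = {
      ...initialCtx,
      errors: [],
      abort: (message: string): never => {
        aborted = true;
        throw new Error(message);
      },
    };
    let current: unknown = input;
    for (const { name, fn } of stages) {
      try {
        // `await` handles both synchronous and asynchronous stages.
        current = await fn(current, ctx);
      } catch (err) {
        if (throwOnError || aborted) throw err; // aborts always halt
        ctx.errors.push({ stage: name, error: err instanceof Error ? err.message : err });
      }
    }
    return current;
  }

  return Object.assign(run, {
    addStage(name: string, fn: StageFn): void {
      stages.push({ name, fn });
    },
  });
}

// Usage: a two-stage pipeline that parses a string and doubles the result.
const demo = createSketchPipeline({ name: "DemoPipeline", throwOnError: false });
demo.addStage("parse", (s) => Number(s));
demo.addStage("double", (n) => (n as number) * 2);

demo("21").then((result) => console.log(result)); // logs 42
```

Note the design choice in the sketch: aborts re-throw even when `throwOnError` is false, so `ctx.abort` always halts the pipeline while ordinary stage errors are merely collected in `ctx.errors`.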
The `createTypedPipeline` utility offers a powerful, flexible, and type-safe mechanism for constructing and executing data processing pipelines. By understanding and applying the recommended usage patterns, developers can effectively manage state sharing, error handling, and inter-stage communication, tailoring the pipeline to meet diverse processing needs.
This could be improved in a few ways. First, it should probably allow for naming stages; the pipeline could then report which stage an error occurred in, or conditionally handle errors based on the stage (e.g., fallbacks for some stages).