@Therosin
Last active March 17, 2024 16:10
(terrible) "TypedPipeline" system for multi-stage data pipelines in TypeScript.

Overview

The createTypedPipeline utility allows you to construct and execute a sequence of stages, where each stage performs a transformation or operation on input data, potentially resulting in a new output type. It supports both synchronous and asynchronous stages, making it suitable for a wide range of data processing tasks.

Key Features

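  • Type Safety: The pipeline's call signature is typed from its input and output type parameters, and each stage declares its own input and output types. Types are not checked between consecutive stages.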
  • Context Sharing: A shared context (ctx) object is passed through stages, allowing for state sharing and error collection.
  • Flexible Error Handling: Configurable behavior to either halt execution on errors or continue, logging errors to the context for later handling.

Creating a Pipeline

To create a pipeline, call createTypedPipeline, optionally passing in configuration options.

Note: The input and output type parameters refer to the argument types of the first stage and the return type of the last stage, respectively. The input types must be given as a tuple; the output type is a plain type. For example, a pipeline that takes a single string input and returns a number would be defined as createTypedPipeline<[string], number>.

const myPipeline = createTypedPipeline<[InputTypes], OutputType>({ 
  name: "MyCustomPipeline", 
  throwOnError: false 
});

By default, name is set to "TypedPipeline" and throwOnError is set to true. Both are optional and can be omitted.

Adding Stages

Stages are added to the pipeline using the addStage method. Each stage is a function that takes input and optionally a context object, returning an output:

myPipeline.addStage("stageName", (input, ctx) => {
  // Stage logic here
  return transformedInput;
});
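
One detail worth knowing when writing stages, based on the implementation at the end of this document: the value a stage returns is passed to the next stage as a single argument unless it is an array, in which case its elements are spread into separate arguments. The sketch below mirrors that rule in isolation; toStageArgs is a hypothetical helper name used for illustration, not part of the gist.

```typescript
// Hypothetical helper mirroring how the pipeline builds the next stage's arguments.
function toStageArgs(previous: unknown, ctx: object): unknown[] {
  return Array.isArray(previous) ? [...previous, ctx] : [previous, ctx];
}

const sharedCtx = { errors: [] };

// A scalar result becomes one input followed by the context.
const fromScalar = toStageArgs(42, sharedCtx);

// An array result is spread: the next stage sees three inputs, not one array.
const fromArray = toStageArgs([1, 2, 3], sharedCtx);

// Wrapping the array in an object keeps it as a single input.
const fromWrapped = toStageArgs({ items: [1, 2, 3] }, sharedCtx);

console.log(fromScalar.length, fromArray.length, fromWrapped.length);
```

If a stage needs to hand a list to the next stage as one value, wrapping it in an object as in the last call is one way to avoid the spread.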

Executing the Pipeline

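Execute the pipeline by calling it as a function with the initial input. Optionally, provide an initial context as the last argument. The implementation only treats the last argument as a context if it is an object with an errors key, so include errors: [] alongside any custom state:

const finalResult = await myPipeline(initialInput, { errors: [], customState: "start" });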
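
The implementation at the end of this document decides whether the last argument is an initial context by checking that it is a non-null object with an errors key. A minimal sketch of that check follows; splitContext and the Ctx type are hypothetical names introduced here for illustration only.

```typescript
// Hypothetical stand-in for the context shape used by the pipeline.
type Ctx = { errors?: Array<{ stage: string; error: string }>; [key: string]: unknown };

// Hypothetical helper mirroring the check performed at the start of pipeline execution.
function splitContext(args: unknown[]): { inputs: unknown[]; ctx: Ctx } {
  const rest = [...args];
  const last = rest[rest.length - 1];
  const isCtx = typeof last === "object" && last !== null && "errors" in last;
  const ctx = isCtx ? (rest.pop() as Ctx) : {};
  return { inputs: rest, ctx };
}

// Recognised as a context: the object carries an `errors` key.
const withErrors = splitContext(["hello", { errors: [], customState: "start" }]);

// Not recognised: without `errors`, the object is treated as a second input.
const withoutErrors = splitContext(["hello", { customState: "start" }]);

console.log(withErrors.inputs.length, withoutErrors.inputs.length);
```

The same check means an ordinary input object that happens to have an errors key would be taken for a context, which is worth keeping in mind when choosing input shapes.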

Usage Patterns and Recommendations

1. Leveraging Context for State Sharing

  • Use Case: When stages need to share data or state (e.g., configuration, cumulative results).
  • Pattern: Read from and write to the ctx object to pass data between stages.
myPipeline.addStage("collectData", (data, ctx) => {
  ctx.collectedData = aggregateData(data);
  return data;
});

2. Configuring Error Handling Behavior

  • Use Case: Determining whether the pipeline should halt on error or continue executing remaining stages.
  • Pattern: Use the throwOnError option when creating the pipeline.
const myPipeline = createTypedPipeline<[number, number], number>({ throwOnError: false });
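
With throwOnError: false, the implementation records the error and moves on. Because the failed stage produces no output, the next stage receives the same input the failed stage was given. The sketch below reproduces that loop in a simplified form, assuming a single numeric input and no context argument; runStages and the Stage type are hypothetical names, not part of the gist.

```typescript
type StageError = { stage: string; error: string };
type Stage = { name: string; run: (input: number) => number | Promise<number> };

// Simplified stand-in for the pipeline loop.
async function runStages(stages: Stage[], input: number, throwOnError: boolean) {
  const errors: StageError[] = [];
  let current = input;
  for (const { name, run } of stages) {
    try {
      current = await run(current);
    } catch (err) {
      errors.push({ stage: name, error: String(err) });
      if (throwOnError) {
        throw new Error(`Error occurred in stage: ${name}: ${err}`);
      }
      // Otherwise `current` keeps its previous value and the loop continues.
    }
  }
  return { result: current, errors };
}

const demoStages: Stage[] = [
  { name: "double", run: (n) => n * 2 },
  { name: "explode", run: () => { throw new Error("boom"); } },
  { name: "addOne", run: (n) => n + 1 },
];

// 5 is doubled to 10, "explode" fails and is skipped, "addOne" then yields 11.
const lenient = runStages(demoStages, 5, false);
lenient.then(({ result, errors }) => console.log(result, errors));
```

Calling runStages(demoStages, 5, true) instead rejects at the "explode" stage, so "addOne" never runs.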

3. Handling Errors Within Stages

  • Use Case: When subsequent stages need to react to errors from previous stages.
  • Pattern: Check the ctx.errors array at the beginning of stages to decide on error handling logic. This only applies when throwOnError is false, since otherwise the pipeline stops at the first error.
myPipeline.addStage("errorAwareStage", (input, ctx) => {
  const error = ctx.errors.find(err => err.stage === "collectData");

  // Errors are recorded via err.toString(), e.g. "Error: Data collection failed"
  if (error && error.error.includes("Data collection failed")) {
    // Use ctx.abort to halt the pipeline with a custom message
    ctx.abort("Data collection failed, aborting pipeline.");
  }
  // Proceed with stage logic
  return input;
});
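
When matching on a recorded error, note that the implementation stores err.toString() rather than the bare message, so an Error's string carries an "Error: " prefix. A small sketch of the difference, where recordError is a hypothetical helper (it uses String(err), which gives the same result as err.toString() for Error objects):

```typescript
// Hypothetical helper mirroring the shape of an entry in ctx.errors.
function recordError(stage: string, err: unknown): { stage: string; error: string } {
  return { stage, error: String(err) };
}

const recorded = recordError("collectData", new Error("Data collection failed"));
console.log(recorded.error); // "Error: Data collection failed"

// A strict comparison against the bare message does not match the stored string.
const strictMatch = recorded.error === "Data collection failed";

// Matching on a substring (or on the stage name alone) does.
const looseMatch = recorded.error.includes("Data collection failed");

console.log(strictMatch, looseMatch); // false true
```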

4. Initializing Pipeline with Context

  • Use Case: Pre-populating the shared context with necessary state before execution.
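  • Pattern: Provide an initial context object as the last argument when executing the pipeline. Include an errors key so the object is recognized as a context rather than as input.
const result = await myPipeline(inputData, { errors: [], initialValue: 100 });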

5. Asynchronous Stages

  • Use Case: When stages require asynchronous operations (e.g., I/O, network requests).
  • Pattern: Use async functions and await for asynchronous operations within stages.
myPipeline.addStage("asyncStage", async (input, ctx) => {
  const result = await someAsyncOperation(input);
  return result;
});
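
Synchronous and asynchronous stages can be mixed because the pipeline awaits every stage result, and awaiting a plain value simply yields that value. A minimal sketch of the idea, independent of the pipeline itself; syncStage, asyncStage and runBoth are hypothetical names used for illustration.

```typescript
type AsyncOrSync<T> = Promise<T> | T;

// A synchronous stage returns its value directly.
const syncStage = (n: number): AsyncOrSync<number> => n + 1;

// An asynchronous stage returns a promise.
const asyncStage = async (n: number): Promise<number> => {
  await Promise.resolve(); // stand-in for real I/O or a network request
  return n * 10;
};

async function runBoth(start: number): Promise<number> {
  let value = start;
  for (const stage of [syncStage, asyncStage, syncStage]) {
    value = await stage(value); // works for both plain values and promises
  }
  return value;
}

// (1 + 1) * 10 + 1 = 21
const mixed = runBoth(1);
mixed.then((v) => console.log(v));
```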

6. Best Practices

  • Explicitly Manage State: Use the context (ctx) judiciously to share state and errors across stages, keeping stages decoupled.
  • Anticipate Errors: Design stages with error scenarios in mind, especially when opting to continue execution despite errors.
  • Document Stage Contracts: Clearly document the expected inputs, outputs, and side effects (context modifications) of each stage to maintain clarity and ease of maintenance.

Conclusion

The createTypedPipeline utility offers a flexible, typed mechanism for constructing and executing data processing pipelines. By applying the usage patterns above, developers can manage state sharing, error handling, and inter-stage communication, tailoring the pipeline to their processing needs.

/**
 * Represents the shared context object that is passed to each stage in the pipeline.
 */
export type TypedPipelineCtx = {
  errors: Array<{ stage: string; error: string }>;
  [key: string]: any;
  abort: (reason: string) => void;
};

type AsyncOrSync<T> = Promise<T> | T;

/**
 * Represents a stage in a pipeline, where each stage transforms input of a certain type to an output of potentially another type.
 * Can be either synchronous or asynchronous.
 *
 * @template Input The expected input type(s) for the stage. Can be a single type or a tuple of types for multiple arguments.
 * @template Output The output type of the stage.
 */
export type TypedPipelineStage<Input, Output> = (
  ...args: Input extends any[] ? [...Input, TypedPipelineCtx?] : [Input, TypedPipelineCtx?]
) => AsyncOrSync<Output>;

/**
 * Represents the options that can be provided when creating a new pipeline.
 */
export interface TypedPipelineOptions {
  name?: string;
  throwOnError?: boolean;
}

/**
 * Represents an exception that can be thrown to halt the pipeline immediately.
 */
class PipelineAbortException extends Error {
  constructor(message: string) {
    super(message);
    this.name = "PipelineAbortException";
  }
}
/**
 * Creates a new typed pipeline instance, allowing for stages to be added and the pipeline to be executed.
 *
 * @returns A callable pipeline function (call it to execute the pipeline) with an addStage method for adding stages.
 * @template I The input types for the first stage of the pipeline.
 * @template O The output type of the last stage in the pipeline.
 * @example
 * ```typescript
 * const myPipeline = createTypedPipeline<[number, number], number>(); // Create a pipeline that takes two numbers and returns a number
 *
 * myPipeline.addStage("add", (a, b) => a + b); // Adding a stage to add the two numbers
 *
 * myPipeline.addStage("double", result => result * 2); // Adding a stage to multiply the result by 2
 *
 * const finalResult = await myPipeline(1, 2); // Execute the pipeline by calling it
 * console.log(finalResult); // Outputs: 6
 * ```
 */
export function createTypedPipeline<I extends any[], O>(options?: TypedPipelineOptions) {
  const stages: { name: string; stage: TypedPipelineStage<any, any> }[] = [];
  const { name = "TypedPipeline", throwOnError = true } = options || {};

  async function pipelineFunction(...args: [...I, Partial<TypedPipelineCtx>?]): Promise<O> {
    // extract the context object if provided based on the signature and shape of the last argument
    const lastArg = args[args.length - 1];
    const hasInitialContext = typeof lastArg === "object" && lastArg !== null && "errors" in lastArg;
    const initialContext = hasInitialContext ? (args.pop() as Partial<TypedPipelineCtx>) : {};
    let currentArgs: any = args;
    let context: TypedPipelineCtx = {
      errors: [],
      ...initialContext,
      abort: (reason: string) => {
        throw new PipelineAbortException(reason);
      }
    };

    for (const { name: stageName, stage } of stages) {
      try {
        currentArgs = await stage(...(Array.isArray(currentArgs) ? [...currentArgs, context] : [currentArgs, context]));
      } catch (err: any) {
        if (err instanceof PipelineAbortException) {
          console.error(`Pipeline "${name}" aborted in stage "${stageName}": ${err.message}`);
          throw err; // Re-throw to halt the pipeline immediately
        }
        const errorInfo = { stage: stageName, error: err.toString() };
        context.errors.push(errorInfo);
        if (throwOnError) {
          throw new Error(`TypedPipeline::${name} - Error occurred in stage: ${stageName}: ${err}`);
        }
        // Log non-abort errors when throwOnError is false
        console.error(`TypedPipeline::${name} - Error occurred in stage: ${stageName} (throwOnError: false): ${err}`);
      }
    }
    return currentArgs;
  }

  pipelineFunction.addStage = <S extends any[], T>(name: string, stage: TypedPipelineStage<S, T>): void => {
    stages.push({ name, stage });
  };

  return pipelineFunction;
}
@Therosin
Author

This could be improved in a few ways. First, it should probably allow for naming stages. It could then report the stage in which an error occurred, or potentially handle errors conditionally based on the stage (fallbacks for some stages?).
