Last active
July 26, 2024 12:16
Revisions
pryce-turner revised this gist
Jun 9, 2023 . 1 changed file with 3 additions and 0 deletions.
@@ -87,6 +87,9 @@ All this does is take in a string from the commandline, create the FlyteDirector

That's all there is to it! Please feel free to comment if anything is unclear or could be improved. I hope you found this little "hella warld" demo useful. Wishing you tailwinds and blue skies on your Flyte journey!

## A Note on File Existence

I've now run into the same puzzling issue twice. In one instance, I was packaging a FlyteFile as part of a custom dataclass, and instantiating that dataclass would fail, saying the file didn't exist. In another, I found it *very strange* that I needed to run `os.listdir` before I could run `Path().rglob()` on a directory. Flyte handles certain file operations on FlyteFiles for you, such as `open()` and `os.listdir`, letting you interact with them as if they were on your local system. Behind the scenes, however, Flyte pulls those files into the pod's local storage before calling those methods. By default, those assets don't exist locally, so anything beyond very basic operations will fail and leave you scratching your head. The simple solution is to call `download()` on the FlyteFile or FlyteDirectory *before* doing anything fancy, which pulls the contents in from object storage. If you're getting `FileNotFoundError`s for a file that definitely should be there, check this first.

## Full Code

Here's what your complete `filebased_wf.py` should look like:
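As an aside on the file-existence note added in this revision (this is not part of `filebased_wf.py`), here is a minimal sketch of the "download first, then do anything fancy" pattern. The helper `list_files_recursively` and the `Downloadable` protocol are hypothetical names introduced for illustration. The sketch assumes the object passed in exposes a `download()` method that returns the local path, which is my understanding of how `FlyteFile` and `FlyteDirectory` behave; check that against your flytekit version.

```python
from pathlib import Path
from typing import List, Protocol


class Downloadable(Protocol):
    """Hypothetical stand-in for the one method this sketch relies on.

    Assumption: FlyteDirectory.download() returns the local path as a string.
    """

    def download(self) -> str: ...


def list_files_recursively(directory: Downloadable) -> List[Path]:
    # Pull the contents into local storage *before* touching the path.
    local_root = Path(directory.download())
    # Now recursive globbing operates on files that really exist locally.
    return sorted(p for p in local_root.rglob("*") if p.is_file())
```

Inside a task you would pass the `FlyteDirectory` input straight to a helper like this, rather than calling `Path(...).rglob()` on it directly.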
pryce-turner revised this gist
Apr 16, 2023 . 1 changed file with 1 addition and 1 deletion.
@@ -24,7 +24,7 @@ Some of the abstractions however, do require a slight shift in assumptions compa
from flytekit.types.directory import FlyteDirectory
```

## Container Task

First we'll define a generic [ContainerTask](https://docs.flyte.org/projects/cookbook/en/latest/auto/core/containerization/raw_container.html) that will handle the actual processing of the files.

```
ct = ContainerTask(
pryce-turner created this gist
Apr 16, 2023 .
@@ -0,0 +1,133 @@

# Processing Files with Dynamic Container Tasks in Flyte

## Motivation

I'm still very new to Flyte, but it's quickly becoming one of my favorite tools. Between the modern architectural decisions and a welcoming and extremely helpful community, what's not to love? Some of the abstractions, however, do require a slight shift in assumptions compared to more localized workflow frameworks. Additionally, while the [documentation](https://docs.flyte.org/projects/cookbook/en/latest/userguide.html) is excellent at explaining Flyte's different features in isolation, there aren't as many posts on how they work together as there are for more established projects.

This short piece is my attempt to capture my journey so far with a rather specific use case that ties together a number of notable features. This is mostly to solidify my own understanding, but hopefully also a way to pay forward all the wonderful support I've received from the team.

## Setup

- Everything you need to run the code below is captured in the [Environment Setup](https://docs.flyte.org/projects/cookbook/en/latest/userguide_setup.html) guide from the official docs
- Upload some files to your local minio storage
  - open browser to http://localhost:30080/minio/browser
  - login with username `minio` and password `miniostorage`
  - make a path in my-s3-bucket called `input-data`
  - make some files with some content
    - `echo hello > hello.txt`
    - `echo world > world.txt`
  - upload both files to your new path
- make a python file to contain our workflow e.g.
`filebased_wf.py` and add the following imports to the top:

```
import os
from typing import List

from flytekit import kwtypes, workflow, dynamic, task, ContainerTask
from flytekit.types.file import FlyteFile
from flytekit.types.directory import FlyteDirectory
```

## ContainerTask

First we'll define a generic [ContainerTask](https://docs.flyte.org/projects/cookbook/en/latest/auto/core/containerization/raw_container.html) that will handle the actual processing of the files.

```
ct = ContainerTask(
    name="sed_task",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    # The keyword names below become the filenames inside the container
    inputs=kwtypes(infile=FlyteFile),
    outputs=kwtypes(out=FlyteFile),
    image="alpine",
    command=[
        "/bin/sh",
        "-c",
        "sed 's/o/a/g' /var/inputs/infile > /var/outputs/out",
    ],
)
```

It's a very simple task that uses `sed` to convert any occurrence of `o` in a file to `a` and output the result to a new file. A few things to note:

- We're just using the ubiquitous `alpine` image here as these raw container tasks are designed to use any old container, managing the dataflow and therefore DAG building via typed `inputs` and `outputs`.
- When using `FlyteFile` as the input and output, the keyword variable name actually becomes the filename inside the container filesystem. If you try to use variable substitution via `{{.inputs.infile}}` like you would with other types, you'd run into something like the following error. The full path in the object storage is passed in, which the container obviously can't interpret:
  ```
  sed: /var/inputs/s3://my-s3-bucket/data/82/f6da06ac4861f4dde89b-n1-0/b68e4787c8e70b0a6f64c04bada8b22a/hello.txt: No such file or directory
  ```
- We're using the [Shell Form](https://emmer.dev/blog/docker-shell-vs.-exec-form/) here, but Exec Form would also work, albeit without shell redirection via `>`.

## Directory Task

Next up we have a small but crucially important task that gets the directory we made earlier ready for processing in other tasks.
```
@task
def get_dir(dirpath: str) -> FlyteDirectory:
    # Wrap the remote path so downstream tasks receive a real FlyteDirectory
    fd = FlyteDirectory(path=dirpath)
    return fd
```

All this does is take a string representing the path to that directory, then create and return it as a proper `FlyteDirectory`. The reason this is so important is that FlyteFiles and FlyteDirectories get uploaded to and downloaded from the object store *at the task boundary*. In practice, this means that if you define a FlyteFile in a task body and try to open it within that same context, the container running that task will have no idea how to open that file.

## Dynamic Task

Since we may not know the content of our arbitrary directory at compile time, we need to rely on our handy [Dynamic Task](https://docs.flyte.org/projects/cookbook/en/latest/auto/core/control_flow/dynamics.html) to process the inputs at run time.

```
@dynamic
def process_files(indir: FlyteDirectory) -> List[FlyteFile]:
    all_out = []
    # One ContainerTask invocation per file found in the directory
    for fname in os.listdir(indir):
        f = os.path.join(indir, fname)
        file_out = ct(infile=f)
        all_out.append(file_out)
    return all_out
```

This task will take the FlyteDirectory we instantiated earlier and loop through its contents, passing each file to our Container Task. The FlyteFiles returned are then added to a list of processed files and returned.

## Putting it all together

Finally we have the workflow definition that coordinates all these parts.

```
@workflow
def wf(indirpath: str) -> List[FlyteFile]:
    fdir = get_dir(dirpath=indirpath)
    outfiles = process_files(indir=fdir)
    return outfiles
```

All this does is take in a string from the command line, create the FlyteDirectory, and pass it to the Dynamic Task. You can now invoke this workflow on your local sandbox cluster with: `pyflyte run --remote filebased_wf.py wf --indirpath s3://my-s3-bucket/input-data`. Head over to the Flyte Console at http://localhost:30080/console to track its execution. Check out the inputs and outputs to the different tasks in the console and in minio.
Go ahead and download one of those outputs to make sure our Container Task actually did what it's supposed to.

That's all there is to it! Please feel free to comment if anything is unclear or could be improved. I hope you found this little "hella warld" demo useful. Wishing you tailwinds and blue skies on your Flyte journey!

## Full Code

Here's what your complete `filebased_wf.py` should look like:

```
import os
from typing import List

from flytekit import kwtypes, workflow, dynamic, task, ContainerTask
from flytekit.types.file import FlyteFile
from flytekit.types.directory import FlyteDirectory

ct = ContainerTask(
    name="sed_task",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(infile=FlyteFile),
    outputs=kwtypes(out=FlyteFile),
    image="alpine",
    command=[
        "/bin/sh",
        "-c",
        "sed 's/o/a/g' /var/inputs/infile > /var/outputs/out",
    ],
)


@task
def get_dir(dirpath: str) -> FlyteDirectory:
    fd = FlyteDirectory(path=dirpath)
    return fd


@dynamic
def process_files(indir: FlyteDirectory) -> List[FlyteFile]:
    all_out = []
    for fname in os.listdir(indir):
        f = os.path.join(indir, fname)
        file_out = ct(infile=f)
        all_out.append(file_out)
    return all_out


@workflow
def wf(indirpath: str) -> List[FlyteFile]:
    fdir = get_dir(dirpath=indirpath)
    outfiles = process_files(indir=fdir)
    return outfiles
```
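If you'd like to check a downloaded output against its input without eyeballing it, here is a small plain-Python sketch of what `sed 's/o/a/g'` should produce for the two sample files. The function names `sed_o_to_a` and `output_matches` are made up for this example, and it assumes the inputs are plain text with no special handling needed.

```python
def sed_o_to_a(text: str) -> str:
    """Plain-Python equivalent of `sed 's/o/a/g'` for literal text."""
    return text.replace("o", "a")


def output_matches(input_text: str, output_text: str) -> bool:
    """Return True if output_text is what the container task should emit."""
    return sed_o_to_a(input_text) == output_text


if __name__ == "__main__":
    # `echo hello > hello.txt` writes the word plus a trailing newline.
    print(sed_o_to_a("hello\n"), end="")  # prints: hella
    print(sed_o_to_a("world\n"), end="")  # prints: warld
```

Read the contents of the original and downloaded files into strings and pass them to `output_matches` to confirm the task did its job.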