
Dataflow - Composition

Composable dataflows let you create individual packages and import them to build dataflows. They are a great way to organize and reuse code in large dataflows. For simple dataflows, we recommend using Inline Dataflows.

[Figure: Composition Diagram]

 

Directory Structure

Dataflows and packages each require a definition file for code generation and composition: dataflow.yaml for a dataflow and sdf-package.yaml for a package.

Consequently, each dataflow and package must have its own dedicated directory.

For example, a dataflow named split-sentence and a package sentence:

split-sentence
├── dataflow.yaml
└── packages
    └── sentence
        └── sdf-package.yaml

While placing the packages inside a dataflow is not strictly necessary, it often makes sense.

 

Packages

Packages are dataflow building blocks of one or more types, states, and functions. You can structure all the components in one package or divide them by responsibility across multiple packages. The packages also allow hierarchical imports, where a package can use other packages’ types, states, or functions.

 

sdf-package.yaml

The SDF command line tool uses the stateful dataflows package definition file sdf-package.yaml to generate the code, state definitions, and type definitions for the package. The package file has the following hierarchy:

# sdf-package.yaml
apiVersion: <version>

meta:
  name: <package-name>
  version: <package-version>
  namespace: <package-namespace>

imports:
  - pkg: <package-namespace>/<package-name>@<package-version>
    types:
      - name: <type-name>
    functions:
      - name: <function-name>
    states:
      - name: <state-name>

types:
  <type-name>: <type-props>

states:
  <state-name>: <state-props>

functions:
  <function-name>:
    operator: <operator-type>
    states: 
      - name: <state-name>
    inputs:
      - name: <input-name>
        type: <input-type>
    output:
      type: <output-type>

dev:
  converter: <converter-props>
  imports:
    - pkg:  <package-namespace>/<package-name>@<package-version>
      path: <relative-path>

The sections are as follows:

  • meta - metadata about the package
  • imports - imports other packages’ types, states, functions
  • types - defines types used in the package
  • states - defines states used in functions
  • functions - defines functions in the package
  • dev - defines substitutions used during development and test
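
Both the imports and dev sections identify a package with a string of the form <package-namespace>/<package-name>@<package-version>. To illustrate how that string breaks down, here is a minimal Rust sketch. The PackageRef type and the parse_package_ref function are hypothetical names invented for this example; they are not part of SDF, and the real tool may validate references differently.

```rust
/// Hypothetical holder for the three parts of a package reference.
#[derive(Debug, PartialEq)]
pub struct PackageRef {
    pub namespace: String,
    pub name: String,
    pub version: String,
}

/// Splits "<namespace>/<name>@<version>" into its parts.
/// Returns None when a separator is missing or any part is empty.
/// (Assumption: this sketch does not check the version syntax.)
pub fn parse_package_ref(s: &str) -> Option<PackageRef> {
    let (namespace, rest) = s.split_once('/')?;
    let (name, version) = rest.split_once('@')?;
    if namespace.is_empty() || name.is_empty() || version.is_empty() {
        return None;
    }
    Some(PackageRef {
        namespace: namespace.to_string(),
        name: name.to_string(),
        version: version.to_string(),
    })
}
```

A reference that lacks the namespace or the version, such as a bare package name, yields None in this sketch.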
 

Build and Test a Package

We’ll use the SDF command line tool with the package definition file to generate the WebAssembly glue code and the placeholder for the custom logic.

$ sdf -h    
Stateful Dataflow Command Line Interface

Usage: sdf <COMMAND>

Commands:
 clean     Clean generated artifacts such as state and internal objects
 build     Build package (requires: sdf-package.yaml)
 generate  Generate package (requires: sdf-package.yaml)
 update    Update package (requires: sdf-package.yaml)
 test      Test command shell (requires sdf-package.yaml)
 run       Run dataflow  (requires: dataflow.yaml)
 setup     Setup pre-requisites for Stateful Dataflows
 version   Prints binary version
 log       Print dataflow logs

Let’s start with an example. Create a package that splits sentences into words and counts the number of characters in each word.

 
1. Create the Package file

Create a fresh project directory split-sentence with the nested subdirectories packages and sentence, then change into the sentence directory:

$ mkdir -p split-sentence/packages/sentence
$ cd split-sentence/packages/sentence

Inside the sentence directory, create sdf-package.yaml and add the following content:

# sdf-package.yaml
apiVersion: 0.4.0

meta:
  name: sentence-pkg
  version: 0.1.0
  namespace: example

functions:

  sentence-to-words:
    operator: flat-map
    inputs:
      - name: sentence
        type: string
    output:
      type: string
      
  augment-count:
    operator: map
    inputs:
      - name: word
        type: string
    output:
      type: string    

dev:
  converter: raw
 
2. Generate the Package Project

Use the sdf generate command to generate the project:

$ sdf generate

The generator created several directories and files that we’ll edit next.

 
3. Add the Custom Code

First, let’s update the first function, sentence-to-words. Open rust/sentence-to-words/src/lib.rs and update the function body with the following code:

pub(crate) fn sentence_to_words(sentence: String) -> Result<Vec<String>, String> {
    Ok(sentence.split_whitespace().map(String::from).collect())
}

Next, update the second function, augment-count. Open rust/augment-count/src/lib.rs and replace the function body:

pub(crate) fn augment_count(word: String) -> Result<String, String> {
    Ok(format!("{}({})", word, word.chars().count()))
}
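
Note that augment_count uses word.chars().count() rather than word.len(). The two agree for ASCII input but differ for other text, because len() returns the byte length of the UTF-8 encoding while chars().count() counts Unicode scalar values. The sketch below repeats the function and adds a hypothetical augment_bytes variant, which exists here only for comparison:

```rust
// Same function as in the tutorial: counts Unicode scalar values.
pub(crate) fn augment_count(word: String) -> Result<String, String> {
    Ok(format!("{}({})", word, word.chars().count()))
}

// Hypothetical variant for comparison only: counts UTF-8 bytes instead.
pub(crate) fn augment_bytes(word: String) -> Result<String, String> {
    Ok(format!("{}({})", word, word.len()))
}
```

For an ASCII word such as Hello, both return Hello(5). For a word containing a two-byte character, such as one with a precomposed é (U+00E9), the byte-based variant reports one more than augment_count.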

Let’s add a test to the same file as well:

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_augment_count() {
        let input = "Hello".to_string();
        let output = augment_count(input);
        assert_eq!(output.unwrap(), "Hello(5)");
    }
}
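
The test above covers only augment_count. A similar test module could be added to rust/sentence-to-words/src/lib.rs. The following is a sketch that repeats the function so the block stands alone; the module and test names are our own choices.

```rust
pub(crate) fn sentence_to_words(sentence: String) -> Result<Vec<String>, String> {
    Ok(sentence.split_whitespace().map(String::from).collect())
}

#[cfg(test)]
mod sentence_tests {
    use super::*;

    #[test]
    fn test_sentence_to_words() {
        let output = sentence_to_words("Hello World".to_string());
        assert_eq!(output.unwrap(), vec!["Hello", "World"]);
    }

    #[test]
    fn test_extra_whitespace_is_ignored() {
        // split_whitespace skips leading, trailing, and repeated whitespace.
        let output = sentence_to_words("  Hello   World ".to_string());
        assert_eq!(output.unwrap(), vec!["Hello", "World"]);
    }

    #[test]
    fn test_empty_sentence_yields_no_words() {
        let output = sentence_to_words(String::new());
        assert!(output.unwrap().is_empty());
    }
}
```

The empty-input case is worth checking because a flat-map function may legitimately produce zero output records for an input record.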

We’ve implemented both functions; it’s time to compile and test our work.

 
4. Build and Test the Package

To build the package, run:

$ sdf build

Use the sdf test interactive shell to test the code:

$ sdf test

In the test shell, you can view the functions available for testing:

>> show functions
sentence-to-words
augment-count

Let’s test sentence-to-words first:

>> test function sentence-to-words --input "Hello World"
Hello
World

Next, test augment-count:

>> test function augment-count --input "Hello"
Hello(5)

You may also test the Rust code via Cargo:

$ cd rust
$ cargo test

The tests passed, and the package is now ready to use in the dataflow file.

 

The Dataflow

The dataflow imports functions, types, and states from one or more packages. Packages may also import components from other packages; however, the dataflow maintains the final composition.

 

dataflow.yaml

The SDF command line tool uses the dataflow definition file dataflow.yaml to assemble the data application, and it has the following hierarchy:

# dataflow.yaml
apiVersion: <version>

meta:
  name: <dataflow-name>
  version: <dataflow-version>
  namespace: <dataflow-namespace>

imports:
  - pkg: <package-namespace>/<package-name>@<package-version>
    types:
      - name: <type-name>
    functions:
      - name: <function-name>
    states:
      - name: <state-name>

config:
  converter: <converter-props>
  consumer: <consumer-props>

types:
  <type-name>: <type-props>

topics:
  <topic-name>: <topic-props>

services:
  <service-name>:
    sources:
      - type: <source-props>

    states:
      <state-name>: <state-props>

    transforms: 
      - operator: <operator-type>
        uses: <imported-function-name>

    sinks:
      - type: <sink-props>

dev:
  converter: <converter>
  imports:
    - pkg:  <package-namespace>/<package-name>@<package-version>
      path: <relative-path>

The sections are as follows:

  • meta - metadata about the dataflow
  • imports - imports other packages’ types, states, functions
  • config - short for configurations, holds the default configurations
  • types - defines types used in the dataflow
  • topics - defines topics used in the dataflow
  • states - defines states used in functions
  • services - defines the service properties
    • sources - defines the source topics used in the service
    • states - defines the state properties used in the service
    • transforms - defines the list of operators and the imported functions used in the service
    • sinks - defines the sink topics used in the service
  • dev - defines substitutions used during development and test

The dataflow file has other variants such as window and partitions, which are omitted for simplicity. For additional details check the Dataflow file section.

 

Create a Dataflow

We’ll import the package built in the previous section to create the split-sentence dataflow.

 
1. Create the Dataflow file

Change to the split-sentence project directory:

$ cd split-sentence

Create dataflow.yaml and add the following content:

# dataflow.yaml
apiVersion: 0.4.0

meta:
  name: split-sentence
  version: 0.1.0
  namespace: example

imports:
  - pkg: example/sentence-pkg@0.1.0
    functions:
      - name: sentence-to-words
      - name: augment-count

config:
  converter: raw

topics:
  sentence:
    schema:
      value:
        type: string
        converter: raw
  words:
    schema:
      value:
        type: string
        converter: raw

services:
  sentence-words:
    sources:
      - type: topic
        id: sentence

    transforms:
      - operator: flat-map
        uses: sentence-to-words
      - operator: map
        uses: augment-count

    sinks:
      - type: topic
        id: words

dev:
  imports:
    - pkg: example/sentence-pkg@0.1.0
      path: ./packages/sentence
 
2. Run the Dataflow

Use the sdf command line tool to run the dataflow:

$ sdf run --dev 

The --dev flag tells the engine to load imported packages from the local paths listed in the dev section. Without this flag, the engine looks for the package in InfinyOn Hub.
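
To make the effect of the flag concrete, here is a rough Rust sketch of the lookup rule described above. It is not how the engine is implemented: PackageSource and resolve are hypothetical names, and the fallback to the Hub when dev mode finds no matching entry is our assumption.

```rust
/// Where a package would be loaded from (illustrative only).
#[derive(Debug, PartialEq)]
pub enum PackageSource {
    /// A local directory taken from the dev imports.
    LocalPath(String),
    /// The remote registry (InfinyOn Hub), keyed by package reference.
    Hub(String),
}

/// `dev_imports` holds (pkg, path) pairs as listed under dev -> imports.
pub fn resolve(pkg: &str, dev_mode: bool, dev_imports: &[(&str, &str)]) -> PackageSource {
    if dev_mode {
        // In dev mode, a matching entry substitutes the local path.
        if let Some((_, path)) = dev_imports.iter().find(|(p, _)| *p == pkg) {
            return PackageSource::LocalPath(path.to_string());
        }
    }
    // Assumption: anything not substituted is looked up in the Hub.
    PackageSource::Hub(pkg.to_string())
}
```

With the dev section from the dataflow file, resolve in dev mode returns the local ./packages/sentence path; with dev_mode set to false it returns the Hub variant for the same package reference.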

 
3. Test the Dataflow

Produce sentences to the sentence topic:

$ fluvio produce sentence
Hello world
Hi there

Consume from words to retrieve the result:

$ fluvio consume words -Bd
Hello(5)
world(5)
Hi(2)
there(5)
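
The output follows directly from chaining the two operators: flat-map turns each sentence into several word records, and map then rewrites each word. As a plain-Rust stand-in for the sentence-words service (no topics or engine involved; run_service is a hypothetical helper written for this sketch), the same chain looks like this:

```rust
fn sentence_to_words(sentence: String) -> Result<Vec<String>, String> {
    Ok(sentence.split_whitespace().map(String::from).collect())
}

fn augment_count(word: String) -> Result<String, String> {
    Ok(format!("{}({})", word, word.chars().count()))
}

/// Applies flat-map (sentence_to_words) and then map (augment_count)
/// to every input record, preserving record order.
pub fn run_service(sentences: &[&str]) -> Result<Vec<String>, String> {
    let mut out = Vec::new();
    for sentence in sentences {
        // flat-map: one input record may yield zero or more records.
        for word in sentence_to_words(sentence.to_string())? {
            // map: exactly one output record per input record.
            out.push(augment_count(word)?);
        }
    }
    Ok(out)
}
```

Calling run_service with the two sentences produced above returns Hello(5), world(5), Hi(2), there(5), which matches the consumer output.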
 
4. Show State

The dataflow collects runtime metrics that you can inspect in the runtime terminal.

Check the sentence-to-words counters:

>> show state sentence-words/sentence-to-words/metrics --table
 Key    Window  succeeded  failed
 stats  *       2          0

Check the augment-count counters:

>> show state sentence-words/augment-count/metrics --table
 Key    Window  succeeded  failed
 stats  *       4          0
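
These counts follow from the operator types: two sentences were produced, so the flat-map function ran twice and emitted four words, and the map function then ran once per word. The sketch below reproduces that arithmetic in plain Rust. The Metrics struct and count_invocations function are hypothetical stand-ins for the engine's counters, not its actual implementation.

```rust
/// Hypothetical per-function counters, mirroring the table columns.
#[derive(Debug, Default, PartialEq)]
pub struct Metrics {
    pub succeeded: u32,
    pub failed: u32,
}

impl Metrics {
    /// Counts one invocation as succeeded or failed.
    fn record<T, E>(&mut self, result: &Result<T, E>) {
        match result {
            Ok(_) => self.succeeded += 1,
            Err(_) => self.failed += 1,
        }
    }
}

fn sentence_to_words(sentence: String) -> Result<Vec<String>, String> {
    Ok(sentence.split_whitespace().map(String::from).collect())
}

fn augment_count(word: String) -> Result<String, String> {
    Ok(format!("{}({})", word, word.chars().count()))
}

/// Returns (flat-map metrics, map metrics) for the given input sentences.
pub fn count_invocations(sentences: &[&str]) -> (Metrics, Metrics) {
    let mut flat_map_metrics = Metrics::default();
    let mut map_metrics = Metrics::default();
    for sentence in sentences {
        let words = sentence_to_words(sentence.to_string());
        flat_map_metrics.record(&words);
        // A failed flat-map call contributes no words downstream.
        for word in words.unwrap_or_default() {
            let augmented = augment_count(word);
            map_metrics.record(&augmented);
        }
    }
    (flat_map_metrics, map_metrics)
}
```

For the inputs "Hello world" and "Hi there", the first counter ends at 2 succeeded and the second at 4 succeeded, with no failures in either.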

Congratulations! You’ve successfully built and run a composable dataflow! The project is available for download on GitHub.

 
5. Clean-up

Exit the sdf terminal and remove the topics:

$ fluvio topic delete sentence
$ fluvio topic delete words
 
