States

States are persistent objects that collect and serve data generated by streams of records. With states, dataflows can build tables, compute aggregates, join datasets, perform anomaly detection, and more.

In technical terms, states are materialized following the CQRS pattern: a builder service reads a stream of records and creates a materialized view for one or more readers. The builder service is separate from the reader services.

 

State Types

States are key/value types, defined with the keyword keyed-state, where:

  • key is the unique identifier for each object in the state. The key must be a string.
  • value can be primitive or arrow-row.

Choose between primitive and arrow-row based on the data types you store and the operations you intend to perform on the state. Use primitive values to store nested objects for key-based look-ups, and arrow-row values to join datasets or perform SQL operations.

 

Primitive Value

Primitive values allow you to define objects using primitive type definitions. You may think of primitive states as key/value stores, such as MongoDB. The advantage of this format is that it allows hierarchical objects. The following example shows a state object where the value is a 32-bit signed integer.

states:
  count-per-word:
    type: keyed-state
    properties:
      key:
        type: string
      value:
        type: primitive
        properties:
          value: i32

Note: In this beta release, the system only supports i32, with full support for hierarchical objects coming soon.

 

Arrow-Row Value

Arrow-row values are defined using arrow type definitions. Arrow-row states are stored in arrow dataframe format. You may think of an arrow-row state as a database, such as PostgreSQL. The advantage of this format is that it is accessible by third-party libraries such as Polars. An arrow-row state is defined as follows:

states:
  temperature:
    type: keyed-state
    properties:
      key:
        type: string
      value:
        type: arrow-row
        properties:
          sensor:
            type: string
          temperature:
            type: f32 

Check out the types section for additional information on arrow types.

 

State Definition & Usage

State objects are defined in the states section of an sdf-package.yaml file, or inside a service section in the dataflow.yaml file. States defined in a package file are imported into the dataflow file.

 

Inline Definitions (dataflow.yaml)

Inline states are defined in a dataflow.yaml file. We’ll use an example to show how to define and use states in a dataflow file.

In most cases, a state processing operation takes three steps across two different services:

 

Step 1: Define State

In this example, we define a count-words service and a count-per-word state. As this service defines the state, it is also responsible for updating it.

services:
  count-words:
    states:
      count-per-word:
        type: keyed-state
        properties:
          key:
            type: string
          value:
            type: arrow-row
            properties:
              count:
                type: i32

Note that the state type could also have been defined in the types section.
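
As a sketch, assuming a type defined in the top-level types section can be referenced wherever a type is expected (the reference syntax shown here is an assumption), the equivalent definition could look like this:

# Sketch: the row type declared in the types section and referenced
# by the state's value (reference syntax assumed).
types:
  count-value:
    type: arrow-row
    properties:
      count:
        type: i32

services:
  count-words:
    states:
      count-per-word:
        type: keyed-state
        properties:
          key:
            type: string
          value:
            type: count-value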

 

Step 2: Update State

The service responsible for updating the state object must have an update-state operator. This is where the state is updated.

update-state:
  run: |
    fn increment_word_count(word: String) -> Result<(), String> {
      // Look up the row for the current key.
      let mut count = count_per_word();
      // Increment the count column (matching the arrow-row schema above).
      count.count += 1;
      // Write the updated row back to the state.
      count.update();
      Ok(())
    }

In this example, we look up the previous count, increment it, and update it.

Except for window processing, where the service concludes with a flush operation, update-state is the last operation of the service.

 

Step 3: Lookup State

Next we’ll define a service that listens to a data stream of words and returns their count. This service is a separate flow triggered by another data stream.

In this example, we define a service lookup-word that receives words from a data stream, looks up each word in the count-per-word state, and converts the result into a JSON value.

lookup-word:
  states:
    count-per-word:
      from: count-words.count-per-word

  transforms:
    - operator: map
      run: |
        fn query_word_count(word: String) -> Result<WordCount, String> {
          use polars::prelude::{col, lit, IntoLazy};

          // Read the state as a Polars dataframe.
          let df = count_per_word();

          // Filter rows on the key column to find the matching word.
          let val = df.clone().lazy()
              .filter(col("_key").eq(lit(word.clone())))
              .collect().expect("query");

          // Return the stored count if the word exists, otherwise zero.
          if let Some(count) = val.column("count").unwrap().i32().unwrap().get(0) {
            Ok(WordCount{word, count})
          } else {
            Ok(WordCount{word, count: 0})
          }
        }

When a service accesses a state defined by another service, it needs to reference the source location, which in our example is count-words.count-per-word. We use a map operator to match the word from the stream with a value from count_per_word(), and Polars to perform the query. If the value exists, the result is the word and its associated count; otherwise, the count is zero.

The look-up sends the result to the next stage of the data pipeline.
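
The WordCount type returned by the map function is not shown in the snippets above. As a sketch (the type name and fields are assumed from the function body), it could be declared in the types section as follows:

# Sketch: output type for the map operator, assumed from the
# function body above (exact declaration syntax may vary).
types:
  word-count:
    type: object
    properties:
      word:
        type: string
      count:
        type: i32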

 

States & Packages

States can be defined in a package sdf-package.yaml file. This approach allows you to implement and test states independently from the dataflow.yaml file.

We’ll use an example to show how to implement, test, and import a state from a package file.
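
As a brief preview, a package-defined state might look like the sketch below; the states section uses the same keyed-state syntax shown earlier, while the package metadata fields are illustrative:

# sdf-package.yaml (sketch; metadata fields are illustrative)
meta:
  name: word-counter
  version: 0.1.0
  namespace: example

states:
  count-per-word:
    type: keyed-state
    properties:
      key:
        type: string
      value:
        type: arrow-row
        properties:
          count:
            type: i32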