Key-Value Types

Fluvio data streaming lets you define key-value streams, where the key is used for partitioning. The engine makes the key accessible to SDF functions by passing it to every function in a chain of operations. The engine can also modify the key from one stream to another.

Let’s start with a simple example and work through various use cases.

 

Functions with Value (no Key)

In the most common case, an SDF function changes the value of a record and ignores the key. The following example shows a function that reads a coupon value and generates a credit value.

functions:
  coupon-to-credit:
    operator: map
    inputs:
      - name: coupon
        type: coupon
    output:
      type: credit

The function definition generates the following code:

fn coupon_to_credit(coupon: Coupon) -> Result<Credit, String> {
  ...
}

By default, SDF maps the value portion of the record to the first parameter, Coupon, and the result, Credit, to the output value. This function does not know about the key.
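As a rough illustration of what the body of such a value-only function might look like, here is a minimal sketch. The fields of Coupon and Credit (code, amount_cents) are hypothetical, since the type definitions are not shown here; only the function signature comes from the generated code above.

```rust
// Hypothetical types: the fields of `coupon` and `credit` are assumptions
// made for this sketch only.
#[derive(Debug, Clone, PartialEq)]
pub struct Coupon {
    pub code: String,
    pub amount_cents: u64,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Credit {
    pub amount_cents: u64,
}

// Same signature as the generated function; the body is illustrative.
// The function sees only the record value and never the key.
pub fn coupon_to_credit(coupon: Coupon) -> Result<Credit, String> {
    if coupon.code.is_empty() {
        return Err("coupon code is empty".to_string());
    }
    Ok(Credit {
        amount_cents: coupon.amount_cents,
    })
}
```

Returning Err from the function reports a failure for that record instead of producing a credit.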

 

Functions with Input Key and Value

Sometimes, the data streaming record has a key that the SDF function needs to access to apply the business logic. In the following example, the coupon value has a user-id in the record key. The function uses the key and the value fields to validate the coupon and generates a credit value.

Let’s define the function prototype:

functions:
  coupon-to-user-credit:
    operator: map
    inputs:
      - name: user-id
        type: string
        kind: key
      - name: coupon
        type: coupon
    output:
      type: credit

The function definition generates the following code:

fn coupon_to_user_credit(user_id: Option<String>, coupon: Coupon) -> Result<Credit, String> {
  ...
}

The engine generates the user-id key as an optional parameter, because there is no guarantee that every record in a stream includes a key. The developer is responsible for checking that the key is present when applying the business logic.
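One way to handle the optional key is to reject records that arrive without it. The sketch below assumes hypothetical fields for Coupon and Credit (including a user_id field on Credit) and treats a missing or empty key as an error; other policies, such as falling back to a default user, are equally possible.

```rust
// Hypothetical types: field names are assumptions made for this sketch only.
#[derive(Debug, Clone, PartialEq)]
pub struct Coupon {
    pub code: String,
    pub amount_cents: u64,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Credit {
    pub user_id: String,
    pub amount_cents: u64,
}

// Same signature as the generated function; the body is illustrative.
pub fn coupon_to_user_credit(user_id: Option<String>, coupon: Coupon) -> Result<Credit, String> {
    // The key may be absent, so turn `None` into an explicit error.
    let user_id = user_id.ok_or_else(|| "record has no user-id key".to_string())?;
    if user_id.is_empty() {
        return Err("user-id key is empty".to_string());
    }
    Ok(Credit {
        user_id,
        amount_cents: coupon.amount_cents,
    })
}
```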

 

Functions with Output Key-Value

The SDF engine can also remap the key between two data streams. In the following example, the function reads the user-id key and a coupon value, and generates a new key-value pair. The value stores the credit, and the new key holds the timestamp at which the credit expires.

functions:
  coupon-to-ts-credit:
    operator: map
    inputs:
      - name: user-id
        type: string
        kind: key
      - name: coupon
        type: coupon
    output:
      type: key-value
      properties:
        key: 
            type: expires
        value:
            type: credit

The function definition generates the following code:

pub(crate) fn coupon_to_ts_credit(user_id: Option<String>, coupon: Coupon) -> Result<(Option<Expires>, Credit), String> {
  ...
}

The engine generates a (key, value) tuple to pass along the key and value to the subsequent function or the target topic.
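A possible body for a function that returns a new key is sketched below. The Expires, Coupon, and Credit definitions, the issued_at_ms field, and the 30-day validity window are all assumptions made for illustration; only the function signature follows the generated code.

```rust
// Hypothetical types: none of these field names come from the source.
#[derive(Debug, Clone, PartialEq)]
pub struct Coupon {
    pub code: String,
    pub amount_cents: u64,
    pub issued_at_ms: i64,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Credit {
    pub amount_cents: u64,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Expires {
    pub timestamp_ms: i64,
}

// Assumed validity window of 30 days, in milliseconds.
const VALIDITY_MS: i64 = 30 * 24 * 60 * 60 * 1000;

pub fn coupon_to_ts_credit(
    user_id: Option<String>,
    coupon: Coupon,
) -> Result<(Option<Expires>, Credit), String> {
    // The incoming key is read for validation but is not part of the output.
    if user_id.is_none() {
        return Err("record has no user-id key".to_string());
    }
    let timestamp_ms = coupon
        .issued_at_ms
        .checked_add(VALIDITY_MS)
        .ok_or_else(|| "expiration timestamp overflow".to_string())?;
    // The first tuple element becomes the new record key, the second the value.
    Ok((
        Some(Expires { timestamp_ms }),
        Credit { amount_cents: coupon.amount_cents },
    ))
}
```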

 

Key Chaining and Multi-Step Operations

The operators in the transforms section may be chained to perform multi-step operations. The engine uses the following algorithm to pass down keys from one operator function to another.

  • A function may read or ignore the incoming key.
  • If the function returns a new key, the following function receives the new key.
  • If any function in the middle of the chain returns a value (without a key), the chain is not broken; the following function reading the key will receive the last updated key.
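The key-passing rules above can be modeled with a small simulation. This is not the engine's implementation: StepOutput, Step, run_chain, and the three sample steps are hypothetical names, and keys and values are simplified to strings. The sketch also assumes that a step returning a (key, value) pair with an empty key replaces the current key with None.

```rust
// A step returns either a value only, or a (key, value) pair.
pub enum StepOutput {
    Value(String),
    KeyValue(Option<String>, String),
}

// Each step receives the current key and value; it may ignore the key.
pub type Step = fn(Option<String>, String) -> StepOutput;

// Runs the steps in order and carries the most recently set key forward.
pub fn run_chain(steps: &[Step], key: Option<String>, value: String) -> (Option<String>, String) {
    let mut key = key;
    let mut value = value;
    for step in steps {
        match step(key.clone(), value) {
            // Value only: the chain keeps the last updated key.
            StepOutput::Value(v) => value = v,
            // New key: later steps receive it instead of the old one.
            StepOutput::KeyValue(k, v) => {
                key = k;
                value = v;
            }
        }
    }
    (key, value)
}

// Sample steps for the simulation.
pub fn rekey(_key: Option<String>, value: String) -> StepOutput {
    StepOutput::KeyValue(Some(format!("new-{value}")), value)
}

pub fn uppercase(_key: Option<String>, value: String) -> StepOutput {
    StepOutput::Value(value.to_uppercase())
}

pub fn tag_with_key(key: Option<String>, value: String) -> StepOutput {
    StepOutput::Value(format!("{}:{}", key.unwrap_or_default(), value))
}
```

Running rekey, uppercase, and tag_with_key in that order on key "user-1" and value "abc" shows that the last step still sees the key set by the first step, even though the middle step returned only a value.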
 

Key Operators and Convention

All operators below can access the key, but only a subset can return a new key along with the value. During chained operations, if a function omits the key from its return value, the key of the original record is passed along. If the key is modified along the path, the new key is passed to the following function.

Operators that can access key and return value or (key, value):

Operators that can access the key but cannot return (key, value):

 

Limitations

  • The key uses raw serialization, regardless of how value serialization is configured.
 
