Stateful Data Flow (Beta): build composable event-driven data pipelines in minutes.


Merge

Dataflows can merge events from multiple topics into a single topic. Merging is defined in the sources section of a service definition, where each topic to merge is listed as its own source. Records from every source must match the schema of the target topic. If a source's records do not match, add a map operator to that source's transforms to convert them.

  sources:
    - type: topic
      id: <topic-id>
      transforms:
        - operator: <operator-name>
          ...
    - type: topic
      id: <topic-id>
      transforms:
        - operator: <operator-name>

The optional transforms section of a source lists the operators applied to that source's events before they are sent to the target topic. Which operators to use depends on the desired business logic.
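Conceptually, a merge runs each source's transforms only on that source's records and then forwards every result to the one sink topic. The sketch below models this in plain Python. It is an illustration of the semantics, not SDF's implementation: the function name merge_sources is hypothetical, and it assumes each record carries an arrival timestamp and that the merged output is interleaved in arrival order, which this page does not specify.

```python
import heapq
from typing import Callable, Iterable

Record = dict
Transform = Callable[[Record], Record]


def merge_sources(
    sources: list[tuple[Iterable[tuple[int, Record]], list[Transform]]],
) -> list[Record]:
    """Merge several source streams into one sink stream (hypothetical model).

    Each source is an (events, transforms) pair. events is an iterable of
    (arrival_time, record) pairs already sorted by arrival_time, and the
    transforms run, in order, only on that source's records.
    """

    def transformed(events, transforms):
        for ts, record in events:
            for t in transforms:
                record = t(record)
            yield ts, record

    streams = [transformed(events, transforms) for events, transforms in sources]
    # heapq.merge interleaves the pre-sorted streams by arrival time; ties are
    # resolved by source order, so records themselves are never compared.
    return [record for _, record in heapq.merge(*streams, key=lambda e: e[0])]


# Usage: two sources with no transforms, as in the example below.
child = [(3, {"name": "Andrew", "age": 16})]
adult = [(1, {"name": "Randy", "age": 32}), (2, {"name": "Alice", "age": 28})]
for record in merge_sources([(child, []), (adult, [])]):
    print(record)
```

Passing a transform list for one source (for example, a function that rewrites a field) affects only that source's records, which mirrors how transforms are attached per source in the YAML above.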

 

Merge Example

We’ll create a dataflow that reads records from the child and adult topics and merges them into the people topic.

 

1. Create a Dataflow file

Create a directory named merge-test and change into it:

$ mkdir merge-test; cd merge-test

Add the following dataflow.yaml file:

# dataflow.yaml
apiVersion: 0.4.0

meta:
  name: merge
  version: 0.1.0
  namespace: examples

config:
  converter: json

types:
  person:
    type: object
    properties:
      name:
        type: string
      age: 
        type: i32

topics:
  child:
    schema:
      value:
        type: person
  adult:
    schema:
      value:
        type: person
  people:
    schema:
      value:
        type: person  

services:
  merge-service:
    sources:
      - type: topic
        id: child
      - type: topic
        id: adult
    sinks:
      - type: topic
        id: people

This example needs no operators because the child, adult, and people topics all use the same person schema. If a source topic's schema differs from the target's, add a map operator to that source and convert each record to the target schema.
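The conversion such a map performs can be sketched as a plain function from one record shape to another. This is not SDF operator code; it is a minimal Python illustration that assumes a hypothetical source topic whose records look like {"full_name": ..., "years": ...}, converted to the person type defined above (name as a string, age as an i32).

```python
# Hypothetical source record shape: {"full_name": str, "years": int or numeric str}.
# Target shape (the person type above): {"name": str, "age": i32}.
I32_MIN, I32_MAX = -(2**31), 2**31 - 1


def to_person(record: dict) -> dict:
    """Convert one hypothetical source record into a person record."""
    age = int(record["years"])  # accepts ints or numeric strings
    if not I32_MIN <= age <= I32_MAX:
        # The person type declares age as i32, so reject values that won't fit.
        raise ValueError(f"age {age} does not fit in i32")
    return {"name": str(record["full_name"]), "age": age}


print(to_person({"full_name": "Randy", "years": "32"}))
```

Raising on out-of-range values keeps invalid records from reaching the target topic; whether a real dataflow should drop, reject, or clamp such records is a business decision.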

Run the dataflow:

$ sdf run

Add the --ui flag to view a graphical representation of the dataflow.

 

2. Test the Dataflow

Produce records to the adult topic, entering one JSON record per line:

$ fluvio produce adult
{"name":"Randy","age":32}
{"name":"Alice","age":28}

Produce records to the child topic:

$ fluvio produce child
{"age":16,"name":"Andrew"}
{"age":17,"name":"Jackson"}
{"age":15,"name":"Linda"}

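Consume from the people topic (-B reads from the beginning of the topic; -d exits after the existing records have been read):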

$ fluvio consume people -Bd
{"age":32,"name":"Randy"}
{"age":28,"name":"Alice"}
{"age":16,"name":"Andrew"}
{"age":17,"name":"Jackson"}
{"age":15,"name":"Linda"}
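To confirm the merge, check that people holds exactly the records produced to child and adult, and that each one fits the person type. Note that the consumed lines list age before name even where the input listed name first, so the sketch below compares parsed records rather than raw text. It is a minimal Python check that assumes a record is valid when it has exactly the fields name (a string) and age (an integer in the i32 range); the helper names are hypothetical.

```python
import json
from collections import Counter

# Records typed into `fluvio produce` above, copied verbatim.
produced_adult = ['{"name":"Randy","age":32}', '{"name":"Alice","age":28}']
produced_child = [
    '{"age":16,"name":"Andrew"}',
    '{"age":17,"name":"Jackson"}',
    '{"age":15,"name":"Linda"}',
]
# Lines printed by `fluvio consume people -Bd` above.
consumed = [
    '{"age":32,"name":"Randy"}',
    '{"age":28,"name":"Alice"}',
    '{"age":16,"name":"Andrew"}',
    '{"age":17,"name":"Jackson"}',
    '{"age":15,"name":"Linda"}',
]

I32_MIN, I32_MAX = -(2**31), 2**31 - 1


def is_person(record: dict) -> bool:
    """Assumed check: exactly name (string) and age (integer within i32 range)."""
    age = record.get("age")
    return (
        set(record) == {"name", "age"}
        and isinstance(record["name"], str)
        and isinstance(age, int)
        and not isinstance(age, bool)
        and I32_MIN <= age <= I32_MAX
    )


def canonical(line: str) -> str:
    # Field order differs between produced and consumed lines, so compare
    # parsed records re-serialized with sorted keys instead of raw text.
    return json.dumps(json.loads(line), sort_keys=True)


def merge_is_complete(inputs: list[str], output: list[str]) -> bool:
    """True if output holds exactly the input records (as a multiset), all valid."""
    same_records = Counter(map(canonical, inputs)) == Counter(map(canonical, output))
    return same_records and all(is_person(json.loads(line)) for line in output)


print(merge_is_complete(produced_adult + produced_child, consumed))
```

Comparing as a multiset ignores the order in which records from the two sources were interleaved, which depends on when each record was produced.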

In summary, merge-service lists the child and adult topics as sources and merges their records into the single people topic.

 

References