Build robust event-driven data pipelines for
Lightweight stream processing platform written in Rust and extended with WASM for unmatched speed and flexibility.
apiVersion: 0.1.0
meta:
  version: 0.2.3
  name: helsinki-mqtt
  type: mqtt-source
  topic: helsinki
mqtt:
  url: "mqtt://mqtt.hsl.fi"
  topic: "/hfp/v2/journey/ongoing/vp/+/+/+/#"
  timeout:
    secs: 30
    nanos: 0
  payload_output_type: json
transforms:
  - uses: infinyon/[email protected]
    with:
      spec:
        - operation: shift
          spec:
            payload:
              VP:
                veh: "vehicle"
                spd: "speed"
  - uses: infinyon/[email protected]
    with:
      mapping:
        table: "speed"
        map-columns:
          "vehicle":
            json-key: "vehicle"
            value:
              type: "int"
              required: true
          "speed":
            json-key: "speed"
            value:
              type: "float"
              required: true
version: 0.1.0
meta:
  name: dedup-bus-events
deduplication:
  bounds:
    count: 1000000
    age: 24h
  filter:
    transform:
      uses: infinyon-labs/[email protected]
apiVersion: 0.1.0
meta:
  name: avg-vehicle-last-5-minutes
materialize:
  window:
    type: tumbling
    bound: "5min"
  compute:
    uses: "infinyon-labs/[email protected]"
    with:
      group_by:
        - vehicle_id
      operation:
        type: average
        param: speed
        result: avg_speed
apiVersion: 0.1.0
meta:
  version: 0.1.0
  name: md-helsinki
  type: duckdb-sink
  topic: helsinki
  secrets:
    - name: MD_TOKEN
duckdb:
  url: "md:?token=${{ secrets.MD_TOKEN }}"
Why do you need a unified data platform?
- Reduce time and complexity. Tired of babysitting a bunch of tools and infrastructure to build basic data pipelines? So are we!
- Improve the development experience. Work on data insights instead of endlessly debugging failed jobs and lost data.
- Deliver continuous data flows. Deliver data flows continuously, with state management, delivery guarantees, and high-quality data.
What's the state of your data pipelines?
Build reliable data flows
Simplify your data stack
Technology Leaders Experience
By adopting InfinyOn Cloud, we've transformed our engineering velocity and eliminated the roadblocks and delays associated with traditional systems and services, empowering our teams to focus on innovation and growth.

Building blocks of reliable data pipelines
Collect your data with built-in connectors, webhooks, and IoT collectors, or build your own custom clients.
apiVersion: 0.1.0
meta:
  version: 0.2.4
  name: http-github
  type: http-source
  topic: github-events
  secrets:
    - name: GITHUB_TOKEN
http:
  endpoint: 'https://api.github.com/repos/infinyon/fluvio'
  method: GET
  headers:
    - 'Authorization: token ${{ secrets.GITHUB_TOKEN }}'
  interval: 30s
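Connector configs reference credentials through `${{ secrets.NAME }}` placeholders instead of embedding them. The sketch below shows one way such placeholders could be resolved, assuming the secrets are available as a plain Python dictionary; `resolve_secrets` and the token value are hypothetical, and this is not the connector runtime's actual code.

```python
import re
from typing import Dict

# Matches placeholders such as ${{ secrets.GITHUB_TOKEN }} (inner spaces optional).
SECRET_PATTERN = re.compile(r"\$\{\{\s*secrets\.([A-Za-z_][A-Za-z0-9_]*)\s*\}\}")


def resolve_secrets(text: str, secrets: Dict[str, str]) -> str:
    """Replace each ${{ secrets.NAME }} placeholder with its value (hypothetical helper)."""

    def substitute(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in secrets:
            # Fail loudly instead of sending an empty credential.
            raise KeyError(f"secret {name!r} is not defined")
        return secrets[name]

    return SECRET_PATTERN.sub(substitute, text)


header = "Authorization: token ${{ secrets.GITHUB_TOKEN }}"
print(resolve_secrets(header, {"GITHUB_TOKEN": "example-token"}))
# → Authorization: token example-token
```

Raising on an unknown name is a design choice for the sketch: a typo in a secret name then fails at startup rather than producing an unauthenticated request.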
apiVersion: 0.1.0
meta:
  version: 0.2.4
  name: helsinki-mqtt
  type: mqtt-source
  topic: helsinki
mqtt:
  url: "mqtt://mqtt.hsl.fi"
  topic: "/hfp/v2/journey/ongoing/vp/+/+/+/#"
  timeout:
    secs: 30
    nanos: 0
  payload_output_type: json
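The `topic` field is an MQTT subscription filter: `+` matches exactly one topic level and `#` matches all remaining levels. A small Python sketch of those matching rules (as defined in the MQTT specification) makes it easy to check which HSL topics the connector would receive. `topic_matches` is a hypothetical helper, the sample topic names are illustrative, and the special rule for topics that start with `$` is omitted.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic name matches a subscription filter.

    '+' matches exactly one level; '#' (valid only as the last level)
    matches the parent level and any number of child levels.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: everything from here on matches.
            return True
        if i >= len(topic_levels):
            # The topic ran out of levels before the filter did.
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without '#', both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


FILTER = "/hfp/v2/journey/ongoing/vp/+/+/+/#"
print(topic_matches(FILTER, "/hfp/v2/journey/ongoing/vp/bus/0022/00854/2550"))  # True
print(topic_matches(FILTER, "/hfp/v2/journey/ongoing/vp/bus/0022"))  # False
```

The leading `/` in the HSL filter creates an empty first level; splitting both strings the same way keeps that level aligned.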
apiVersion: 0.1.0
meta:
  name: my-stripe-webhook
  topic: payment_confirmations
  secrets:
    - name: stripe_secret_key
webhook:
  verification:
    - uses: infinyon/[email protected]
      with:
        webhook_key: ${{ secrets.stripe_secret_key }}
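Webhook verification protects the endpoint from forged requests. To illustrate what a Stripe verification step has to check, here is a sketch of Stripe's documented signature scheme: the `Stripe-Signature` header carries `t=<timestamp>` and one or more `v1=<signature>` values, where each signature is a hex HMAC-SHA256 of `<timestamp>.<raw body>` keyed with the endpoint's signing secret. This is not the SmartModule's code; `verify_stripe_signature`, the secret, and the event body are hypothetical, and the 300-second tolerance is an assumed default.

```python
import hashlib
import hmac
import time
from typing import Optional


def verify_stripe_signature(payload: bytes, header: str, secret: str,
                            tolerance: int = 300, now: Optional[float] = None) -> bool:
    """Check a Stripe-Signature header against the raw request body (sketch)."""
    timestamp = None
    signatures = []
    for item in header.split(","):
        key, _, value = item.strip().partition("=")
        if key == "t":
            timestamp = value
        elif key == "v1":
            signatures.append(value)
    if timestamp is None or not timestamp.isdigit() or not signatures:
        return False
    # Reject old events to limit replay attacks.
    current = time.time() if now is None else now
    if abs(current - int(timestamp)) > tolerance:
        return False
    signed_payload = timestamp.encode() + b"." + payload
    expected = hmac.new(secret.encode(), signed_payload, hashlib.sha256).hexdigest()
    # Constant-time comparison against every v1 signature in the header.
    return any(hmac.compare_digest(expected.encode(), sig.encode()) for sig in signatures)


# Usage with a hypothetical secret and event body.
secret = "whsec_example"
body = b'{"id": "evt_example", "type": "payment_intent.succeeded"}'
ts = 1700000000
sig = hmac.new(secret.encode(), f"{ts}.".encode() + body, hashlib.sha256).hexdigest()
print(verify_stripe_signature(body, f"t={ts},v1={sig}", secret, now=ts))  # True
```

The check must run on the raw body bytes; re-serializing parsed JSON can change whitespace or key order and break the signature.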
## Cloud Collector
$ fluvio topic create collector-topic --mirror
$ fluvio topic collector-topic --mirror-add 6001
$ fluvio cloud register cluster --id 6001 --source-ip 35.42.12.192
$ fluvio cloud cluster metadata export --topic local-topic \
--mirror-spu 6001 --file remote-6001.toml
## IoT Sensor (ARMv7, ARM64, etc.)
$ fluvio cluster start --local --config remote-6001.toml
## Produce at Edge
$ fluvio produce local-topic
> test from 6001
## Read from Cloud
$ fluvio consume collector-topic -B
test from 6001
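use fluvio::{Fluvio, RecordKey};
#[async_std::main]
async fn main() {
    let topic = "rust-topic";
    let record = "Hello from rust!";
    // Connect to the cluster selected by the current Fluvio profile
    let fluvio = Fluvio::connect().await.unwrap();
    // Create a producer for the topic on that connection
    let producer = fluvio.topic_producer(topic).await.unwrap();
    producer.send(RecordKey::NULL, record).await.unwrap();
    // Send any buffered records before exiting
    producer.flush().await.unwrap();
}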
#!/usr/bin/env python
from fluvio import Fluvio
if __name__ == "__main__":
    topic = "python-topic"
    record = "Hello from python!"
    # Connect using the current Fluvio profile
    fluvio = Fluvio.connect()
    producer = fluvio.topic_producer(topic)
    producer.send_string(record)
    # Send any buffered records before the script exits
    producer.flush()
import Fluvio from "@fluvio/client";
const produce = async () => {
  const TOPIC_NAME = "node-topic";
  const RECORD_TEXT = "Hello from node!";
  await fluvio.connect();
  const producer = await fluvio.topicProducer(TOPIC_NAME);
  await producer.send(RECORD_TEXT);
};
const fluvio = new Fluvio();
produce();
Apply transformations in-line as your data traverses the pipelines. Use pre-built SmartModules from the Hub or build your own with SMDK.
# Filter out all records that fail to match `Cat` or `cat`
transforms:
  - uses: infinyon/[email protected]
    with:
      regex: "[Cc]at"
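A regex filter keeps a record only when the pattern matches. The Python sketch below mirrors that behavior under the assumption that the pattern may match anywhere in the record value rather than the whole value, which is why `concatenate` passes; the records are hypothetical.

```python
import re
from typing import List

# Same pattern as the `regex` parameter above.
PATTERN = re.compile(r"[Cc]at")


def regex_filter(records: List[str]) -> List[str]:
    """Keep the records in which the pattern matches somewhere (unanchored search)."""
    return [record for record in records if PATTERN.search(record)]


print(regex_filter(["Cat video", "dog video", "concatenate", "CAT"]))
# → ['Cat video', 'concatenate']
```

To match whole words only, a word-boundary pattern such as `\b[Cc]at\b` would exclude `concatenate`.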
# JOLT: JSON Language for Transformations
transforms:
  - uses: infinyon/[email protected]
    with:
      spec:
        - operation: remove
          spec:
            id: ""
        - operation: shift
          spec:
            "*": "data.&0"
        - operation: default
          spec:
            data:
              source: "http-connector"
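The three operations run in order: `remove` deletes the top-level `id`, `shift` with `"*": "data.&0"` moves every remaining top-level key under `data` while keeping its name (`&0` refers to the matched key), and `default` adds `data.source` only if it is missing. The Python function below is a hand-written equivalent for flat JSON objects, meant only to show the effect of this particular spec; it is not a general JOLT engine, and the input record is hypothetical.

```python
from typing import Any, Dict


def apply_spec(record: Dict[str, Any]) -> Dict[str, Any]:
    """Hand-coded equivalent of the remove/shift/default spec above."""
    # remove: drop the top-level "id" field if present.
    fields = {key: value for key, value in record.items() if key != "id"}
    # shift "*": "data.&0": nest every top-level key under "data", keeping its name.
    result: Dict[str, Any] = {"data": fields}
    # default: set data.source only when the record does not already have one.
    result["data"].setdefault("source", "http-connector")
    return result


print(apply_spec({"id": 7, "temperature": 21.5, "city": "Helsinki"}))
# → {'data': {'temperature': 21.5, 'city': 'Helsinki', 'source': 'http-connector'}}
```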
# Transform JSON records into SQL statements (used with sql-sink)
transforms:
  - uses: infinyon/[email protected]
    with:
      mapping:
        table: "target_table"
        map-columns:
          "device_id":
            json-key: "device.device_id"
            value:
              type: "int"
              required: true
          "record":
            json-key: "$"
            value:
              type: "jsonb"
              required: true
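Each `map-columns` entry names a target column, the JSON path to read (`$` is the whole record; dotted keys such as `device.device_id` walk nested objects), and the SQL type. The sketch below turns one record into a parameterized `INSERT` to show that mapping. It assumes PostgreSQL-style `$1` placeholders and stores `jsonb` values as JSON text; `to_insert` and `lookup` are hypothetical helpers, not the json-sql SmartModule's actual output format.

```python
import json
from typing import Any, Dict, List, Tuple

# Same mapping as the configuration above, expressed as a Python dict.
MAPPING: Dict[str, Any] = {
    "table": "target_table",
    "map-columns": {
        "device_id": {"json-key": "device.device_id", "value": {"type": "int", "required": True}},
        "record": {"json-key": "$", "value": {"type": "jsonb", "required": True}},
    },
}


def lookup(record: Dict[str, Any], path: str) -> Any:
    """Resolve '$' (the whole record) or a dotted path such as 'device.device_id'."""
    if path == "$":
        return record
    value: Any = record
    for part in path.split("."):
        if not isinstance(value, dict) or part not in value:
            return None
        value = value[part]
    return value


def to_insert(record: Dict[str, Any], mapping: Dict[str, Any]) -> Tuple[str, List[Any]]:
    """Build a parameterized INSERT statement plus its bound values (sketch)."""
    columns: List[str] = []
    params: List[Any] = []
    for column, rule in mapping["map-columns"].items():
        spec = rule["value"]
        value = lookup(record, rule["json-key"])
        if value is None:
            value = spec.get("default")  # optional fallback value for the column
        if value is None:
            if spec.get("required"):
                raise ValueError(f"missing required column {column!r}")
            continue
        if spec["type"] == "int":
            value = int(value)
        elif spec["type"] == "jsonb":
            value = json.dumps(value)
        columns.append(column)
        params.append(value)
    placeholders = ", ".join(f"${i}" for i in range(1, len(params) + 1))
    sql = f"INSERT INTO {mapping['table']} ({', '.join(columns)}) VALUES ({placeholders})"
    return sql, params


print(to_insert({"device": {"device_id": 1}}, MAPPING))
# → ('INSERT INTO target_table (device_id, record) VALUES ($1, $2)', [1, '{"device": {"device_id": 1}}'])
```

Binding values as parameters instead of splicing them into the SQL string keeps record contents from being interpreted as SQL.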
# Look at the last 10 records.
# Filter out records based on your custom business logic.
transforms:
  - uses: infinyon/[email protected]
    lookback:
      last: 10
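Read here as seeding the module's state from the last 10 records already in the topic before new records arrive (an interpretation of the comment above), `lookback` lets a stateful filter start with context instead of an empty state. The source does not show the business logic, so the sketch assumes a hypothetical rule: pass a numeric reading only when it exceeds the average of the previous 10 readings. `LookbackFilter`, `look_back`, and `filter` are illustrative names, not the SmartModule API.

```python
from collections import deque


class LookbackFilter:
    """Hypothetical rule: pass a reading only if it exceeds the average
    of the previous `window` readings."""

    def __init__(self, window: int = 10):
        # Bounded history: the oldest reading is discarded automatically.
        self.history = deque(maxlen=window)

    def look_back(self, value: float) -> None:
        """Seed state from a record that is already stored in the topic."""
        self.history.append(value)

    def filter(self, value: float) -> bool:
        """Decide on a new record, then remember it for later decisions."""
        passes = bool(self.history) and value > sum(self.history) / len(self.history)
        self.history.append(value)
        return passes


f = LookbackFilter()
for stored in [20.0] * 10:  # the last 10 records replayed from the topic
    f.look_back(stored)
print(f.filter(25.0), f.filter(19.0))  # True False
```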
## Use the SmartModule Development Kit (SMDK) to build your own.
$ fluvio install smdk
## Create your SmartModule project
$ smdk generate
🤷 Project Name: my-smartmodule
🤷 Which type of SmartModule would you like?
❯ filter
map
filter-map
array-map
aggregate
## Build & Test
$ smdk build
$ smdk test
## Load to Cluster
$ smdk load
## Publish to Hub
$ smdk publish
Apply deduplication for exactly-once semantics. Choose our certified pre-built algorithms or build your own.
# Deduplication configuration file
# - topic-config.yaml
name: dedup
deduplication:
  bounds:
    count: 1000000
    age: 1w
  filter:
    transform:
      uses: infinyon-labs/[email protected]
# apply dedup configuration
$ fluvio topic apply --config topic-config.yaml
# produce
$ fluvio produce dedup --key-separator ":"
1: one
1: one
2: two
# consume
$ fluvio consume dedup -B -k
1: one
2: two
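The `bounds` limit how much the deduplication filter remembers: at most `count` keys, each for at most `age`. The sketch below models that behavior in Python, assuming duplicates are identified by record key (as the keyed `1: one` records suggest) and that timestamps arrive in non-decreasing order; `BoundedDedup` and `split_key` are hypothetical names, not the dedup-filter implementation.

```python
from collections import OrderedDict
from typing import Tuple


class BoundedDedup:
    """Drop records whose key was already seen, remembering at most `count`
    keys for at most `age_secs` seconds."""

    def __init__(self, count: int, age_secs: float):
        self.count = count
        self.age_secs = age_secs
        # key -> first-seen time, oldest entry first
        self.seen: "OrderedDict[str, float]" = OrderedDict()

    def accept(self, key: str, now: float) -> bool:
        # Forget keys older than the age bound (the oldest entries are at the front).
        while self.seen and now - next(iter(self.seen.values())) > self.age_secs:
            self.seen.popitem(last=False)
        if key in self.seen:
            return False
        self.seen[key] = now
        # Enforce the count bound by evicting the oldest key.
        if len(self.seen) > self.count:
            self.seen.popitem(last=False)
        return True


def split_key(line: str, separator: str = ":") -> Tuple[str, str]:
    """Split a 'key: value' line at the first separator, trimming whitespace."""
    key, _, value = line.partition(separator)
    return key.strip(), value.strip()


dedup = BoundedDedup(count=1_000_000, age_secs=7 * 24 * 3600)  # count: 1000000, age: 1w
for t, line in enumerate(["1: one", "1: one", "2: two"]):
    key, value = split_key(line)
    if dedup.accept(key, now=float(t)):
        print(f"{key}: {value}")
# prints "1: one" and then "2: two"
```

Both bounds trade memory for accuracy: a duplicate that arrives after its key was evicted by either bound passes through again.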
Materialized views are in early development and the interfaces presented here are subject to change. Join us on Discord to give us feedback.
# Materialized view configuration file
# - mv.yaml
apiVersion: 0.1.0
meta:
  name: avg_bus_speed
materialize:
  window:
    type: tumbling
    bound: "5min"
  trigger: events
  input:
    topic: helsinki
    schema:
      - name: vehicle_id
        type: u16
      - name: speed
        type: float
  output:
    fields:
      - name: vehicle_id
      - name: avg_speed
  compute:
    uses: "infinyon-labs/[email protected]"
    with:
      group_by:
        - vehicle_id
      operation:
        type: average
        param: speed
        result: avg_speed
# create materialized view
$ fluvio materialized-view create --config mv.yaml
# view materialized view current state
$ fluvio state avg_bus_speed
[
  {
    "vehicle_id": "123",
    "avg_speed": 10.0
  },
  {
    "vehicle_id": "456",
    "avg_speed": 35.0
  }
]
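A tumbling window splits time into fixed, non-overlapping intervals (here 5 minutes), and the `average` operation groups the records in each interval by `vehicle_id`. The Python sketch below reproduces that computation over in-memory events to show the semantics; the `(timestamp, vehicle_id, speed)` tuples are hypothetical sample data, and `tumbling_average` is not part of the materialized-view API.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def tumbling_average(events: Iterable[Tuple[float, int, float]],
                     window_secs: float = 300.0) -> Dict[int, List[dict]]:
    """Average speed per vehicle within each fixed, non-overlapping window."""
    totals: Dict[Tuple[int, int], List[float]] = defaultdict(lambda: [0.0, 0.0])
    for ts, vehicle_id, speed in events:
        window = int(ts // window_secs)  # index of the window the event falls into
        totals[(window, vehicle_id)][0] += speed
        totals[(window, vehicle_id)][1] += 1
    result: Dict[int, List[dict]] = defaultdict(list)
    for (window, vehicle_id), (total, count) in sorted(totals.items()):
        result[window].append({"vehicle_id": vehicle_id, "avg_speed": total / count})
    return dict(result)


events = [(0, 123, 8.0), (60, 123, 12.0), (120, 456, 35.0), (310, 123, 20.0)]
print(tumbling_average(events))
# → {0: [{'vehicle_id': 123, 'avg_speed': 10.0}, {'vehicle_id': 456, 'avg_speed': 35.0}], 1: [{'vehicle_id': 123, 'avg_speed': 20.0}]}
```

Keeping a running sum and count per group, rather than buffering every event, means each window needs only constant memory per vehicle.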
Send data to multiple destinations with built-in certified connectors, your own custom connectors, or full-featured consumer clients.
apiVersion: 0.1.0
meta:
  version: 0.2.4
  name: my-http-sink
  type: http-sink
  topic: http-sink-topic
  secrets:
    - name: AUTHORIZATION_TOKEN
http:
  endpoint: "http://127.0.0.1/post"
  headers:
    - "Authorization: token ${{ secrets.AUTHORIZATION_TOKEN }}"
    - "Content-Type: application/json"
apiVersion: 0.1.0
meta:
  version: 0.3.1
  name: json-sql-connector
  type: sql-sink
  topic: sql-topic
  secrets:
    - name: DATABASE_URL
sql:
  url: ${{ secrets.DATABASE_URL }}
transforms:
  - uses: infinyon/json-sql
    with:
      mapping:
        table: "topic_message"
        map-columns:
          "device_id":
            json-key: "device.device_id"
            value:
              type: "int"
              default: "0"
              required: true
          "record":
            json-key: "$"
            value:
              type: "jsonb"
              required: true
apiVersion: 0.1.0
meta:
  version: 0.1.1
  name: weather-monitor-sandiego
  type: graphite-sink
  topic: weather-ca-sandiego
graphite:
  # https://graphite.readthedocs.io/en/latest/feeding-carbon.html#step-1-plan-a-naming-hierarchy
  metric-path: "weather.temperature.ca.sandiego"
  addr: "localhost:2003"
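Graphite's Carbon daemon accepts a plaintext protocol on port 2003 (the `addr` above): one metric per line in the form `<metric path> <value> <unix timestamp>`. The Python sketch below formats and sends such a line to illustrate what the sink writes. How the connector extracts the numeric value from each record is not described here, so `graphite_line`, `send_metric`, and the value and timestamp in the example are hypothetical.

```python
import socket
import time
from typing import Optional


def graphite_line(metric_path: str, value: float, timestamp: Optional[int] = None) -> str:
    """Format one metric in Graphite's plaintext protocol: '<path> <value> <timestamp>\\n'."""
    ts = int(time.time()) if timestamp is None else timestamp
    return f"{metric_path} {value} {ts}\n"


def send_metric(addr: str, line: str) -> None:
    """Send one plaintext line to a Carbon listener such as 'localhost:2003'."""
    host, port = addr.rsplit(":", 1)
    with socket.create_connection((host, int(port)), timeout=5) as sock:
        sock.sendall(line.encode("ascii"))


print(graphite_line("weather.temperature.ca.sandiego", 18.5, 1700000000), end="")
# → weather.temperature.ca.sandiego 18.5 1700000000
```

Calling `send_metric("localhost:2003", ...)` requires a running Carbon listener; the formatting step can be checked on its own without one.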
apiVersion: 0.1.0
meta:
  version: 0.1.0
  name: duckdb-connector
  type: duckdb-sink
  topic: fluvio-topic-source
duckdb:
  url: 'local.db' # local duckdb
use async_std::stream::StreamExt;
use fluvio::{Fluvio, Offset};
#[async_std::main]
async fn main() {
    let topic = "rust-topic";
    let partition = 0;
    // Connect to the cluster selected by the current Fluvio profile
    let fluvio = Fluvio::connect().await.unwrap();
    let consumer = fluvio.partition_consumer(topic, partition).await.unwrap();
    // Start reading one record before the end of the partition
    let mut stream = consumer.stream(Offset::from_end(1)).await.unwrap();
    if let Some(Ok(record)) = stream.next().await {
        let string = String::from_utf8_lossy(record.value());
        println!("{}", string);
    }
}
#!/usr/bin/env python
from fluvio import Fluvio, Offset
if __name__ == "__main__":
    topic = "python-topic"
    partition = 0
    fluvio = Fluvio.connect()
    consumer = fluvio.partition_consumer(topic, partition)
    # Read the last 10 records of the partition, then stop
    for idx, record in enumerate(consumer.stream(Offset.from_end(10))):
        print(record.value_string())
        if idx >= 9:
            break
import Fluvio, { Offset, Record } from "@fluvio/client";
const consume = async () => {
  const TOPIC_NAME = "hello-node";
  const PARTITION = 0;
  await fluvio.connect();
  const consumer = await fluvio.partitionConsumer(TOPIC_NAME, PARTITION);
  await consumer.stream(Offset.FromEnd(), async (record: Record) => {
    console.log(`Key=${record.keyString()}, Value=${record.valueString()}`);
    process.exit(0);
  });
};
const fluvio = new Fluvio();
consume();
## Use the Connector Development Kit (CDK) to build your own.
$ fluvio install cdk
## Create your connector project
$ cdk generate
🤷 Project Name: my-connector
🤷 Which type of Connector would you like [source/sink]? ›
source
❯ sink
## Build & Test
$ cdk build
$ cdk test
## Deploy on your machine
$ cdk deploy
## Publish to Hub
$ cdk publish
What data practitioners say
-
Artem
Data Platform Architect
"I would love a unified solution without the memory limitations of JVM based tools."
-
Joao
Cloud Infrastructure Architect
"It was awesome to see the CLI experience. That's like a million times better than Kafka. I think it's uncomfortable how much better it is than the Kafka."
-
Jowanza
Data Practitioner, CEO, Author
"The event based approach without babysitting a bunch of point solutions is the way I want to build."
Ditch the duct tape data stack
Each additional tool in your stack...






Find out what your data platform could become