Yes, It's easy to build a Fluvio connector in Rust.

Priyanshu Verma

Contributor, InfinyOn

This post was originally published on DevTo by Priyanshu Verma and shared on the InfinyOn Website. Canonical Link.

Introduction

In today’s fast-paced world, real-time data is no longer a luxury—it’s a necessity. Whether you’re monitoring live stock prices, analyzing social media trends, or syncing spreadsheets for instant insights, having data at your fingertips can make all the difference. That’s where Fluvio comes in, a platform designed to streamline your data journey by offering a powerful, yet easy-to-use, infrastructure for building and managing data pipelines.

But here’s the real magic of Fluvio: It gives you the tools to not only move data between systems but also build your very own custom connectors! Imagine you’re working with Google Sheets, and you want your data to flow seamlessly from a Fluvio topic into a spreadsheet in real-time. Manually exporting data sounds tedious, right? What if you could build an outbound connector that does all this for you automatically?

Sounds fun? It is! In this guide, we’ll walk through the process of building your own Google Sheets outbound connector. This isn’t just another tech tutorial—by the end of this, you’ll have a functional connector and a deeper understanding of how Fluvio can integrate with just about anything.

Why Build a Connector?

If you’ve ever dealt with the challenge of moving data between systems, you know how time-consuming and prone to error it can be. Fluvio’s connectors are built to make this painless. You can import data with inbound connectors or export it with outbound connectors, depending on your needs. And the beauty is that both work in a similar way—the only difference is which direction your data is flowing in relation to a Fluvio topic.

Imagine the possibilities:

  • Automatically stream real-time data from Fluvio to Google Sheets for analysis or reporting.
  • Set up an outbound connector to sync Fluvio data back into tools like Google Sheets for easy collaboration.
  • Use Fluvio as a central hub for all your real-time data needs, and effortlessly extend it with your own connectors!

Let’s Dive In

We’ll use Google Sheets as our example, but the principles you’ll learn can apply to virtually any data destination. Whether you want to push data to a cloud service, a database, or even a custom application, Fluvio connectors can handle it. And once you get the hang of it, the possibilities are endless.

In the next sections, we’ll cover how to set up a Google Sheets outbound connector, configure it for your specific needs, and show how to stream data directly from a Fluvio topic to your spreadsheet. It’s a fun way to get started with Fluvio and unleash your creativity as a data engineer.

Ready to get your hands dirty and build something awesome? Let’s jump right in and start connecting your data with the world of real-time streaming!

For the best developer experience, use Linux or WSL. The rest of this guide assumes you are working on a Linux machine.

Set up the environment to build the connector

Install Rust on your machine

curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

If you run into any issues, visit the [Rust installation guide](https://www.rust-lang.org/tools/install).

Add the Rust musl target

rustup target add x86_64-unknown-linux-musl

Install Fluvio CLI

We need the Fluvio CLI to generate the connector project and to test it.

curl -fsS https://hub.infinyon.cloud/install/install.sh | bash

The install script sets up `fvm`, the Fluvio Version Manager. As part of the initial setup, `fvm` also installs the Fluvio CLI that is available in the stable channel at the time of installation.

Fluvio is stored in `$HOME/.fluvio`, with the executable binaries in `$HOME/.fluvio/bin`. For more details, see the official [Fluvio quickstart guide](https://www.fluvio.io/docs/fluvio/quickstart).

Generate a connector template project with CLI

This section follows Fluvio’s connector docs.

To generate the project, run:

$ cdk generate
🤷   Project Name: my-connector
🤷   Please set a group name: acme
🤷   Which type of Connector would you like [source/sink]? · sink
🤷   Will your Connector be public? · false
[1/8]   Done: .gitignore
[2/8]   Done: Cargo.toml
[3/8]   Done: Connector.toml
[4/8]   Done: README.md
[5/8]   Done: sample-config.yaml
[6/8]   Done: src/config.rs
[7/8]   Done: src/main.rs
[8/8]   Done: src

Choose the connector type carefully: source is for an inbound connector and sink is for an outbound one. I am selecting sink. Now open the project in a code editor such as VS Code. You will find a file tree like this:

$ tree
.
├── Cargo.toml
├── Connector.toml
├── README.md
├── sample-config.yaml
└── src
    ├── config.rs
    └── main.rs

The files you need to know are:

  • Cargo.toml - The Rust package manifest, similar to package.json in Node.js.
  • main.rs - The entry point of the program.
  • config.rs - Defines the connector parameters that are read from a config file such as sample-config.yaml.
  • sample-config.yaml - A sample configuration for testing the connector.

Now we will add the crates the project needs. You can use cargo add [name], but for simplicity and consistency copy this dependency section into your Cargo.toml:

[dependencies]
futures = { version = "0.3", default-features = false }
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false }
anyhow = { version = "1.0" }
async-std = { version = "1.8", default-features = false, features = [
  "attributes",
  "tokio1",
] }
async-trait = { version = "0.1", default-features = false }
fluvio = { git = "https://github.com/infinyon/fluvio", rev = "98cfc21314c93d4c2898edc9e2160f280622be21" }
fluvio-connector-common = { git = "https://github.com/infinyon/fluvio", rev = "98cfc21314c93d4c2898edc9e2160f280622be21", features = [
  "derive",
] }
humantime = "2.1.0"
google-sheets4 = "*"

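The main package to highlight is google-sheets4, which lets us connect to Google Sheets. Note that "*" accepts any published version. The hyper and oauth2 APIs that the crate re-exports can change between major versions, so consider pinning the version that the code below compiles against.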

Let’s write some code

The first step is to set up the config inside config.rs. Let’s declare the Google Sheets credentials as fields: google_private_key, google_client_email and google_token_url.

use fluvio_connector_common::{connector, secret::SecretString};

#[derive(Debug)]
#[connector(config, name = "sheet")]
pub(crate) struct SheetConfig {
    pub google_private_key: SecretString,
    pub google_client_email: SecretString,
    pub google_token_url: SecretString,
}

The thing to note here is #[connector(config, name = "sheet")]. The value name = "sheet" is the name of the config section in which you set these secrets. The next section shows how it is used.

Set up sample-config.yaml

In this config file we reference the secrets. Open it and add the following:

apiVersion: 0.1.0
meta:
  version: 0.1.0
  name: my-sheet-connector-test-connector
  type: sheet-connector-sink
  topic: test-sheet-connector-topic
  secrets:
    - name: GOOGLE_PRIVATE_KEY
    - name: GOOGLE_CLIENT_EMAIL
    - name: GOOGLE_TOKEN_URI
sheet:
  google_private_key: ${{ secrets.GOOGLE_PRIVATE_KEY }}
  google_client_email: ${{ secrets.GOOGLE_CLIENT_EMAIL }}
  google_token_url: ${{ secrets.GOOGLE_TOKEN_URI }}

This file is almost self-explanatory, but do not copy and paste the snippet as-is. In type, use your own project's connector name. Also note the sheet: block. It has that name because we set name = "sheet" in config.rs in the previous section.

Create a secrets.txt file in the project root with the following content:

GOOGLE_PRIVATE_KEY="-----BEGIN PRIVATE KEY-----\nprivate-key\n-----END PRIVATE KEY-----\n"
GOOGLE_CLIENT_EMAIL="example.iam.gserviceaccount.com"
GOOGLE_TOKEN_URI="https://oauth2.googleapis.com/token"

You can get these values from the Google Cloud Console.

The steps to get them are at the end of the article.
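Before running the connector, it can help to confirm that secrets.txt defines every secret named in sample-config.yaml. The sketch below is a hypothetical pre-flight check written against the standard library only. The helpers parse_secrets and missing_secrets are illustrative names, and the parsing assumes simple KEY="value" lines; it is not how cdk itself reads the file.

```rust
use std::collections::HashMap;

/// Parse simple `KEY="value"` lines (assumed format; not cdk's own parser).
/// Lines without `=` are skipped, and surrounding double quotes are removed.
fn parse_secrets(text: &str) -> HashMap<String, String> {
    text.lines()
        .filter_map(|line| line.trim().split_once('='))
        .map(|(key, value)| {
            (
                key.trim().to_string(),
                value.trim().trim_matches('"').to_string(),
            )
        })
        .collect()
}

/// Return the required secret names that are absent or empty.
fn missing_secrets<'a>(
    secrets: &HashMap<String, String>,
    required: &[&'a str],
) -> Vec<&'a str> {
    required
        .iter()
        .copied()
        .filter(|name| secrets.get(*name).map_or(true, |v| v.is_empty()))
        .collect()
}
```

Calling missing_secrets with the three names from the secrets list in sample-config.yaml returns an empty list when the file is complete.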

Now code the sink file

Create a file named sink.rs inside src and add this code to it:

use crate::{config::SheetConfig, Payload};
use async_trait::async_trait;
use fluvio::Offset;
use fluvio_connector_common::{LocalBoxSink, Result, Sink};
use google_sheets4::{
    api::ValueRange,
    hyper::{self, client::HttpConnector},
    hyper_rustls::{self, HttpsConnector},
    oauth2::{self, ServiceAccountKey},
    Error, Sheets,
};

pub(crate) struct SheetsSink {
    secret: oauth2::ServiceAccountKey,
}

impl SheetsSink {
    pub(crate) fn new(config: &SheetConfig) -> Result<Self> {
        let private_key = config.google_private_key.resolve()?;
        let client_email = config.google_client_email.resolve()?;
        let token_uri = config.google_token_url.resolve()?;
        let secret: oauth2::ServiceAccountKey = ServiceAccountKey {
            client_email,
            private_key,
            token_uri,
            auth_provider_x509_cert_url: None,
            auth_uri: None,
            client_id: None,
            client_x509_cert_url: None,
            key_type: None,
            private_key_id: None,
            project_id: None,
        };

        Ok(Self { secret })
    }
}

#[async_trait]
impl Sink<Payload> for SheetsSink {
    async fn connect(self, _offset: Option<Offset>) -> Result<LocalBoxSink<Payload>> {
        let auth = oauth2::ServiceAccountAuthenticator::builder(self.secret)
            .build()
            .await?;
        let hub = Sheets::new(
            hyper::Client::builder().build(
                hyper_rustls::HttpsConnectorBuilder::new()
                    .with_native_roots()
                    .unwrap()
                    .https_or_http()
                    .enable_http1()
                    .build(),
            ),
            auth,
        );

        let unfold = futures::sink::unfold(
            hub,
            |hub: Sheets<HttpsConnector<HttpConnector>>, record: Payload| async move {
                let req = ValueRange {
                    values: Some(record.values),
                    range: None,
                    major_dimension: None,
                };
                let result = hub
                    .spreadsheets()
                    .values_append(req, &record.spreadsheet_id, &record.range)
                    .value_input_option("USER_ENTERED")
                    .insert_data_option("OVERWRITE")
                    .include_values_in_response(false)
                    .doit()
                    .await;
                match result {
                    Err(e) => match e {
                        // The Error enum provides details about what exactly happened.
                        // You can also just use its `Debug`, `Display` or `Error` traits
                        Error::HttpError(_)
                        | Error::Io(_)
                        | Error::MissingAPIKey
                        | Error::MissingToken(_)
                        | Error::Cancelled
                        | Error::UploadSizeLimitExceeded(_, _)
                        | Error::Failure(_)
                        | Error::BadRequest(_)
                        | Error::FieldClash(_)
                        | Error::JsonDecodeError(_, _) => println!("{}", e),
                    },
                    Ok(res) => println!("Success: {:?}", res.0.status()),
                }
                Ok::<_, anyhow::Error>(hub)
            },
        );
        Ok(Box::pin(unfold))
    }
}

Don’t panic, it is easier than it looks. Just focus on the core pieces.

pub(crate) struct SheetsSink {
    secret: oauth2::ServiceAccountKey,
}

First we create a struct, SheetsSink, which stores the data prepared when the connector starts. Here it holds the ServiceAccountKey.

impl SheetsSink {
    pub(crate) fn new(config: &SheetConfig) -> Result<Self> {
        let private_key = config.google_private_key.resolve()?;
        let client_email = config.google_client_email.resolve()?;
        let token_uri = config.google_token_url.resolve()?;
        let secret: oauth2::ServiceAccountKey = ServiceAccountKey {
            client_email,
            private_key,
            token_uri,
            auth_provider_x509_cert_url: None,
            auth_uri: None,
            client_id: None,
            client_x509_cert_url: None,
            key_type: None,
            private_key_id: None,
            project_id: None,
        };

        Ok(Self { secret })
    }
}

In this block we resolve the config values such as private_key, build a ServiceAccountKey from them, and store it in Self.

#[async_trait]
impl Sink<Payload> for SheetsSink {

}

Next we implement the Sink trait provided by Fluvio for SheetsSink. The trait is generic over the type of the records you want to write. Here that type is Payload, which is imported from main.rs and covered in the main.rs section.

Inside the impl we write an async function named connect. It builds auth, the authenticator for Google Sheets, from the service account key, and then creates a hub (the API client) with that auth. This setup is required by the google_sheets4 crate. You can see its docs for more information.

let unfold = futures::sink::unfold(
    hub,
    |hub: Sheets<HttpsConnector<HttpConnector>>, record: Payload| async move {
        let req = ValueRange {
            values: Some(record.values),
            range: None,
            major_dimension: None,
        };
        let result = hub
            .spreadsheets()
            .values_append(req, &record.spreadsheet_id, &record.range)
            .value_input_option("USER_ENTERED")
            .insert_data_option("OVERWRITE")
            .include_values_in_response(false)
            .doit()
            .await;
        match result {
            Err(e) => match e {
                // The Error enum provides details about what exactly happened.
                // You can also just use its `Debug`, `Display` or `Error` traits
                Error::HttpError(_)
                | Error::Io(_)
                | Error::MissingAPIKey
                | Error::MissingToken(_)
                | Error::Cancelled
                | Error::UploadSizeLimitExceeded(_, _)
                | Error::Failure(_)
                | Error::BadRequest(_)
                | Error::FieldClash(_)
                | Error::JsonDecodeError(_, _) => println!("{}", e),
            },
            Ok(res) => println!("Success: {:?}", res.0.status()),
        }
        Ok::<_, anyhow::Error>(hub)
    },
);
Ok(Box::pin(unfold))

After that we create unfold, a sink that handles each record sent to the topic. Its closure takes two parameters, hub and a Payload record. By now you should be familiar with both.

For each record it appends the values to the Google spreadsheet and prints the error if there is one. It then returns the hub with Ok(hub) so that the same client is reused for the next record.
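The way the hub is threaded through the closure can be pictured with a small synchronous analogy. This is a minimal sketch using only the standard library. UnfoldSink and FakeHub are hypothetical stand-ins, not the futures or google_sheets4 types, and the real futures::sink::unfold is asynchronous.

```rust
/// Synchronous stand-in for the sink built by `futures::sink::unfold`.
struct UnfoldSink<S, F> {
    state: Option<S>,
    step: F,
}

impl<S, F> UnfoldSink<S, F> {
    fn new(init: S, step: F) -> Self {
        Self { state: Some(init), step }
    }

    /// Feed one item: take the state out, run the closure,
    /// and store the state that the closure hands back.
    fn send<T, E>(&mut self, item: T) -> Result<(), E>
    where
        F: FnMut(S, T) -> Result<S, E>,
    {
        let state = self.state.take().expect("state lost after an earlier error");
        let next = (self.step)(state, item)?;
        self.state = Some(next);
        Ok(())
    }
}

/// Hypothetical stand-in for the Sheets hub: it only remembers appended rows.
struct FakeHub {
    rows: Vec<String>,
}

/// Push every record through the sink and return what the hub collected.
fn append_all(records: &[&str]) -> Vec<String> {
    let hub = FakeHub { rows: Vec::new() };
    let mut sink = UnfoldSink::new(hub, |mut hub: FakeHub, record: String| {
        hub.rows.push(record);
        // Returning the hub makes it available for the next record.
        Ok::<_, String>(hub)
    });
    for record in records {
        sink.send(record.to_string()).expect("append failed");
    }
    sink.state.take().map(|hub| hub.rows).unwrap_or_default()
}
```

The closure owns the hub while it runs and gives it back at the end, which is the same role that Ok::<_, anyhow::Error>(hub) plays in the connector.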

It’s time to touch the main.rs entrypoint

In this file we only need to understand a couple of things. First, add this code to it:

mod config;
mod sink;
use std::time::Duration;

use anyhow::anyhow;
use config::SheetConfig;
use futures::{SinkExt, StreamExt};

use fluvio_connector_common::{
    connector,
    consumer::ConsumerStream,
    future::retry::ExponentialBackoff,
    tracing::{error, trace, warn},
    Result, Sink,
};
use serde::{Deserialize, Serialize};
use sink::SheetsSink;

const BACKOFF_MIN: Duration = Duration::from_secs(1);
const BACKOFF_MAX: Duration = Duration::from_secs(3600 * 24);

// Data payload structure to be inserted into the Google Sheet
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Payload {
    pub range: String,
    pub values: Vec<Vec<serde_json::Value>>,
    pub major_dimension: String,
    pub spreadsheet_id: String,
}

#[connector(sink)]
async fn start(config: SheetConfig, mut stream: impl ConsumerStream) -> Result<()> {
    println!("Starting sheet-connector sink connector with {config:?}");
    let mut backoff = backoff_init()?;
    loop {
        let Some(wait) = backoff.next() else {
            // not currently possible, but if backoff strategy is changed later
            // this could kick in
            let msg = "Retry backoff exhausted";
            error!(msg);
            return Err(anyhow!(msg));
        };
        if wait >= BACKOFF_MAX {
            // max retry set to 24hrs
            error!("Max retry reached");
            continue;
        }
        let sink = SheetsSink::new(&config)?;
        let mut sink = match sink.connect(None).await {
            Ok(sink) => sink,
            Err(err) => {
                warn!(
                    "Error connecting to sink: \"{}\", reconnecting in {}.",
                    err,
                    humantime::format_duration(wait)
                );
                async_std::task::sleep(wait).await;
                continue; // loop and retry
            }
        };
        // reset the backoff on successful connect
        backoff = backoff_init()?;

        while let Some(item) = stream.next().await {
            let out: std::result::Result<Payload, serde_json::Error> =
                serde_json::from_slice(item?.as_ref());
            match out {
                Ok(payload) => {
                    trace!(?payload);
                    sink.send(payload).await?;
                }
                Err(_) => error!("data parsing error. try again"),
            }
        }
    }
}

fn backoff_init() -> Result<ExponentialBackoff> {
    let bmin: u64 = BACKOFF_MIN.as_millis().try_into()?;
    Ok(ExponentialBackoff::from_millis(bmin).max_delay(BACKOFF_MAX))
}

The first thing is the Payload struct:

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Payload {
    pub range: String,
    pub values: Vec<Vec<serde_json::Value>>,
    pub major_dimension: String,
    pub spreadsheet_id: String,
}

This struct contains the important fields such as values, range and spreadsheet_id, and everything is self-explanatory.

Then we have the start function, which is the connector's entry point and receives the config and a consumer stream. Inside it we handle connection retries with a backoff. There is no need to understand the backoff in detail for now; it just spaces out the reconnection attempts.

The main code block is this:
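One field that is less obvious is major_dimension. In the Google Sheets API, "ROWS" means each inner array of values is a row, and "COLUMNS" means each inner array is a column. The sketch below shows the difference using plain strings. The as_rows helper is hypothetical and is not part of the connector. Note also that the sink above sets major_dimension to None in the request, so the API default of ROWS applies there.

```rust
/// Return `values` laid out as rows.
/// "COLUMNS" input is transposed; any other value is treated as "ROWS".
/// Missing cells in ragged input become empty strings.
fn as_rows(values: &[Vec<String>], major_dimension: &str) -> Vec<Vec<String>> {
    if major_dimension != "COLUMNS" {
        return values.to_vec();
    }
    // The longest column decides how many rows there are.
    let height = values.iter().map(Vec::len).max().unwrap_or(0);
    (0..height)
        .map(|row| {
            values
                .iter()
                .map(|column| column.get(row).cloned().unwrap_or_default())
                .collect()
        })
        .collect()
}
```

For example, the columns [["1", "3"], ["2", "4"]] describe the same cells as the rows [["1", "2"], ["3", "4"]].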
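The idea behind the backoff can be sketched with the standard library alone. The Backoff type below is hypothetical: it doubles the wait after every attempt and caps it at a maximum. It is not the ExponentialBackoff from fluvio_connector_common, whose exact sequence may differ.

```rust
use std::time::Duration;

/// Hypothetical doubling backoff, for illustration only.
struct Backoff {
    current: Duration,
    max: Duration,
}

impl Backoff {
    fn new(min: Duration, max: Duration) -> Self {
        Self { current: min, max }
    }
}

impl Iterator for Backoff {
    type Item = Duration;

    /// Yield the current wait (never above `max`), then double it for the next call.
    fn next(&mut self) -> Option<Duration> {
        let wait = self.current.min(self.max);
        self.current = wait.saturating_mul(2).min(self.max);
        Some(wait)
    }
}

/// Collect the first `n` waits in whole seconds.
fn first_waits_secs(min_secs: u64, max_secs: u64, n: usize) -> Vec<u64> {
    Backoff::new(Duration::from_secs(min_secs), Duration::from_secs(max_secs))
        .take(n)
        .map(|wait| wait.as_secs())
        .collect()
}
```

With a 1 second minimum and an 8 second cap, the first five waits are 1, 2, 4, 8 and 8 seconds. The connector uses the same pattern with a 1 second minimum and a 24 hour cap, and resets the backoff after a successful connection.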

let sink = SheetsSink::new(&config)?;
let mut sink = match sink.connect(None).await {
    Ok(sink) => sink,
    Err(err) => {
        warn!(
            "Error connecting to sink: \"{}\", reconnecting in {}.",
            err,
            humantime::format_duration(wait)
        );
        async_std::task::sleep(wait).await;
        continue; // loop and retry
    }
};
// reset the backoff on successful connect
backoff = backoff_init()?;

Here we connect to the sink we created, using the provided config, and handle a connection error by waiting and retrying. After a successful connection we reset the backoff.

In the last block, shown below, we loop over the stream, parse each record into a Payload with serde_json, and pass it to the sink with sink.send(payload). Records that fail to parse are logged and skipped.

while let Some(item) = stream.next().await {
    let out: std::result::Result<Payload, serde_json::Error> =
        serde_json::from_slice(item?.as_ref());
    match out {
        Ok(payload) => {
            trace!(?payload);
            sink.send(payload).await?;
        }
        Err(_) => error!("data parsing error. try again"),
    }
}

Finally, build the connector and test it

It’s time to build the connector and test it.

Build the connector with this command, making sure you run it from the project root:

cdk build

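If the build fails, try passing the target explicitly:

cdk build --target x86_64-unknown-linux-musl

Replace x86_64 with your system’s architecture. The value must be a valid Rust target triple that is installed through rustup, such as the musl target added earlier.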

Test it with sample-config.yaml and secrets.txt:

cdk test --config sample-config.yaml --secrets secrets.txt

Run the Fluvio producer, send some sample data, and watch it appear in the Google spreadsheet:

fluvio produce [topic-name]

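Send sample data like the following. Enter it as a single line, because fluvio produce sends each line of input as a separate record:

{"range": "A1", "major_dimension": "ROWS", "values": [["A1 value", "A2 value", "A3 value"]], "spreadsheet_id": "spreadsheet_id"}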

Replace spreadsheet_id with your own.

If you get a permission error like this:

{
 "error": {
    "code": 403,
    "message": "The caller does not have permission",
    "status": "PERMISSION_DENIED"
  }
}

To fix it, share the spreadsheet with your service account's client_email from the Google Cloud Console and give it the Editor role, or see Fix On StackOverflow.

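Get service account keys

  1. Go to the Google Cloud Console.
  2. Create a new project or select an existing project.
  3. Go to Enable APIs and Services.
  4. Click on Library, search for Google Sheets API, and enable it.
  5. Go to Service Accounts and create a service account.
  6. Create a JSON key for that service account and download it. The file contains the private_key, client_email and token_uri values used in secrets.txt.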

Source Code: Github

That’s it for this article. I hope you enjoyed building the connector. My name is Priyanshu Verma, and you have been reading about Fluvio connectors.

Connect with Us

We hope this guide has inspired you to explore the possibilities of building custom connectors with Fluvio. If you’d like to discuss how dataflows can be tailored to your specific use case, let’s connect for a personalized chat.

We’re excited to see what you’ll build with Fluvio!