This blog shows the power of Fluvio for performing real-time data transformations and provides a step by step example of how to stream clean data to a Kafka topic. In this example we are taking source data from the Finnhub API with our HTTP source connector, aggregating stock prices, and caluclating unrealised gains or losses in real-time before we send it to Apache Kafka.
Install minikube, helm, kubectr with the following instructions: https://www.fluvio.io/docs/get-started/linux/#installing-kubernetes-cluster.
$ curl -fsS https://packages.fluvio.io/v1/install.sh | bash
Start Fluvio Cluster
Verify the cluster is running:
$ fluvio topic create greetings
echo "Hello, Fluvio" | fluvio produce greetings
fluvio consume greetings -B -d

Recap of the Financal Services Demo
• git clone https://github.com/infinyon/fluvio-demo-04-12-2022-finance-demo.git
• register on finhub.io and obtain api token
• update API token in quote-data-input.yaml
Create a HTTP connector Check if the fluvio topic is populated:
$ fluvio consume gme-stocks -B
Start a local Apache Kafka dev
Clone https://github.com/infinyon/kafka_webinar_16_August_2022 and change the value ADV_HOST in docker-compose-webinar.yml, where ADV_HOST is pinned to minikube network gateway 192.168.49.1:
check minikube ip
$ minikube ip
192.168.49.2
and amend ADV_HOST in docker-compose-webinar.yml
$ docker compose -f docker-compose-webinar.yml up -d
Validate that Kafka is working
$ docker run --rm -it --net=host lensesio/fast-data-dev kafka-topics --zookeeper localhost:2181 --list
Write to Kafka from Fluvio topic
ADV_HOST
and kafka_url
in webinar-kafka-sink-connector.yml
shall match to local IP (ifconfig| grep inet for linux)
fluvio cloud connector create -c ./webinar-kafka-sink-connector.yml
fluvio cloud connector logs -f my-kafka-sink1

Apply a Smart Module to a fluvio topic before writing to Kafka
Smart module calculates unrealised gains or losses. Runs an aggregate function on an assumed “purchased” stocks (warrant).
fn update_profit(&mut self) {
let mut profit = 0.0;
for warrant in &self.warrants {
profit += (self.current_price - (warrant.exercise_price + warrant.purchase_price))*warrant.count as f64;
}
where warrents.txt
{"expiry_date": "Tue, 11 Apr 2022 13:50:37 +0000", "exercise_price": 140.0, "purchase_price": 12.0, "count": 10}
{"expiry_date": "Tue, 12 Apr 2022 13:50:37 +0000", "exercise_price": 110.0, "purchase_price": 10.0, "count": 11}
{"expiry_date": "Tue, 12 Apr 2022 17:50:37 +0000", "exercise_price": 150.0, "purchase_price": 11.0, "count": 12}
{"expiry_date": "Tue, 13 Apr 2022 13:50:37 +0000", "exercise_price": 160.0, "purchase_price": 13.0, "count": 13}
In the fluvio-demo-04-12-2022-finance-demo folder run
make sm-upload
make produce-warrants
make sm-consume
Those commands will compile and upload a smart module. Produce warrants will generate purchase orders so current profit can be calculated.
Start Kafka sink connector with SmartModule
fluvio cloud connector create -c ./webinar-kafka-sink-connector-with-sm.yml
Rerun produce warrants: In fluvio-demo-04-12-2022-finance-demo run
%copy%
Sink connector reads fluvio topic from the end, and we are re-running make produce-warrants
to make sure fluvio topic is populated, which is then appearing in kafka-aggregate-fluvio.
Watch kafka topic via Web UI http://localhost:3030/kafka-topics-ui/#/cluster/fast-data-dev/topic/n/kafka-aggregate-fluvio/
or via command line:
docker run --rm -it --net=host landoop/fast-data-dev kafka-console-consumer --topic kafka-aggregate-fluvio --bootstrap-server "192.168.1.89:9092"
Webinar recording: Enhance your kafka infrastructure with fluvio
See webinar with live demo.
Stay connected
Please, be sure to join our Discord server if you want to talk to us or have any questions.
Subscribe to our YouTube channel
Follow us on Twitter
Follow us on LinkedIn
Try InfinyOn Cloud a fully managed Fluvio service
Have a happy coding, and stay tuned!
Running Kafka commands:
docker run --rm -it --net=host landoop/fast-data-dev kafka-topics --zookeeper localhost:2181 --list
docker run --rm -it --net=host landoop/fast-data-dev kafka-console-consumer --topic kafka-aggregate-fluvio --bootstrap-server "192.168.49.1:9092"