Use Cases
Docs
Blog Articles
BlogResources
Pricing
PricingHow to Write to Apache Kafka from a Fluvio topic
VP Marketing, InfinyOn
This blog shows the power of Fluvio for performing real-time data transformations and provides a step by step example of how to stream clean data to a Kafka topic. In this example we are taking source data from the Finnhub API with our HTTP source connector, aggregating stock prices, and caluclating unrealised gains or losses in real-time before we send it to Apache Kafka.
Start
Install minikube, helm, kubectr with the following instructions: https://www.fluvio.io/docs/get-started/linux/#installing-kubernetes-cluster.
Install Fluvio.
Install Fluvio CLI:
$ curl -fsS https://hub.infinyon.cloud/install/install.sh | bash
This command will download the Fluvio Version Manager (fvm), Fluvio CLI (fluvio) and config files into $HOME/.fluvio
, with the executables in $HOME/.fluvio/bin
. To complete the installation, you will need to add the executables to your shell $PATH
.
Start Fluvio Cluster:
$ fluvio cluster start
Verify the cluster is running:
$ fluvio topic create greetings
echo "Hello, Fluvio" | fluvio produce greetings
fluvio consume greetings -B -d
Part one
Recap of the Financal Services Demo
• git clone https://github.com/infinyon/fluvio-demo-04-12-2022-finance-demo.git
• register on finhub.io and obtain api token
• update API token in quote-data-input.yaml
Create a HTTP connector Check if the fluvio topic is populated:
$ fluvio consume gme-stocks -B
Start a local Apache Kafka dev
Clone https://github.com/infinyon/kafka_webinar_16_August_2022 and change the value ADV_HOST in docker-compose-webinar.yml, where ADV_HOST is pinned to minikube network gateway 192.168.49.1:
check minikube ip
$ minikube ip
192.168.49.2
and amend ADV_HOST in docker-compose-webinar.yml
$ docker compose -f docker-compose-webinar.yml up -d
Validate that Kafka is working
$ docker run --rm -it --net=host lensesio/fast-data-dev kafka-topics --zookeeper localhost:2181 --list
Write to Kafka from Fluvio topic
ADV_HOST
and kafka_url
in webinar-kafka-sink-connector.yml
shall match to local IP (ifconfig| grep inet for linux)
fluvio connector create -c ./webinar-kafka-sink-connector.yml
fluvio connector logs -f my-kafka-sink1
Part two
Apply a Smart Module to a fluvio topic before writing to Kafka Smart module calculates unrealised gains or losses. Runs an aggregate function on an assumed “purchased” stocks (warrant).
fn update_profit(&mut self) {
let mut profit = 0.0;
for warrant in &self.warrants {
profit += (self.current_price - (warrant.exercise_price + warrant.purchase_price))*warrant.count as f64;
}
where warrents.txt
{"expiry_date": "Tue, 11 Apr 2022 13:50:37 +0000", "exercise_price": 140.0, "purchase_price": 12.0, "count": 10}
{"expiry_date": "Tue, 12 Apr 2022 13:50:37 +0000", "exercise_price": 110.0, "purchase_price": 10.0, "count": 11}
{"expiry_date": "Tue, 12 Apr 2022 17:50:37 +0000", "exercise_price": 150.0, "purchase_price": 11.0, "count": 12}
{"expiry_date": "Tue, 13 Apr 2022 13:50:37 +0000", "exercise_price": 160.0, "purchase_price": 13.0, "count": 13}
In the fluvio-demo-04-12-2022-finance-demo folder run
make sm-upload
make produce-warrants
make sm-consume
Those commands will compile and upload a smart module. Produce warrants will generate purchase orders so current profit can be calculated.
Start Kafka sink connector with SmartModule
fluvio connector create -c ./webinar-kafka-sink-connector-with-sm.yml
Rerun produce warrants: In fluvio-demo-04-12-2022-finance-demo run
%copy%
Sink connector reads fluvio topic from the end, and we are re-running make produce-warrants
to make sure fluvio topic is populated, which is then appearing in kafka-aggregate-fluvio.
Watch kafka topic via Web UI http://localhost:3030/kafka-topics-ui/#/cluster/fast-data-dev/topic/n/kafka-aggregate-fluvio/
or via command line:
docker run --rm -it --net=host landoop/fast-data-dev kafka-console-consumer --topic kafka-aggregate-fluvio --bootstrap-server "192.168.1.89:9092"
Webinar recording: Enhance your kafka infrastructure with fluvio
Stay connected
Connect with us:
Please, be sure to join our Discord server if you want to talk to us or have any questions.
Subscribe to our YouTube channel
Have a happy coding, and stay tuned!
Additional Notes
Running Kafka commands:
docker run --rm -it --net=host landoop/fast-data-dev kafka-topics --zookeeper localhost:2181 --list
docker run --rm -it --net=host landoop/fast-data-dev kafka-console-consumer --topic kafka-aggregate-fluvio --bootstrap-server "192.168.49.1:9092"