The SQL Sink connector reads records from Fluvio topic, applies configured transformations, and
sends new records to the SQL database (via INSERT statements).
Supported databases
PostgreSQL
SQLite
Data types
Model
PostgreSQL
SQLite
Bool
BOOL
BOOLEAN
Char
CHAR
INTEGER
SmallInt
SMALLINT, SMALLSERIAL, INT2
INTEGER
Int
INT, SERIAL, INT4
INTEGER
BigInt
BIGINT, BIGSERIAL, INT8
BIGINT, INT8
Float
REAL, FLOAT4
REAL
DoublePrecision
DOUBLE PRECISION, FLOAT8
REAL
Text
VARCHAR, CHAR(N), TEXT, NAME
TEXT
Bytes
BYTEA
BLOB
Numeric
NUMERIC
REAL
Timestamp
TIMESTAMP
DATETIME
Date
DATE
DATE
Time
TIME
TIME
Uuid
UUID
BLOB, TEXT
Json
JSON, JSONB
TEXT
Transformations
The SQL Sink connector expects the data in Fluvio SQL Model in JSON format.
In order to work with different data formats or data structures, transformations can be applied.
The transformation is a SmartModule pulled from the SmartModule Hub. Transformations are chained according to the order
in the config. If a SmartModule requires configuration, it is passed via with section of transforms entry.
Let’s look at the example of the connector with one transformation named infinyon/json-sql. The transformation takes
records in JSON format and creates SQL insert operation to topic_message table. The value from device.device_id
JSON field will be put to device_id column and the entire json body to record column.
The JSON record:
{"device":{"device_id":1}}
The SQL database (Postgres):
CREATE TABLE topic_message (device_id int, record json);
Every step would be same except the connector config and the behavior of the connector after deployment.
We have a operation parameter which defaults to insert but we can pass upsert here to specify we want to do an upsert operation.
Upsert additionaly takes an unique-columns argument. unique-columns specifies the list indices or column names to check for uniqueness of a record.
If a record with same value in unique-columns exists in the database, it will be updated. If no record exists with same value, the given record will
be inserted.
Connector configuration file for upsert (assuming device_id is a unique column or an index in the database):
See more about upsert in our blog.
Note: the blog doesn’t use json-sql smartmodule and has hardcoded records for demonstration. sql-connector is intended to be used with json-sql.