Stateful Data Flow Beta Build composable event-driven data pipelines in minutes.

Get Started Now

MQTT Connector

Data Direction Inbound Outbound
 

Inbound Connector

Reads record from MQTT topic and writes to Fluvio topic.

Supports MQTT V3.1.1 and V5 protocols.

Guide for MQTT to SQL dataflow.

 

Configuration

Option default type description
timeout 60s Duration mqtt broker connect timeout in seconds and nanoseconds
url - SecretString MQTT url which includes schema, domain, port and credentials such as username and password.
topic - String mqtt topic to subscribe and source events from
client_id UUID V4 String mqtt client ID. Using same client id in different connectors may close connection
payload_output_type binary String controls how the output of payload field is produced

url option with type SecretString can be set as raw string value:

url: "mqtt://test.mosquitto.org/"

or, as a reference to a secret with the given name:

url:
  secret:
    name: "URL_SECRET_NAME"
 

Record Type Output

JSON Serialized string with fields mqtt_topic and payload

 

Payload Output Type

Value Output
binary Array of bytes
json UTF-8 JSON Serialized String
 

Usage Example

This is an example of connector config file:

# config-example.yaml
apiVersion: 0.1.0
meta:
  version: 0.2.5
  name: my-mqtt-connector
  type: mqtt-source
  topic: mqtt-topic
  create-topic: true
mqtt:
  url: "mqtt://test.mosquitto.org/"
  topic: "mqtt-to-fluvio"
  timeout:
    secs: 30
    nanos: 0
  payload_output_type: json

Run connector in Cloud with fluvio cloud CLI

fluvio cloud connector create --config config-example.yaml

fluvio cloud connector list # to see the status
fluvio cloud connector logs my-mqtt-connector # to see connector's logs

Install MQTT Client such as

# for mac , this takes while....
brew install mosquitto

Insert records:

mosquitto_pub -h test.mosquitto.org -t mqtt-to-fluvio -m '{"device": {"device_id":1, "name":"device1"}}'

The produced record in Fluvio topic will be:

{
  "mqtt_topic": "mqtt-to-fluvio",
  "payload": {
    "device": {
      "device_id": 1,
      "name": "device1"
    }
  }
}
 

Transformations

Fluvio MQTT Source Connector supports Transformations. Records can be modified before sending to Fluvio topic.

The previous example can be extended to add extra transformations to outgoing records:

# config-example.yaml
apiVersion: 0.1.0
meta:
  version: 0.2.5
  name: my-mqtt-connector
  type: mqtt-source
  topic: mqtt-topic
  create-topic: true
mqtt:
  url: "mqtt://test.mosquitto.org/"
  topic: "mqtt-to-fluvio"
  timeout:
    secs: 30
    nanos: 0
  payload_output_type: json
transforms:
  - uses: infinyon/[email protected]
    with:
      spec:
        - operation: shift
          spec: 
            payload:
              device: "device"
        - operation: default
          spec:
            source: "mqtt-connector"   

The object device in the resulting record will be “unwrapped” and the addition field source with value mqtt-connector will be added.

Read more about JSON to JSON transformations.