Skip to content

Integrate EMQX with Tiger Cloud

Connect EMQX to Tiger Cloud and stream MQTT messages into a hypertable

EMQX is a distributed MQTT broker used to move data between IoT devices, applications, and backend systems in real time.

This page shows you how to connect EMQX to your Tiger Cloud service using the EMQX data integration for PostgreSQL to stream MQTT messages into TimescaleDB.

In this integration guide, you:

  • prepare a target hypertable for MQTT messages in your Tiger Cloud service service
  • configure EMQX to send messages to your PostgreSQL database
  • verify that MQTT tag values arrive in your TimescaleDB hypertable

Prerequisites for this integration guide

To follow these steps, you'll need:

  • An EMQX broker (Open Source or Enterprise) with access to the EMQX Dashboard

Create a target hypertable in Tiger Cloud

  1. Connect to your service

    Use the Tiger Cloud Console, psql, or any other SQL editor to connect to your Tiger Cloud service.

  2. Create a target hypertable for MQTT messages

    Create a hypertable with the columns you want EMQX to populate from each MQTT message. For example:

    CREATE TABLE mqtt_messages (
    ts TIMESTAMPTZ NOT NULL,
    tag_id TEXT NOT NULL,
    client_id TEXT,
    qos SMALLINT,
    value DOUBLE PRECISION
    ) WITH (
    tsdb.hypertable = true,
    tsdb.partition_column = 'ts',
    tsdb.chunk_interval = '7 days'
    );

    Choose a chunk interval that matches your message volume and query patterns. Aim for chunk sizes that fit the most recent working set in memory — a good starting point is an interval that produces chunks roughly 25% of your service RAM. For lower-volume streams, that may mean hours instead of days; for very busy MQTT workloads, a longer interval such as 2–7 days may be more appropriate.

    You can change the chunk interval later with set_chunk_time_interval(); it only affects new chunks, so the current data remains unchanged.

    Note

    You may need to change the table columns based on the format of the MQTT payload and what you want to store. The instructions here assume a plaintext numeric payload. If you need JSON or Sparkplug B mappings, see the additional examples at the end of this guide.

Configure an EMQX data integration for PostgreSQL

  1. Sign in to the EMQX Dashboard

    Navigate your browser to the EMQX Dashboard (default http://YOUR_EMQX_HOST:18083) and sign in with an admin account.

  2. Create a PostgreSQL connector

    Open Integration / Connector and select Create. Choose PostgreSQL as the connector type.

    Give the connector a name and add the host, port, database, username, and password from your Tiger Cloud service connection details. Enable TLS, then select Test Connectivity to verify the connection.

    Warning

    Tiger Cloud requires TLS. Make sure Enable TLS is on, otherwise the connector will not be able to reach your service.

  3. Create a PostgreSQL sink

    Open Integration / Rules and select Create to start a new rule. In the SQL Editor, write a rule that selects the messages you want to persist.

    Use the payload mapping example above that matches your MQTT message format. For a simple numeric payload, the rule looks like:

    SELECT
    timestamp as ts,
    topic as tag_id,
    clientid as client_id,
    qos as qos,
    CAST(payload AS DOUBLE PRECISION) as value
    FROM "#"

    Under Action Outputs, select Add Action, choose PostgreSQL, and select the connector you created in the previous step. Set the SQL template to insert into your table:

    INSERT INTO mqtt_messages (ts, tag_id, client_id, qos, value)
    VALUES (
    to_timestamp(${ts}::double precision / 1000),
    ${topic},
    ${client_id},
    ${qos},
    ${value}
    )

    Save the action, then save the rule.

  4. Verify messages are flowing

    Publish a test message to a topic that matches your rule, then query the table to confirm rows are arriving:

    SELECT * FROM mqtt_messages ORDER BY ts DESC LIMIT 10;

    If no rows appear, check the rule's Statistics tab in the EMQX Dashboard for matched and failed counts, and review the EMQX log for connector errors.

To confirm EMQX is sending MQTT data into your Tiger Cloud service:

  1. Publish a test MQTT message

    Send a test message to the topic you configured in your EMQX rule. Then check the rule's statistics in the EMQX Dashboard to verify the message was matched and the PostgreSQL action succeeded.

  2. Query the target table in Tiger Cloud service

    Use the Tiger Cloud Console, psql, or another SQL client to run:

    SELECT * FROM mqtt_messages ORDER BY ts DESC LIMIT 10;

    You should see the tag value and metadata appear in the table rows.

You have successfully integrated EMQX with Tiger Cloud.

The following table schemas, EMQX Rules, and Action Outputs can be adapted for different MQTT Payloads depending on the structure of the payload and what values are to be recorded.

Use this when the MQTT message body is JSON and you want to extract individual fields.

Sample payload:

{
"tag_value": 112.34,
"status": "ok",
"device": "sensor-1"
}

Table Schema:

CREATE TABLE mqtt_messages_json (
ts TIMESTAMPTZ NOT NULL,
tag_id TEXT NOT NULL,
client_id TEXT,
qos SMALLINT,
value DOUBLE PRECISION,
status TEXT
) WITH (
tsdb.hypertable = true,
tsdb.partition_column = 'ts',
tsdb.chunk_interval = '7 days'
);

Rule:

SELECT
timestamp as ts,
topic as tag_id,
clientid as client_id,
qos as qos,
CAST(payload->>'tag_value' AS DOUBLE PRECISION) as value,
payload->>'status' as status
FROM "#"

Action output:

INSERT INTO mqtt_messages_json (ts, tag_id, client_id, qos, value, status)
VALUES (
to_timestamp(${ts}::double precision / 1000),
${topic},
${client_id},
${qos},
${value},
${status}
)

Use this when EMQX receives Sparkplug B messages with nested metric data.

Sample payload:

{
"namespace": "spBv1.0",
"group_id": "factory-1",
"edge_node_id": "edge-01",
"device_id": "device-01",
"metrics": [
{ "name": "temperature", "value": 72.5 }
],
"timestamp": 1710000000000
}

Table Schema:

CREATE TABLE mqtt_messages_sparkplug (
ts TIMESTAMPTZ NOT NULL,
namespace TEXT NOT NULL,
group_id TEXT NOT NULL,
edge_node_id TEXT,
device_id TEXT,
metric_name TEXT,
metric_value DOUBLE PRECISION,
metric_ts TIMESTAMPTZ
) WITH (
tsdb.hypertable = true,
tsdb.partition_column = 'ts',
tsdb.chunk_interval = '7 days'
);

Rule:

SELECT
timestamp as ts,
topic as tag_id,
clientid as client_id,
qos as qos,
payload->>'namespace' as namespace,
payload->>'group_id' as group_id,
payload->>'edge_node_id' as edge_node_id,
payload->>'device_id' as device_id,
(payload->'metrics'->0)->>'name' as metric_name,
CAST((payload->'metrics'->0)->>'value' AS DOUBLE PRECISION) as metric_value,
to_timestamp(CAST(payload->>'timestamp' AS DOUBLE PRECISION) / 1000) as metric_ts
FROM "#"

Action output:

INSERT INTO mqtt_messages_sparkplug (ts, namespace, group_id, edge_node_id, device_id, metric_name, metric_value, metric_ts)
VALUES (
to_timestamp(${ts}::double precision / 1000),
${namespace},
${group_id},
${edge_node_id},
${device_id},
${metric_name},
${metric_value},
${metric_ts}
)
  • If no rows appear in the target table, verify EMQX rule statistics for matched and failed counts, and confirm the connector is using the correct PostgreSQL credentials and database host.
  • If the rule matches messages but the action fails, check that the payload fields are being extracted with the right JSON path and that the action SQL template uses the same column names as your target table.