MQTT 协议
MQTT 协议
1. 概述
MQTT 是一种专为物联网(IoT)和低带宽环境设计的轻量级消息传输协议,基于发布/订阅(Pub/Sub)模型,支持设备间高效、可靠的双向通信。其核心目标是低功耗、低带宽消耗和高实时性,尤其适合网络不稳定或资源受限的场景(如传感器、移动设备)。
IoTDB 深度集成了 MQTT 协议能力,完整兼容 MQTT v3.1(OASIS 国际标准协议)。IoTDB 服务器内置高性能 MQTT Broker 服务模块,无需第三方中间件,支持设备通过 MQTT 报文将时序数据直接写入 IoTDB 存储引擎。

注意,自 V2.0.8.2 版本起,TimechoDB 安装包中默认不包含 MQTT 服务的 JAR 包。请使用该服务前联系天谋团队获取 JAR 包,并放置于 timechodb_home/lib 或者 timechodb_home/ext/external_service 路径下。
2. 内置 MQTT 服务
内置的 MQTT 服务提供了通过 MQTT 直接连接到 IoTDB 的能力。 它侦听来自 MQTT 客户端的发布消息,然后立即将数据写入存储。
MQTT 主题与 IoTDB 时间序列相对应。
消息有效载荷可以由 Java SPI 加载的PayloadFormatter格式化为事件,默认实现为JSONPayloadFormatter
默认的json格式化程序支持两种 json 格式以及由他们组成的json数组,以下是 MQTT 消息有效负载示例:
{
"device":"root.sg.d1",
"timestamp":1586076045524,
"measurements":["s1","s2"],
"values":[0.530635,0.530635]
}或者
{
"device":"root.sg.d1",
"timestamps":[1586076045524,1586076065526],
"measurements":["s1","s2"],
"values":[[0.530635,0.530635], [0.530655,0.530695]]
}或者以上两者的JSON数组形式。

3. MQTT 配置
默认情况下,IoTDB MQTT 服务从${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties加载配置。
配置如下:
| 名称 | 描述 | 默认 |
|---|---|---|
enable_mqtt_service | 是否启用 mqtt 服务 | FALSE |
mqtt_host | mqtt 服务绑定主机 | 127.0.0.1 |
mqtt_port | mqtt 服务绑定端口 | 1883 |
mqtt_handler_pool_size | 处理 mqtt 消息的处理程序池大小 | 1 |
mqtt_payload_formatter | mqtt 消息有效负载格式化程序。可选项:json:仅适用于树模型。line:仅适用于表模型。 | json |
mqtt_max_message_size | mqtt 消息最大长度(字节) | 1048576 |
4. 示例代码
以下是 mqtt 客户端将消息发送到 IoTDB 服务器的示例。
MQTT mqtt = new MQTT();
mqtt.setHost("127.0.0.1", 1883);
mqtt.setUserName("root");
mqtt.setPassword("root");
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Random random = new Random();
for (int i = 0; i < 10; i++) {
String payload = String.format("{\n" +
"\"device\":\"root.sg.d1\",\n" +
"\"timestamp\":%d,\n" +
"\"measurements\":[\"s1\"],\n" +
"\"values\":[%f]\n" +
"}", System.currentTimeMillis(), random.nextDouble());
connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
}
connection.disconnect();5. 自定义 MQTT 消息格式
在生产环境中,每个设备通常都配备了自己的 MQTT 客户端,且这些客户端的消息格式已经预先设定。如果按照 IoTDB 所支持的 MQTT 消息格式进行通信,就需要对现有的所有客户端进行全面的升级改造,这无疑会带来较高的成本。然而,我们可以通过简单的编程手段,轻松实现 MQTT 消息格式的自定义,而无需改造客户端。
可以在源码的 example/mqtt-customize 项目中找到一个简单示例。
假定mqtt客户端传过来的是以下消息格式:
{
"time":1586076045523,
"deviceID":"car_1",
"deviceType":"油车",
"point":"油量",
"value":10.0
}或者JSON的数组形式:
[
{
"time":1586076045523,
"deviceID":"car_1",
"deviceType":"油车",
"point":"油量",
"value":10.0
},
{
"time":1586076045524,
"deviceID":"car_2",
"deviceType":"新能源车",
"point":"速度",
"value":80.0
}
]则可以通过以下步骤设置设置自定义MQTT消息格式:
- 创建一个 Java 项目,增加如下依赖
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-server</artifactId>
<version>2.0.4-SNAPSHOT</version>
</dependency>- 创建一个实现类,实现接口
org.apache.iotdb.db.mqtt.protocol.PayloadFormatter
package org.apache.iotdb.mqtt.server;
import org.apache.iotdb.db.protocol.mqtt.Message;
import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
import org.apache.iotdb.db.protocol.mqtt.TableMessage;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import io.netty.buffer.ByteBuf;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.tsfile.enums.TSDataType;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* The Customized JSON payload formatter. one json format supported: { "time":1586076045523,
* "deviceID":"car_1", "deviceType":"新能源车", "point":"速度", "value":80.0 }
*/
public class CustomizedJsonPayloadFormatter implements PayloadFormatter {
private static final String JSON_KEY_TIME = "time";
private static final String JSON_KEY_DEVICEID = "deviceID";
private static final String JSON_KEY_DEVICETYPE = "deviceType";
private static final String JSON_KEY_POINT = "point";
private static final String JSON_KEY_VALUE = "value";
private static final Gson GSON = new GsonBuilder().create();
@Override
public List<Message> format(String topic, ByteBuf payload) {
if (payload == null) {
return new ArrayList<>();
}
String txt = payload.toString(StandardCharsets.UTF_8);
JsonElement jsonElement = GSON.fromJson(txt, JsonElement.class);
if (jsonElement.isJsonObject()) {
JsonObject jsonObject = jsonElement.getAsJsonObject();
return formatTableRow(topic, jsonObject);
} else if (jsonElement.isJsonArray()) {
JsonArray jsonArray = jsonElement.getAsJsonArray();
List<Message> messages = new ArrayList<>();
for (JsonElement element : jsonArray) {
JsonObject jsonObject = element.getAsJsonObject();
messages.addAll(formatTableRow(topic, jsonObject));
}
return messages;
}
throw new JsonParseException("payload is invalidate");
}
@Override
@Deprecated
public List<Message> format(ByteBuf payload) {
throw new NotImplementedException();
}
private List<Message> formatTableRow(String topic, JsonObject jsonObject) {
TableMessage message = new TableMessage();
String database = !topic.contains("/") ? topic : topic.substring(0, topic.indexOf("/"));
String table = "test_table";
// Parsing Database Name
message.setDatabase((database));
// Parsing Table Name
message.setTable(table);
// Parsing Tags
List<String> tagKeys = new ArrayList<>();
tagKeys.add(JSON_KEY_DEVICEID);
List<Object> tagValues = new ArrayList<>();
tagValues.add(jsonObject.get(JSON_KEY_DEVICEID).getAsString());
message.setTagKeys(tagKeys);
message.setTagValues(tagValues);
// Parsing Attributes
List<String> attributeKeys = new ArrayList<>();
List<Object> attributeValues = new ArrayList<>();
attributeKeys.add(JSON_KEY_DEVICETYPE);
attributeValues.add(jsonObject.get(JSON_KEY_DEVICETYPE).getAsString());
message.setAttributeKeys(attributeKeys);
message.setAttributeValues(attributeValues);
// Parsing Fields
List<String> fields = Arrays.asList(JSON_KEY_POINT);
List<TSDataType> dataTypes = Arrays.asList(TSDataType.FLOAT);
List<Object> values = Arrays.asList(jsonObject.get(JSON_KEY_VALUE).getAsFloat());
message.setFields(fields);
message.setDataTypes(dataTypes);
message.setValues(values);
// Parsing timestamp
message.setTimestamp(jsonObject.get(JSON_KEY_TIME).getAsLong());
return Lists.newArrayList(message);
}
@Override
public String getName() {
// set the value of mqtt_payload_formatter in iotdb-common.properties as the following string:
return "CustomizedJson2Table";
}
@Override
public String getType() {
return PayloadFormatter.TABLE_TYPE;
}
}- 修改项目中的
src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter文件:
将示例中的文件内容清除,并将刚才的实现类的全名(包名.类名)写入文件中。注意,这个文件中只有一行。
在本例中,文件内容为:org.apache.iotdb.mqtt.server.CustomizedJsonPayloadFormatter - 编译项目生成一个 jar 包:
mvn package -DskipTests
在 IoTDB 服务端:
- 创建 ${IOTDB_HOME}/ext/mqtt/ 文件夹, 将刚才的 jar 包放入此文件夹。
- 打开 MQTT 服务参数. (
enable_mqtt_service=trueinconf/iotdb-system.properties) - 用刚才的实现类中的 getName() 方法的返回值 设置为
conf/iotdb-system.properties中mqtt_payload_formatter的值,
, 在本例中,为CustomizedJson2Table - 启动 IoTDB
- 搞定
More: MQTT 协议的消息不限于 json,你还可以用任意二进制。通过如下函数获得:payload.forEachByte() or payload.array。
6. 注意事项
为避免因缺省client_id引发的兼容性问题,强烈建议在所有MQTT客户端中始终显式地提供唯一且非空的 client_id。
不同客户端在client_id缺失或为空时的表现并不一致,常见示例如下:
- 显式传入空字符串
• MQTTX:client_id=""时,IoTDB会直接丢弃消息;
• mosquitto_pub:client_id=""时,IoTDB能正常接收消息。 - 完全不传client_id
• MQTTX:消息可被IoTDB正常接收;
• mosquitto_pub:IoTDB拒绝连接。
由此可见,显式指定唯一且非空的client_id是消除上述差异、确保消息可靠投递的最简单做法。