OPC UA 协议
OPC UA 协议
1. OPC UA 订阅数据
本功能支持用户以 OPC UA 协议从 IoTDB 中订阅数据,订阅数据的通信模式支持 Client/Server 和 Pub/Sub 两种。
注意:本功能并非从外部 OPC Server 中采集数据写入 IoTDB

2. OPC 服务启动方式
2.1 语法
启动 OPC UA 协议的语法:
create pipe p1
with source (...)
with processor (...)
with sink ('sink' = 'opc-ua-sink',
'sink.opcua.tcp.port' = '12686',
'sink.opcua.https.port' = '8443',
'sink.user' = 'root',
'sink.password' = 'TimechoDB@2021', //V2.0.6.x 之前默认密码为root
'sink.opcua.security.dir' = '...'
)2.2 参数
| 参数 | 描述 | 取值范围 | 是否必填 | 默认值 |
|---|---|---|---|---|
| sink | OPC UA SINK | String: opc-ua-sink | 必填 | |
| sink.opcua.model | OPC UA 使用的模式 | String: client-server / pub-sub | 选填 | client-server |
| sink.opcua.tcp.port | OPC UA 的 TCP 端口 | Integer: [0, 65536] | 选填 | 12686 |
| sink.opcua.https.port | OPC UA 的 HTTPS 端口 | Integer: [0, 65536] | 选填 | 8443 |
| sink.opcua.security.dir | OPC UA 的密钥及证书目录 | String: Path,支持绝对及相对目录 | 选填 | 1. iotdb 相关 DataNode 的 conf 目录下的 opc_security 文件夹 / <httpsPort:tcpPort>。2. 如无 iotdb 的 conf 目录(例如 IDEA 中启动 DataNode),则为用户主目录下的 iotdb_opc_security 文件夹 /<httpsPort:tcpPort> |
| sink.opcua.enable-anonymous-access | OPC UA 是否允许匿名访问 | Boolean | 选填 | true |
| sink.user | 用户,这里指 OPC UA 的允许用户 | String | 选填 | root |
| sink.password | 密码,这里指 OPC UA 的允许密码 | String | 选填 | TimechoDB@2021 (V2.0.6.x 之前默认密码为root ) |
2.3 示例
create pipe p1
with sink ('sink' = 'opc-ua-sink',
'sink.user' = 'root',
'sink.password' = 'TimechoDB@2021');//V2.0.6.x 之前默认密码为root
start pipe p1;2.4 使用限制
- 启动协议之后需要写入数据,才能建立连接,且仅能订阅建立连接之后的数据。
- 推荐在单机模式下使用。在分布式模式下,每一个 IoTDB DataNode 都作为一个独立的 OPC Server 提供数据,需要单独订阅。
3. 两种通信模式示例
3.1 Client / Server 模式
在这种模式下,IoTDB 的流处理引擎通过 OPC UA Sink 与 OPC UA 服务器(Server)建立连接。OPC UA 服务器在其地址空间(Address Space) 中维护数据,IoTDB可以请求并获取这些数据。同时,其他OPC UA客户端(Client)也能访问服务器上的数据。
- 特性:
- OPC UA 将从 Sink 收到的设备信息,按照树形模型整理到 Objects folder 下的文件夹中。
- 每个测点都被记录为一个变量节点,并记录当前数据库中的最新值。
- OPC UA 无法删除数据或者改变数据类型的设置
3.1.1 准备工作
- 此处以UAExpert客户端为例,下载 UAExpert 客户端:https://www.unified-automation.com/downloads/opc-ua-clients.html
- 安装 UAExpert,填写自身的证书等信息。
3.1.2 快速开始
- 使用如下 sql,启动 OPC UA 服务。详细语法参见上文:IoTDB OPC Server语法
create pipe p1 with sink ('sink'='opc-ua-sink');- 写入部分数据。
insert into root.test.db(time, s2) values(now(), 2)- 在 UAExpert 中配置 iotdb 的连接,其中 password 填写为上述参数配置中 sink.password 中设定的密码(此处用户名、密码以2.3小节示例中配置的 root/root 为例):


- 信任服务器的证书后,在左侧 Objects folder 即可看到写入的数据。


- 可以将左侧节点拖动到中间,并展示该节点的最新值:

3.2 Pub / Sub 模式
在这种模式下,IoTDB的流处理引擎通过 OPC UA Sink 向OPC UA 服务器(Server)发送数据变更事件。这些事件被发布到服务器的消息队列中,并通过事件节点 (Event Node) 进行管理。其他OPC UA客户端(Client)可以订阅这些事件节点,以便在数据变更时接收通知。
特性:
每个测点会被 OPC UA 包装成一个事件节点(EventNode)。
相关字段及其对应含义如下:
字段 含义 类型(Milo) 示例 Time 时间戳 DateTime 1698907326198 SourceName 测点对应完整路径 String root.test.opc.sensor0 SourceNode 测点数据类型 NodeId Int32 Message 数据 LocalizedText 3.0
- Event 仅会发送给所有已经监听的客户端,客户端未连接则会忽略该 Event。
- 如果数据被删除,信息则无法推送给客户端。
3.2.1 准备工作
该代码位于 iotdb-example 包下的 opc-ua-sink 文件夹中
代码中包含:
- 主类(ClientTest)
- Client 证书相关的逻辑(IoTDBKeyStoreLoaderClient)
- Client 的配置及启动逻辑(ClientExampleRunner)
- ClientTest 的父类(ClientExample)
3.2.2 快速开始
使用步骤为:
- 打开 IoTDB 并写入部分数据。
insert into root.a.b(time, c, d) values(now(), 1, 2); 此处自动创建元数据开启。
- 使用如下 sql,创建并启动 Pub-Sub 模式的 OPC UA Sink。详细语法参见上文:IoTDB OPC Server语法
create pipe p1 with sink ('sink'='opc-ua-sink', 'sink.opcua.model'='pub-sub');
start pipe p1; 此时能看到服务器的 conf 目录下创建了 opc 证书相关的目录。

- 直接运行 Client 连接,此时 Client 证书被服务器拒收。

- 进入服务器的 sink.opcua.security.dir 目录下,进入 pki 的 rejected 目录,此时 Client 的证书应该已经在该目录下生成。

- 将客户端的证书移入(不是复制) 同目录下 trusted 目录的 certs 文件夹中。

- 再次打开 Client 连接,此时服务器的证书应该被 Client 拒收。

- 进入客户端的 <java.io.tmpdir>/client/security 目录下,进入 pki 的 rejected 目录,将服务器的证书移入(不是复制)trusted 目录。

打开 Client,此时建立双向信任成功, Client 能够连接到服务器。
向服务器中写入数据,此时 Client 中能够打印出收到的数据。

3.4 注意事项
单机与集群:建议使用1C1D单机版,如果集群中有多个 DataNode,可能数据会分散发送在各个 DataNode 上,无法收听到全量数据。
无需操作根目录下证书:在证书操作过程中,无需操作 IoTDB security 根目录下的
iotdb-server.pfx证书和 client security 目录下的example-client.pfx目录。Client 和 Server 双向连接时,会将根目录下的证书发给对方,对方如果第一次看见此证书,就会放入 reject dir,如果该证书在 trusted/certs 里面,则能够信任对方。建议使用 Java 17+:在 JVM 8 的版本中,可能会存在密钥长度限制,报 Illegal key size 错误。对于特定版本(如 jdk.1.8u151+),可以在 ClientExampleRunner 的 create client 里加入
Security.setProperty("crypto.policy", "unlimited");解决,也可以下载无限制的包local_policy.jar与US_export_policy解决替换JDK/jre/lib/security目录下的包解决,下载网址:https://www.oracle.com/java/technologies/javase-jce8-downloads.html。