数据同步
大约 3 分钟
数据同步
本文档主要为数据同步功能的SQL语句,详细功能介绍及使用说明见 数据同步
1. 创建任务
语法:
CREATE PIPE [IF NOT EXISTS] <PipeId> -- PipeId 是能够唯一标定任务的名字
-- 数据抽取插件,可选插件
WITH SOURCE (
[<parameter> = <value>,],
)
-- 数据处理插件,可选插件
WITH PROCESSOR (
[<parameter> = <value>,],
)
-- 数据连接插件,必填插件
WITH SINK (
[<parameter> = <value>,],
)示例一:全量数据同步
create pipe A2B
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6668',
)示例二:部分数据同步
create pipe A2B
WITH SOURCE (
'source'= 'iotdb-source',
'mode.streaming' = 'true'
'database-name'='db_b.*',
'start-time' = '2023.08.23T08:00:00+00:00',
'end-time' = '2023.10.23T08:00:00+00:00'
)
with SINK (
'sink'='iotdb-thrift-async-sink',
'node-urls' = '127.0.0.1:6668',
)示例三:双向数据传输
- 在 A IoTDB 上执行下列语句
create pipe AB
with source (
'source.mode.double-living' ='true'
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6668',
)- 在 B IoTDB 上执行下列语句
create pipe BA
with source (
'source.mode.double-living' ='true'
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6667',
)示例四:边云数据传输
- 在 B IoTDB 上执行下列语句,将 B 中数据同步至 A
create pipe BA
with source (
'database-name'='db_b.*',
'table-name'='.*',
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6667',
)- 在 C IoTDB 上执行下列语句,将 C 中数据同步至 A
create pipe CA
with source (
'database-name'='db_c.*',
'table-name'='.*',
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6668',
)- 在 D IoTDB 上执行下列语句,将 D 中数据同步至 A
create pipe DA
with source (
'database-name'='db_d.*',
'table-name'='.*',
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6669',
)示例五:级联数据传输
- 在 A IoTDB 上执行下列语句,将 A 中数据同步至 B
create pipe AB
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6668',
)- 在 B IoTDB 上执行下列语句,将 B 中数据同步至 C
create pipe BC
with source (
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6669',
)示例六:跨网闸数据传输
create pipe A2B
with sink (
'sink'='iotdb-air-gap-sink',
'node-urls' = '10.53.53.53:9780',
)示例七:压缩同步
create pipe A2B
with sink (
'node-urls' = '127.0.0.1:6668',
'compressor' = 'snappy,lz4',
'rate-limit-bytes-per-second'='1048576'
)示例八:加密同步
create pipe A2B
with sink (
'sink'='iotdb-thrift-ssl-sink',
'node-urls'='127.0.0.1:6667',
'ssl.trust-store-path'='pki/trusted',
'ssl.trust-store-pwd'='root'
)示例九:本地导出 Object 类型数据
CREATE PIPE tsfile_export_local
WITH SOURCE (
'source' = 'iotdb-source',
'table-name' = 'test_table'
)
WITH PROCESSOR (
'processor' = 'do-nothing-processor'
)
WITH SINK (
'sink' = 'tsfile-local-sink',
'sink.local.target-path' = '/data/backup/export_2024'
'sink.rate-limit-bytes-per-second' = '10485760'
);示例十:远程传输 Object 类型数据
- 该方式需提前注册
tsfile_remote_sink插件
CREATE PIPE tsfile_export_scp
WITH SOURCE (
'source' = 'iotdb-source',
'table-name' = 'test_table'
)
WITH PROCESSOR (
'processor' = 'do-nothing-processor'
)
WITH SINK (
'sink' = 'tsfile_remote_sink',
'sink.file-mode' = 'scp',
'sink.scp.host' = '192.168.1.100',
'sink.scp.port' = '22',
'sink.scp.user' = 'backup_user',
'sink.scp.password' = 'ComplexPass123!',
'sink.scp.remote-path' = '/remote/archive/',
'sink.rate-limit-bytes-per-second' = '10485760'
);2. 开始任务
语法:
START PIPE<PipeId>示例:
START PIPE A2B3. 停止任务
语法:
STOP PIPE <PipeId>示例:
STOP PIPE A2B4. 删除任务
语法:
DROP PIPE [IF EXISTS] <PipeId>示例:
DROP PIPE IF EXISTS A2B5. 查看任务
语法:
-- 查看全部任务
SHOW PIPES
-- 查看指定任务
SHOW PIPE <PipeId>示例:
SHOW PIPES
SHOW PIPE A2B6. 修改任务
语法:
ALTER PIPE [IF EXISTS] <PipeId>
MODIFY/REPLACE SOURCE(...)
MODIFY/REPLACE PROCESSOR(...)
MODIFY/REPLACE SINK(...)示例:
ALTER PIPE A2B REPLACE SINK ('sink'='iotdb-thrift-sink', 'node-urls' = '127.0.0.1:6668');