用户自定义函数
用户自定义函数
1. UDF介绍
UDF(User Defined Function)即用户自定义函数,IoTDB 提供多种内置的时序处理函数,也支持扩展自定义函数来满足更多的计算需求。
IoTDB 表模型中支持三种类型的 UDF ,如下表所示。
UDF 类型 | 函数类型 | 描述 |
---|---|---|
UDSF(User-defined Scalar Function) | 标量函数 | 输入 k 列 1 行数据,输出1 列 1 行数据(一对一)。 |
UDAF(User-defined Aggregate Function) | 聚合函数 | 输入k 列 m 行数据,输出1 列 1 行数据(多对一)。 |
UDTF(User-defined Table Function) | 表函数 | 根据输入的动态参数生成“表”形式的结果集。 |
UDSF
可用于标量函数出现的任何子句和表达式中,如select子句、where子句等。select udsf1(s1) from table1 where udsf2(s1)>0
UDAF
可用于聚合函数出现的任何子句和表达式中,如select子句、having子句等;select udaf1(s1), device_id from table1 group by device_id having udaf2(s1)>0
UDTF
可以像关系表一样在from子句中使用;select * from udtf('t1', bid);
2. UDF 管理
2.1 UDF 注册
准备 UDF 实现的 JAR 包,其中包含 UDF 实现类,如
org.apache.iotdb.udf.ScalarFunctionExample
。Jar 包的放置有两种方式:
- 本地:需要将 JAR 包放置到集群所有节点的
ext/udf
目录下。 - 远端:需要将 JAR 包上传到 URI 服务器上并确保 IoTDB 实例能够访问该 URI 服务器(注册成功后IoTDB 会下载 JAR 包并同步到整个集群)。
- 使用以下 SQL 语句注册 UDF
CREATE FUNCTION <UDF-NAME> AS <UDF-CLASS-FULL-PATHNAME> (USING URI URI-STRING)
- 示例
-- 本地
CREATE FUNCTION contain_null AS 'org.apache.iotdb.udf.ScalarFunctionExample';
-- 远端
CREATE FUNCTION contain_null AS 'org.apache.iotdb.udf.ScalarFunctionExample' USING URI 'http://jar/example.jar'
注意:
- UDF 在装载过程中无需启停服务器。
- UDF 名称大小写不敏感,不能与 IoTDB 内置函数重名。
- 表模型和树模型的 UDF 空间相互独立。
- 避免在不同的 JAR 包中创建全类名相同但功能逻辑不同的 UDF 类。如果存在,系统在执行 UDF 时会随机加载其中一个,造成执行行为不一致。
2.2 UDF 卸载
SQL 语法如下:
DROP FUNCTION <UDF-NAME>
示例:卸载上述例子的 UDF:
DROP FUNCTION contain_null
2.3 UDF 查看
- 如果 State 为 UNAVAILABLE,可能是在注册或卸载过程中系统发生了错误,请查看系统日志进行排查,重新注册或卸载 UDF 直至成功即可。
SHOW FUNCTIONS
2.4 UDF 配置
- 可以在
iotdb-system.properties
中配置 UDF Jar 文件的存储目录:
# UDF lib dir
udf_lib_dir=ext/udf
3. UDF 开发
3.1 UDF 依赖
可以从 Maven 库 中搜索下面示例中的依赖,请注意选择和目标 IoTDB 服务器版本相同的依赖版本。
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>udf-api</artifactId>
<version>2.0.0</version>
<scope>provided</scope>
</dependency>
3.2 标量函数(UDSF)
编写一个 UDSF 需要实现org.apache.iotdb.udf.api.relational.ScalarFunction
接口。
public interface ScalarFunction extends SQLFunction {
/**
* In this method, the user need to do the following things:
*
* <ul>
* <li>Validate {@linkplain FunctionArguments}. Throw {@link UDFArgumentNotValidException} if
* any parameter is not valid.
* <li>Use {@linkplain FunctionArguments} to get input data types and infer output data type.
* <li>Construct and return a {@linkplain ScalarFunctionAnalysis} object.
* </ul>
*
* @param arguments arguments used to validate
* @throws UDFArgumentNotValidException if any parameter is not valid
* @return the analysis result of the scalar function
*/
ScalarFunctionAnalysis analyze(FunctionArguments arguments) throws UDFArgumentNotValidException;
/**
* This method is called after the ScalarFunction is instantiated and before the beginning of the
* transformation process. This method is mainly used to initialize the resources used in
* ScalarFunction.
*
* @param arguments used to parse the input arguments entered by the user
* @throws UDFException the user can throw errors if necessary
*/
default void beforeStart(FunctionArguments arguments) throws UDFException {
// do nothing
}
/**
* This method will be called to process the transformation. In a single UDF query, this method
* may be called multiple times.
*
* @param input original input data row
* @throws UDFException the user can throw errors if necessary
*/
Object evaluate(Record input) throws UDFException;
/** This method is mainly used to release the resources used in the ScalarFunction. */
default void beforeDestroy() {
// do nothing
}
}
接口说明:
接口定义 | 描述 | 是否必须 |
---|---|---|
ScalarFunctionAnalysis analyze(FunctionArguments arguments); | 1. 校验FunctionArguments 中的输入列数、数据类型、系统参数等是否合法,不合法则抛出异常。2. 根据 FunctionArguments 构造ScalarFunctionAnalysis ,包括输出类型等信息。 | 是 |
void beforeStart(FunctionArguments arguments); | 在 UDSF 处理输入数据前,调用用户自定义的初始化行为 | 否 |
Object evaluate(Record input) throws UDFException; | UDSF 处理逻辑,根据一行输入数据,返回一行输出数据。 | 是 |
void beforeDestroy(); | UDSF 的结束方法,您可以在此方法中进行一些资源释放等的操作。此方法由框架调用。对于一个实例而言,生命周期中会且只会被调用一次,即在处理完最后一条记录之后被调用。 | 否 |
目前 ScalarFunctionAnalysis 中的字段:
字段类型 | 字段名称 | 默认值 |
---|---|---|
Type | outputDataType | 无 |
示例:UDSF 的实现示例,输入任意数据类型的任意多列,返回一个布尔值,代表该行输入是否包含 NULL 值。
3.3 聚合函数(UDAF)
一个完整的 UDAF 定义涉及到 State
和 AggregateFunction
两个类。
3.3.1 State 类
编写一个 State 类需要实现org.apache.iotdb.udf.api.State
接口。
public interface State {
/** Reset your state object to its initial state. */
void reset();
/**
* Serialize your state into byte array. The order of serialization must be consistent with
* deserialization.
*/
byte[] serialize();
/**
* Deserialize byte array into your state. The order of deserialization must be consistent with
* serialization.
*/
void deserialize(byte[] bytes);
/** Destroy state. You may release previously binding resource in this method. */
default void destroyState() {}
;
}
接口说明:
接口定义 | 描述 | 是否必须 |
---|---|---|
void reset() | 将State 对象重置为初始的状态,您需要像编写构造函数一样,在该方法内填入State 类中各个字段的初始值。 | 是 |
byte[] serialize() | 将State 序列化为二进制数据。该方法用于 IoTDB 内部的State 对象传递,注意序列化的顺序必须和下面的反序列化方法一致。 | 是 |
void deserialize(byte[] bytes) | 将二进制数据反序列化为State 。该方法用于 IoTDB 内部的State 对象传递,注意反序列化的顺序必须和上面的序列化方法一致。 | 是 |
void destroyState() | 进行资源释放等的操作。此方法由框架调用。对于一个实例而言,生命周期中会且只会被调用一次。 | 否 |
3.3.2 AggregateFunction 类
编写一个 UDAF 需要实现 org.apache.iotdb.udf.api.relational.AggregateFunction
接口。
public interface AggregateFunction extends SQLFunction {
/**
* In this method, the user need to do the following things:
*
* <ul>
* <li>Validate {@linkplain FunctionArguments}. Throw {@link UDFArgumentNotValidException} if
* any parameter is not valid.
* <li>Use {@linkplain FunctionArguments} to get input data types and infer output data type.
* <li>Construct and return a {@linkplain AggregateFunctionAnalysis} object.
* </ul>
*
* @param arguments arguments used to validate
* @throws UDFArgumentNotValidException if any parameter is not valid
* @return the analysis result of the scalar function
*/
AggregateFunctionAnalysis analyze(FunctionArguments arguments)
throws UDFArgumentNotValidException;
/**
* This method is called after the AggregateFunction is instantiated and before the beginning of
* the transformation process. This method is mainly used to initialize the resources used in
* AggregateFunction.
*
* @param arguments used to parse the input arguments entered by the user
* @throws UDFException the user can throw errors if necessary
*/
default void beforeStart(FunctionArguments arguments) throws UDFException {
// do nothing
}
/** Create and initialize state. You may bind some resource in this method. */
State createState();
/**
* Update state with data columns.
*
* @param state state to be updated
* @param input original input data row
*/
void addInput(State state, Record input);
/**
* Merge two state in execution engine.
*
* @param state current state
* @param rhs right-hand-side state to be merged
*/
void combineState(State state, State rhs);
/**
* Calculate output value from final state
*
* @param state final state
* @param resultValue used to collect output data points
*/
void outputFinal(State state, ResultValue resultValue);
/**
* Remove input data from state. This method is used to remove the data points that have been
* added to the state. Once it is implemented, {@linkplain
* AggregateFunctionAnalysis.Builder#removable(boolean)} should be set to true.
*
* @param state state to be updated
* @param input row to be removed
*/
default void remove(State state, Record input) {
throw new UnsupportedOperationException();
}
/** This method is mainly used to release the resources used in the SQLFunction. */
default void beforeDestroy() {
// do nothing
}
}
接口说明:
接口定义 | 描述 | 是否必须 |
---|---|---|
AggregateFunctionAnalysis analyze(FunctionArguments arguments); | 1. 校验FunctionArguments 中的输入列数、数据类型、系统参数等是否合法,不合法则抛出异常。2. 根据 FunctionArguments 构造AggregateFunctionAnalysis ,包括输出类型、removable 等信息。 | 是 |
void beforeStart(FunctionArguments arguments); | 在 UDAF 处理输入数据前,调用用户自定义的初始化行为 | 否 |
State createState(); | 创建State 对象,一般只需要调用默认构造函数,然后按需修改默认的初始值即可。 | 是 |
void addInput(State state, Record input); | 更新State 对象,将输入的一行 Record 数据添加到聚合状态中。 | 是 |
void combineState(State state, State rhs); | 将rhs 状态合并至state 状态中。在分布式场景下,同一组的数据可能分布在不同节点上,IoTDB 会为每个节点上的部分数据生成一个State 对象,然后调用该方法合并成完整的State 。 | 是 |
void outputFinal(State state, ResultValue resultValue); | 根据State 中的数据,计算出最终的聚合结果。注意根据聚合的语义,每一组只能输出一个值。 | 是 |
void remove(State state, Record input); | 更新State 对象,将输入的一行 Record 数据从聚合状态中剔除。实现该方法需要设置 AggregateFunctionAnalysis 中的 removable 字段为 true。 | 否 |
void beforeDestroy(); | UDSF 的结束方法,您可以在此方法中进行一些资源释放等的操作。此方法由框架调用。对于一个实例而言,生命周期中会且只会被调用一次,即在处理完最后一条记录之后被调用。 | 否 |
目前 AggregateFunctionAnalysis 中的字段:
字段类型 | 字段名称 | 默认值 |
---|---|---|
Type | outputDataType | 无 |
boolean | removable | false |
示例:UDAF 的实现示例,计算不为 NULL 的行数。
3.4 表函数(UDTF)
3.4.1 定义
表函数,也被称为表值函数(Table-Valued Function,TVF),不同于标量函数、聚合函数和窗口函数的返回值是一个“标量值”,表函数的返回值是一个“表”(结果集)。
3.4.2 使用
表函数可以像关系表一样,以tableFunctionCall
的形式在 SQL 查询的 FROM
子句中使用,支持传递参数,并根据参数动态生成结果集。
3.4.3 语法
tableFunctionCall
的具体定义如下所示:
tableFunctionCall
: qualifiedName '(' (tableFunctionArgument (',' tableFunctionArgument)*)?')'
;
tableFunctionArgument
: (identifier '=>')? (tableArgument | scalarArgument)
;
tableArgument
: tableArgumentRelation
(PARTITION BY ('(' (expression (',' expression)*)? ')' | expression))?
(ORDER BY ('(' sortItem (',' sortItem)* ')' | sortItem))?
;
tableArgumentRelation
: qualifiedName (AS? identifier columnAliases?)? #tableArgumentTable
| '(' query ')' (AS? identifier columnAliases?)? #tableArgumentQuery
;
scalarArgument
: expression
| timeDuration
;
例如:
// 这是从表函数中进行查询,传入一个字符串“t1”参数。
select * from tvf('t1');
// 这是从表函数中进行查询,传入一个字符串“t1”参数,一个 bid 表参数。
select * from tvf('t1', bid);
3.4.4 函数参数
IoTDB 中的表函数为多态表值函数,支持参数类型如下所示:
参数类型 | 定义 | 示例 |
---|---|---|
标量参数(Scalar Argument) | 必须是常量表达式,可以是任何的 SQL 数据类型,需要和声明的类型兼容。 | SIZE => 42 ,SIZE => '42' ,SIZE => 42.2 ,SIZE => 12h ,SIZE => 60m |
表参数(Table Argument) | 可以是一个表名或一条查询语句。 | input => orders ,data => (SELECT * FROM region, nation WHERE region.regionkey = nation.regionkey) |
表参数具有如下属性:
组语义与行语义
被声明为组语义(Set Semantic)的表参数意味着需要根据整个完整的分区才能得到结果集。
- 允许在调用时指定 PARTITION 或 ORDER,执行引擎会 partition-by-partition 地进行处理。
input => orders PARTITION BY device_id ORDER BY time
- 如果没有指定 PARTITION,则认为所有的数据都在同一个数据组中。
被声明为行语义(Row Semantics)的表参数意味着行与行之间没有依赖关系。不允许在调用时指定 PARTITION 或 ORDER,执行引擎会 row-by-row 地进行处理。
列穿透(Pass-through Columns)
- 表参数如果被声明为列穿透,则表函数的结果列会包含该表参数输入的所有列。
- 例如,窗口分析函数,通过为表参数设置列穿透属性,可实现输出结果为“所有输入列(包含原始列和聚合结果)+窗口ID”,即“原始数据+分析结果”。
3.4.5 传递方式
传递方式 | 描述 | 示例 |
---|---|---|
按名传递 | 1. 可以通过任意的顺序传递参数。 2. 被声明有默认值的参数可以被省略。 3. 参数名大小写不敏感。 | SELECT * FROM my_function(row_count => 100, column_count => 1); SELECT * FROM my_function(column_count => 1, row_count => 100); SELECT * FROM my_function(column_count => 1); |
按位置传递 | 1. 必须按照声明的顺序进行传递参数。 2. 如果余下的参数都有默认值,可以只传一部分参数。 | SELECT * FROM my_function(1, 100); SELECT * FROM my_function(1); |
注意:
以上两种方式不允许混用,否则在语义解析时候会抛出“All arguments must be passed by name or all must be passed positionally”异常。
3.4.6 返回结果
表函数的结果集由以下两部分组成。
- 由表函数创建的生成列(Proper Columns)。
- 根据表参数自动构建的映射列(Pass-through Columns)。
- 如果指定了表参数的属性为列穿透,则会包括输入关系的所有列;
- 如果没有指定为列穿透但指定了 PartitionBy,则是 PartitionBy 的列;
- 如果均未指定,则不根据表参数自动构建列。
3.5 完整Maven项目示例
如果使用 Maven,可以参考示例项目udf-example。