函數¶
外掛程式實作¶
函數框架用於實作 SQL 函數。Presto 包含許多內建函數。為了實作新的函數,您可以編寫一個外掛程式,從 getFunctions()
傳回一個或多個函數
public class ExampleFunctionsPlugin
implements Plugin
{
@Override
public Set<Class<?>> getFunctions()
{
return ImmutableSet.<Class<?>>builder()
.add(ExampleNullFunction.class)
.add(IsNullFunction.class)
.add(IsEqualOrNullFunction.class)
.add(ExampleStringFunction.class)
.add(ExampleAverageFunction.class)
.build();
}
}
請注意,ImmutableSet
類別是 Guava 的公用程式類別。 getFunctions()
方法包含我們將在本教學課程中實作的所有函數類別。
如需程式碼庫中的完整範例,請參閱 Presto 原始碼根目錄中的 presto-ml
模組(用於機器學習函數)或 presto-teradata-functions
模組(用於與 Teradata 相容的函數)。
純量函數實作¶
函數框架使用註解來指示有關函數的相關資訊,包括名稱、描述、傳回類型和參數類型。以下是實作 is_null
的範例函數
public class ExampleNullFunction
{
@ScalarFunction("is_null", calledOnNullInput = true)
@Description("Returns TRUE if the argument is NULL")
@SqlType(StandardTypes.BOOLEAN)
public static boolean isNull(@SqlNullable @SqlType(StandardTypes.VARCHAR) Slice string)
{
return (string == null);
}
}
函數 is_null
接受單一 VARCHAR
引數,並傳回一個 BOOLEAN
,指示引數是否為 NULL
。請注意,函數的引數類型為 Slice
。 VARCHAR
使用 Slice
,它本質上是 byte[]
的包裝函式,而不是 String
作為其原生容器類型。
@SqlType
:@SqlType
註解用於宣告傳回類型和引數類型。請注意,Java 程式碼的傳回類型和引數必須符合對應註解的原生容器類型。@SqlNullable
:@SqlNullable
註解指示引數可能為NULL
。如果沒有此註解,框架會假設如果任何引數為NULL
,則所有函數都會傳回NULL
。當使用具有基本原生容器類型 (例如BigintType
) 的Type
時,使用@SqlNullable
時,請使用原生容器類型的物件包裝函式。如果方法在引數為非 Null 時可能傳回NULL
,則必須使用@SqlNullable
註解。
參數化純量函數¶
具有類型參數的純量函數具有一些額外的複雜性。為了使我們之前的範例適用於任何類型,我們需要以下內容
@ScalarFunction(name = "is_null", calledOnNullInput = true)
@Description("Returns TRUE if the argument is NULL")
public final class IsNullFunction
{
@TypeParameter("T")
@SqlType(StandardTypes.BOOLEAN)
public static boolean isNullSlice(@SqlNullable @SqlType("T") Slice value)
{
return (value == null);
}
@TypeParameter("T")
@SqlType(StandardTypes.BOOLEAN)
public static boolean isNullLong(@SqlNullable @SqlType("T") Long value)
{
return (value == null);
}
@TypeParameter("T")
@SqlType(StandardTypes.BOOLEAN)
public static boolean isNullDouble(@SqlNullable @SqlType("T") Double value)
{
return (value == null);
}
// ...and so on for each native container type
}
@TypeParameter
:@TypeParameter
註解用於宣告類型參數,該參數可用於引數類型@SqlType
註解,或函數的傳回類型。它也可用於註解類型為Type
的參數。在執行階段,引擎會將具體類型繫結至此參數。或者,可以透過將boundedBy
類型類別提供給@TypeParameter
,將類型參數限制為特定類型的子代。@OperatorDependency
可用於宣告需要用於操作給定類型參數的其他函數。例如,以下函數只會繫結至已定義 equals 函數的類型
@ScalarFunction(name = "is_equal_or_null", calledOnNullInput = true)
@Description("Returns TRUE if arguments are equal or both NULL")
public final class IsEqualOrNullFunction
{
@TypeParameter("T")
@SqlType(StandardTypes.BOOLEAN)
public static boolean isEqualOrNullSlice(
@OperatorDependency(operator = OperatorType.EQUAL, returnType = StandardTypes.BOOLEAN, argumentTypes = {"T", "T"}) MethodHandle equals,
@SqlNullable @SqlType("T") Slice value1,
@SqlNullable @SqlType("T") Slice value2)
{
if (value1 == null && value2 == null) {
return true;
}
if (value1 == null || value2 == null) {
return false;
}
return (boolean) equals.invokeExact(value1, value2);
}
// ...and so on for each native container type
}
另一個純量函數範例¶
lowercaser
函數接受單一 VARCHAR
引數,並傳回一個 VARCHAR
,它是轉換為小寫的引數
public class ExampleStringFunction
{
@ScalarFunction("lowercaser")
@Description("converts the string to alternating case")
@SqlType(StandardTypes.VARCHAR)
public static Slice lowercaser(@SqlType(StandardTypes.VARCHAR) Slice slice)
{
String argument = slice.toStringUtf8();
return Slices.utf8Slice(argument.toLowerCase());
}
}
請注意,對於大多數常見的字串函數,包括將字串轉換為小寫,Slice 程式庫也提供了直接在底層 byte[]
上運作的實作,這些實作具有更好的效能。此函數沒有 @SqlNullable
註解,這表示如果引數為 NULL
,則結果將自動為 NULL
(不會呼叫函數)。
Codegen 純量函數實作¶
純量函數也可以在位元組碼中實作,讓我們可以根據 @TypeParameter
專門化和最佳化函數
@CodegenScalarFunction
:@CodegenScalarFunction
註解用於宣告以位元組碼實作的純量函數。@SqlType
註解用於宣告傳回類型。它採用Type
作為參數,這些參數也具有@SqlType
註解。傳回類型是MethodHandle
,它是 codegen 函數方法。
public class CodegenArrayLengthFunction
{
@CodegenScalarFunction("array_length", calledOnNullInput = true)
@SqlType(StandardTypes.INTEGER)
@TypeParameter("K")
public static MethodHandle arrayLength(@SqlType("array(K)") Type arr)
{
CallSiteBinder binder = new CallSiteBinder();
ClassDefinition classDefinition = new ClassDefinition(a(Access.PUBLIC, FINAL), makeClassName("ArrayLength"), type(Object.class));
classDefinition.declareDefaultConstructor(a(PRIVATE));
Parameter inputBlock = arg("inputBlock", Block.class);
MethodDefinition method = classDefinition.declareMethod(a(Access.PUBLIC, STATIC), "array_length", type(Block.class), ImmutableList.of(inputBlock));
BytecodeBlock body = method.getBody();
body.append(inputBlock.invoke("getPositionCount", int.class).ret());
Class<?> clazz = defineClass(classDefinition, Object.class, binder.getBindings(), CodegenArrayLengthFunction.class.getClassLoader());
return new methodHandle(clazz, "array_length", Block.class), Optional.of();
}
}
彙總函數實作¶
彙總函數使用類似於純量函數的框架,但稍微複雜一些。
AccumulatorState
:所有彙總函數都會將輸入列累加到狀態物件中;此物件必須實作
AccumulatorState
。對於簡單的彙總,只需將AccumulatorState
擴展到具有您想要的 getter 和 setter 的新介面,框架將為您產生所有實作和序列化程式。如果您需要更複雜的狀態物件,則需要實作AccumulatorStateFactory
和AccumulatorStateSerializer
,並透過AccumulatorStateMetadata
註解提供這些。
以下程式碼實作彙總函數 avg_double
,它會計算 DOUBLE
欄位的平均值
@AggregationFunction("avg_double")
public class AverageAggregation
{
@InputFunction
public static void input(LongAndDoubleState state, @SqlType(StandardTypes.DOUBLE) double value)
{
state.setLong(state.getLong() + 1);
state.setDouble(state.getDouble() + value);
}
@CombineFunction
public static void combine(LongAndDoubleState state, LongAndDoubleState otherState)
{
state.setLong(state.getLong() + otherState.getLong());
state.setDouble(state.getDouble() + otherState.getDouble());
}
@OutputFunction(StandardTypes.DOUBLE)
public static void output(LongAndDoubleState state, BlockBuilder out)
{
long count = state.getLong();
if (count == 0) {
out.appendNull();
}
else {
double value = state.getDouble();
DOUBLE.writeDouble(out, value / count);
}
}
}
平均值有兩個部分:欄位中每列 DOUBLE
的總和,以及看到的列數的 LONG
計數。 LongAndDoubleState
是一個介面,會擴充 AccumulatorState
public interface LongAndDoubleState
extends AccumulatorState
{
long getLong();
void setLong(long value);
double getDouble();
void setDouble(double value);
}
如上所述,對於簡單的 AccumulatorState
物件,只需定義具有 getter 和 setter 的介面即可,框架會為您產生實作。
接下來將深入探討與撰寫彙總函數相關的各種註解
@InputFunction
:@InputFunction
註解宣告一個函數,該函數接受輸入列並將它們儲存在AccumulatorState
中。類似於純量函數,您必須使用@SqlType
註解參數。請注意,與上述純量範例中,使用Slice
來保存VARCHAR
不同,這裡輸入函數的參數使用原始的double
型別。在這個例子中,輸入函數只是追蹤正在執行的列數(透過setLong()
)和執行總和(透過setDouble()
)。@CombineFunction
:@CombineFunction
註解宣告一個函數,用於合併兩個狀態物件。此函數用於合併所有部分聚合狀態。它接受兩個狀態物件,並將結果合併到第一個物件中(在上面的範例中,僅僅是將它們相加)。@OutputFunction
:@OutputFunction
是計算聚合時最後呼叫的函數。它接受最終狀態物件(合併所有部分狀態的結果),並將結果寫入BlockBuilder
。序列化發生在哪裡?什麼是
GroupedAccumulatorState
?@InputFunction
通常在與@CombineFunction
不同的 worker 上執行,因此狀態物件會在這些 worker 之間由聚合框架序列化和傳輸。當執行GROUP BY
聚合時,會使用GroupedAccumulatorState
,如果您沒有指定AccumulatorStateFactory
,則會自動為您生成一個實作。
進階使用案例¶
原始區塊輸入¶
純量和聚合函數註解都允許您定義操作原生型別的方法。在 Java 中,這些原生型別是 boolean
、Slice
和 long
。對於參數化實作或參數型別,標準 Java 型別無法使用,因為它們無法表示輸入資料。
若要定義可以接受任何型別的方法處理程序,請結合使用 @BlockPosition
和 @BlockIndex
參數。與 @SqlNullable
註解類似,使用 @NullablePosition
註解來表示當區塊位置為 NULL
時應呼叫該函數。
這適用於純量和聚合函數實作。
@ScalarFunction("example")
public static Block exampleFunction(
@BlockPosition @NullablePosition @SqlType("array(int)") Block block,
@BlockIndex int index) { /* ...implementation */ }
使用 @BlockPosition
套用泛型型別¶
當函數使用 @TypeParameter
註解定義時,使用 @BlockPosition
語法的函數簽名能夠對泛型型別進行操作。在 @BlockPosition
引數中加入額外的 @SqlType("T")
註解,表示它接受與泛型型別相對應的引數。這適用於純量和聚合函數實作。
@ScalarFunction("example")
@TypeParameter("T")
public static Block exampleFunction(
@BlockPosition @SqlType("T") Block block,
@BlockIndex int index) { /* ...implementation */ }
使用 @TypeParameter
擷取泛型型別¶
在函數的引數列表開頭加入 @TypeParameter
註解,以允許實作執行型別特定的邏輯。將一個帶有 @TypeParameter
註解的 Type
型別引數作為函數簽名的第一個引數,以存取 Type
。這適用於純量和聚合函數。
@ScalarFunction("example")
@TypeParameter("T")
public static Block exampleFunction(
@TypeParameter("T") Type type,
@BlockPosition @SqlType("T") Block block,
@BlockIndex int index) { /* ...implementation */ }