Untyped UDAF

A custom untyped UDAF can be written by extending UserDefinedAggregateFunction and overriding the following methods:

  • public StructType inputSchema(): Defines the data types of the input argument(s) of the aggregate function.
  • public DataType dataType(): Defines the data type of the value returned by the UDAF.
  • public boolean deterministic(): Returns true if the function is deterministic, that is, if the same input always produces the same output; otherwise it returns false.
  • public StructType bufferSchema(): Defines the internal fields required to compute the aggregation; for example, calculating an average requires a sum and a count. For a UDAF with two intermediate values of DoubleType and LongType, the returned StructType is as follows:
new StructType().add("sumVal", DataTypes.DoubleType).add("countVal", DataTypes.LongType)

The names of the fields defined in the StructType are used to identify the buffer values.

  • public void initialize(MutableAggregationBuffer buffer): Initializes the fields defined in bufferSchema(). The initial values must be chosen so that merging two initial buffers returns the same initial buffer, that is, merge(initialBuffer, initialBuffer) should equal initialBuffer. On each node, initialize() is called once for each group of rows.
  • public void update(MutableAggregationBuffer buffer, Row input): Updates the buffer values with the new data in input. Since a UDAF operates over multiple rows, this method is called once per input row.
  • public void merge(MutableAggregationBuffer buffer1, Row buffer2): Combines two partial aggregation buffers and stores the combined values in the MutableAggregationBuffer (buffer1).
  • public Object evaluate(Row buffer): Operates on the final aggregation buffer and returns the output value, whose data type is the one defined in dataType().
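The four-phase contract above (initialize, update, merge, evaluate) can be illustrated without Spark. The following is a minimal plain-Java sketch, assuming a sum/count buffer for an average; the class AvgLifecycle, its nested AvgBuffer type, and the aggregatePartition() helper are hypothetical stand-ins for what Spark does internally, not Spark APIs. It aggregates two partitions separately, merges the partial buffers, evaluates the result, and checks that merging two initial buffers leaves the initial buffer unchanged.

```java
import java.util.List;

// Spark-free sketch of the UDAF buffer lifecycle.
// AvgLifecycle, AvgBuffer and aggregatePartition() are hypothetical names, not Spark APIs.
public class AvgLifecycle {

    // Mirrors a bufferSchema() of (sumVal: double, countVal: long)
    static final class AvgBuffer {
        double sum;
        long count;
    }

    // initialize(): set every buffer field to its identity value
    static AvgBuffer initialize() {
        AvgBuffer b = new AvgBuffer();
        b.sum = 0.0;
        b.count = 0L;
        return b;
    }

    // update(): fold one input row into the buffer (called once per row)
    static void update(AvgBuffer b, double value) {
        b.sum += value;
        b.count += 1;
    }

    // merge(): combine a partial buffer from another partition into b
    static void merge(AvgBuffer b, AvgBuffer other) {
        b.sum += other.sum;
        b.count += other.count;
    }

    // evaluate(): turn the final buffer into the result (null when no rows were seen)
    static Double evaluate(AvgBuffer b) {
        return b.count == 0 ? null : b.sum / b.count;
    }

    // Hypothetical helper: aggregate one partition's rows into a fresh buffer
    static AvgBuffer aggregatePartition(List<Double> rows) {
        AvgBuffer b = initialize();
        for (double v : rows) {
            update(b, v);
        }
        return b;
    }

    public static void main(String[] args) {
        // Two partitions holding rows of the same group
        AvgBuffer p1 = aggregatePartition(List.of(1000.0, 2000.0));
        AvgBuffer p2 = aggregatePartition(List.of(3000.0));

        merge(p1, p2);
        System.out.println(evaluate(p1)); // prints 2000.0

        // merge(initialBuffer, initialBuffer) must equal initialBuffer
        AvgBuffer init = initialize();
        merge(init, initialize());
        System.out.println(init.sum == 0.0 && init.count == 0L); // prints true
    }
}
```

Because update() and merge() only add to the buffer fields, the final result does not depend on how the rows of a group are split across partitions.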

As an example, the following UDAF calculates the average of a column in Spark SQL:

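import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class AverageUDAF extends UserDefinedAggregateFunction {
    private static final long serialVersionUID = 1L;

    // A single nullable double input column
    @Override
    public StructType inputSchema() {
        return new StructType(new StructField[] {
            new StructField("counter", DataTypes.DoubleType, true, Metadata.empty()) });
    }

    // The average is returned as a double
    @Override
    public DataType dataType() {
        return DataTypes.DoubleType;
    }

    // The same input always produces the same average
    @Override
    public boolean deterministic() {
        return true;
    }

    // Intermediate state: running sum and row count
    @Override
    public StructType bufferSchema() {
        return new StructType().add("sumVal", DataTypes.DoubleType).add("countVal", DataTypes.LongType);
    }

    // Identity values, so that merge(initial, initial) == initial
    @Override
    public void initialize(MutableAggregationBuffer bufferAgg) {
        bufferAgg.update(0, 0.0);
        bufferAgg.update(1, 0L);
    }

    // Called once per input row: add the value and count the row (null inputs are skipped)
    @Override
    public void update(MutableAggregationBuffer bufferAgg, Row row) {
        if (row.isNullAt(0)) {
            return;
        }
        bufferAgg.update(0, bufferAgg.getDouble(0) + row.getDouble(0));
        bufferAgg.update(1, bufferAgg.getLong(1) + 1L);
    }

    // Combine two partial buffers by adding their sums and their counts
    @Override
    public void merge(MutableAggregationBuffer bufferAgg, Row row) {
        bufferAgg.update(0, bufferAgg.getDouble(0) + row.getDouble(0));
        bufferAgg.update(1, bufferAgg.getLong(1) + row.getLong(1));
    }

    // Final result: sum / count, or null for a group with no non-null values
    @Override
    public Object evaluate(Row row) {
        long count = row.getLong(1);
        return count == 0 ? null : row.getDouble(0) / count;
    }
}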
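The merge() method of AverageUDAF adds the partial sums and the partial counts instead of averaging the partial averages. The following worked equation, written for two hypothetical partitions with partial buffers (s_1, c_1) and (s_2, c_2) and partial averages m_i = s_i / c_i, shows why the buffer must carry both a sum and a count:

```latex
\bar{x} = \frac{s_1 + s_2}{c_1 + c_2},
\qquad
\frac{s_1 + s_2}{c_1 + c_2} - \frac{1}{2}\left(m_1 + m_2\right)
= \frac{(c_1 - c_2)\,(m_1 - m_2)}{2\,(c_1 + c_2)},
\qquad m_i = \frac{s_i}{c_i}
```

For example, partial buffers (3000, 2) and (3000, 1) give a true average of 6000 / 3 = 2000, whereas averaging the partial averages 1500 and 3000 gives 2250. The two agree only when c_1 = c_2 or m_1 = m_2.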

Calling a UDAF from Spark is similar to calling a UDF: instantiate the UDAF, register it with the SparkSession, and then call it by the alias used at registration:

// Instantiate the UDAF
AverageUDAF calcAvg = new AverageUDAF();
// Register the UDAF with the SparkSession under the alias "calAvg"
sparkSession.udf().register("calAvg", calcAvg);
// Use the UDAF in SQL (emp_ds must already be registered as a table or view)
sparkSession.sql("select deptno, calAvg(salary) from emp_ds group by deptno").show();