Skip to content

Commit

Permalink
CPC sketch UDFs
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexanderSaydakov committed Mar 18, 2019
1 parent 7b28290 commit 7f9e76e
Show file tree
Hide file tree
Showing 19 changed files with 1,917 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@
<dependency>
<groupId>com.yahoo.datasketches</groupId>
<artifactId>sketches-core</artifactId>
<version>0.12.0</version>
<version>0.13.0</version>
</dependency>

<!-- Hive Dependencies (provided scope) -->
Expand Down
188 changes: 188 additions & 0 deletions src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Copyright 2019, Verizon Media.
* Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
*/

package com.yahoo.sketches.hive.cpc;

import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;

import java.util.Arrays;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;

/**
* Hive UDAF to create an HllSketch from raw data.
*/
@Description(
name = "dataToSketch",
value = "_FUNC_(expr, lgK, seed) - "
+ "Compute a sketch on data 'expr' with given parameters lgK and target type",
extended = "Example:\n"
+ "> SELECT dataToSketch(val, 12) FROM src;\n"
+ "The return value is a binary blob that can be operated on by other sketch related functions."
+ " The lgK parameter controls the sketch size and rlative error expected from the sketch."
+ " It is optional an must be from 4 to 26. The default is 11, which is expected to yield errors"
+ " of roughly +-1.5% in the estimation of uniques with 95% confidence."
+ " The seed parameter is optional")
public class DataToSketchUDAF extends AbstractGenericUDAFResolver {

/**
* Performs argument number and type validation. DataToSketch expects
* to receive between one and three arguments.
* <ul>
* <li>The first (required) is the value to add to the sketch and must be a primitive.</li>
*
* <li>The second (optional) is the lgK from 4 to 21 (default 11).
* This must be an integral value and must be constant.</li>
*
* <li>The third (optional) is the update seed.
* </ul>
*
* @see org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
* #getEvaluator(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo)
*
* @param info Parameter info to validate
* @return The GenericUDAFEvaluator that should be used to calculate the function.
*/
@Override
public GenericUDAFEvaluator getEvaluator(final GenericUDAFParameterInfo info) throws SemanticException {
final ObjectInspector[] inspectors = info.getParameterObjectInspectors();

// Validate the correct number of parameters
if (inspectors.length < 1) {
throw new UDFArgumentException("Please specify at least 1 argument");
}

if (inspectors.length > 3) {
throw new UDFArgumentException("Please specify no more than 3 arguments");
}

// Validate first parameter type
ObjectInspectorValidator.validateCategoryPrimitive(inspectors[0], 0);

// Validate second argument if present
if (inspectors.length > 1) {
ObjectInspectorValidator.validateIntegralParameter(inspectors[1], 1);
if (!ObjectInspectorUtils.isConstantObjectInspector(inspectors[1])) {
throw new UDFArgumentTypeException(1, "The second argument must be a constant");
}
}

// Validate third argument if present
if (inspectors.length > 2) {
ObjectInspectorValidator.validateIntegralParameter(inspectors[2], 2);
if (!ObjectInspectorUtils.isConstantObjectInspector(inspectors[2])) {
throw new UDFArgumentTypeException(2, "The third argument must be a constant");
}
}

return new DataToSketchEvaluator();
}

public static class DataToSketchEvaluator extends SketchEvaluator {

private Mode mode_;

@SuppressWarnings("deprecation")
@Override
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
// Different State is used for the iterate phase and the merge phase.
// SketchState is more space-efficient, so let's use SketchState if possible.
if ((mode_ == Mode.PARTIAL1) || (mode_ == Mode.COMPLETE)) { // iterate() will be used
return new SketchState();
}
return new UnionState();
}

/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator#init(org.apache
* .hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode,
* org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector[])
*/
@Override
public ObjectInspector init(final Mode mode, final ObjectInspector[] parameters) throws HiveException {
super.init(mode, parameters);
mode_ = mode;
if ((mode == Mode.PARTIAL1) || (mode == Mode.COMPLETE)) {
// input is original data
inputInspector_ = (PrimitiveObjectInspector) parameters[0];
if (parameters.length > 1) {
lgKInspector_ = (PrimitiveObjectInspector) parameters[1];
}
if (parameters.length > 2) {
seedInspector_ = (PrimitiveObjectInspector) parameters[2];
}
} else {
// input for PARTIAL2 and FINAL is the output from PARTIAL1
intermediateInspector_ = (StructObjectInspector) parameters[0];
}

if ((mode == Mode.PARTIAL1) || (mode == Mode.PARTIAL2)) {
// intermediate results need to include the lgK and the target HLL type
return ObjectInspectorFactory.getStandardStructObjectInspector(
Arrays.asList(LG_K_FIELD, SEED_FIELD, SKETCH_FIELD),
Arrays.asList(
PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(PrimitiveCategory.INT),
PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(PrimitiveCategory.LONG),
PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(PrimitiveCategory.BINARY)
)
);
}
// final results include just the sketch
return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(PrimitiveCategory.BINARY);
}

/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator#iterate(org
* .apache
* .hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer,
* java.lang.Object[])
*/
@Override
public void iterate(final @SuppressWarnings("deprecation") AggregationBuffer agg,
final Object[] parameters) throws HiveException {
if (parameters[0] == null) { return; }
final SketchState state = (SketchState) agg;
if (!state.isInitialized()) {
initializeState(state, parameters);
}
state.update(parameters[0], inputInspector_);
}

private void initializeState(final State state, final Object[] parameters) {
int lgK = DEFAULT_LG_K;
if (lgKInspector_ != null) {
lgK = PrimitiveObjectInspectorUtils.getInt(parameters[1], lgKInspector_);
}
long seed = DEFAULT_UPDATE_SEED;
if (seedInspector_ != null) {
seed = PrimitiveObjectInspectorUtils.getLong(parameters[2], seedInspector_);
}
state.init(lgK, seed);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2019, Verizon Media.
* Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
*/

package com.yahoo.sketches.hive.cpc;

import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.BytesWritable;

import com.yahoo.memory.Memory;
import com.yahoo.sketches.cpc.CpcSketch;

@Description(
name = "GetEstimateAndBounds",
value = "_FUNC_(sketch, kappa, seed)",
extended = "Returns an estimate and bounds of unique count from a given CpcSketch."
+ " The result is three double values: estimate, lower bound and upper bound."
+ " Optional kappa is a number of standard deviations from the mean: 1, 2 or 3 (default 2)."
+ " The seed is optional. It is needed if the sketch was created with a custom seed.")
public class GetEstimateAndErrorBoundsUDF extends UDF {

private static final int DEFAULT_KAPPA = 2;

/**
* Get an estimate from a given CpcSketch using the default seed and kappa
* @param serializedSketch CpcSketch in a serialized binary form
* @return estimate of unique count
*/
public List<Double> evaluate(final BytesWritable serializedSketch) {
return evaluate(serializedSketch, DEFAULT_KAPPA, DEFAULT_UPDATE_SEED);
}

/**
* Get an estimate and bounds from a given CpcSketch with default seed
* @param serializedSketch CpcSketch in a serialized binary form
* @param kappa given number of standard deviations from the mean: 1, 2 or 3
* @return estimate, lower and upper bound
*/
public List<Double> evaluate(final BytesWritable serializedSketch, final int kappa) {
return evaluate(serializedSketch, kappa, DEFAULT_UPDATE_SEED);
}

/**
* Get an estimate and bounds from a given CpcSketch with explicit seed
* @param serializedSketch CpcSketch in a serialized binary form
* @param kappa given number of standard deviations from the mean: 1, 2 or 3
* @param seed update seed
* @return estimate, lower and upper bound
*/
public List<Double> evaluate(final BytesWritable serializedSketch, final int kappa, final long seed) {
if (serializedSketch == null) { return null; }
final CpcSketch sketch = CpcSketch.heapify(Memory.wrap(serializedSketch.getBytes()), seed);
return Arrays.asList(sketch.getEstimate(), sketch.getLowerBound(kappa), sketch.getUpperBound(kappa));
}

}
45 changes: 45 additions & 0 deletions src/main/java/com/yahoo/sketches/hive/cpc/GetEstimateUDF.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2019, Verizon Media.
* Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
*/

package com.yahoo.sketches.hive.cpc;

import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.BytesWritable;

import com.yahoo.memory.Memory;
import com.yahoo.sketches.cpc.CpcSketch;

@Description(
name = "GetEstimate",
value = "_FUNC_(sketch)",
extended = "Returns an estimate of unique count from a given CpcSketch."
+ " The result is a double value.")
public class GetEstimateUDF extends UDF {

/**
* Get an estimate from a given CpcSketch with explicit seed
* @param serializedSketch CpcSketch in a serialized binary form
* @param seed update seed
* @return estimate of unique count
*/
public Double evaluate(final BytesWritable serializedSketch, final long seed) {
if (serializedSketch == null) { return null; }
final CpcSketch sketch = CpcSketch.heapify(Memory.wrap(serializedSketch.getBytes()), seed);
return sketch.getEstimate();
}

/**
* Get an estimate from a given CpcSketch using the default seed
* @param serializedSketch CpcSketch in a serialized binary form
* @return estimate of unique count
*/
public Double evaluate(final BytesWritable serializedSketch) {
return evaluate(serializedSketch, DEFAULT_UPDATE_SEED);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2019, Verizon Media.
* Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
*/

package com.yahoo.sketches.hive.cpc;

import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;

class ObjectInspectorValidator {

static void validateCategoryPrimitive(final ObjectInspector inspector, final int index)
throws UDFArgumentTypeException {
if (inspector.getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentTypeException(index, "Primitive parameter expected, but "
+ inspector.getCategory().name() + " was recieved as parameter " + (index + 1));
}
}

static void validateGivenPrimitiveCategory(final ObjectInspector inspector, final int index,
final PrimitiveObjectInspector.PrimitiveCategory category) throws UDFArgumentTypeException
{
validateCategoryPrimitive(inspector, index);
final PrimitiveObjectInspector primitiveInspector = (PrimitiveObjectInspector) inspector;
if (primitiveInspector.getPrimitiveCategory() != category) {
throw new UDFArgumentTypeException(index, category.name() + " value expected as parameter "
+ (index + 1) + " but " + primitiveInspector.getPrimitiveCategory().name() + " was received");
}
}

static void validateIntegralParameter(final ObjectInspector inspector, final int index)
throws UDFArgumentTypeException {
validateCategoryPrimitive(inspector, index);
final PrimitiveObjectInspector primitiveInspector = (PrimitiveObjectInspector) inspector;
switch (primitiveInspector.getPrimitiveCategory()) {
case BYTE:
case SHORT:
case INT:
case LONG:
break;
// all other types are invalid
default:
throw new UDFArgumentTypeException(index, "Only integral type parameters are expected but "
+ primitiveInspector.getPrimitiveCategory().name() + " was passed as parameter " + (index + 1));
}
}

}
Loading

0 comments on commit 7f9e76e

Please sign in to comment.