Skip to content

Commit

Permalink
Added UNIONTYPE support ( fixes #53)
Browse files Browse the repository at this point in the history
  • Loading branch information
rcongiu committed Aug 30, 2015
1 parent 48e6ce8 commit 8add4d0
Show file tree
Hide file tree
Showing 11 changed files with 265 additions and 30 deletions.
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,32 @@ ALTER TABLE json_table SET SERDEPROPERTIES ( "ignore.malformed.json" = "true");
it will not make the query fail, and the above record will be returned as
NULL null null


### UNIONTYPE support (PLEASE READ IF YOU USE IT)

A Uniontype is a field that can contain different types, like in C.
Hive usually stores a 'tag' that is basically the index of the datatype,
for instance, if you create a uniontype<int,string,float> , tag would be
0 for int, 1 for string, 2 for float (see https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types#LanguageManualTypes-UnionTypes).

Now, JSON data does not store anything like that, so the serde has to infer
the tag itself: it checks, in order, whether the data is compatible with each
of the given types and uses the first one that fits. So, THE ORDER MATTERS.
Let's say you define a field f as UNIONTYPE<int,string> and your JSON has
```{json}
{ "f": "123" } // parsed as int, since int precedes string in definitions,
// and "123" can be parsed to a number
{ "f": "asv" } // parsed as string
```
In the first record the value is a number inside a string, so the serde returns
a tag of 0 and an int rather than a string.
It's worth noting that complex union types may not be very efficient, since
the SerDe may try to parse the same data in several ways; however, several
people asked me to implement this feature to cope with bad JSON, so I did.




### MAPPING HIVE KEYWORDS

Sometimes it may happen that JSON data has attributes named like reserved words in hive.
Expand Down Expand Up @@ -233,6 +259,7 @@ Versions:
refactored Timestamp Handling
* 1.2 (2014/06) Refactored to multimodule for CDH5 compatibility
* 1.3 (2014/09/08) fixed #80, #82, #84, #85
* 1.4 ???? Added UNIONTYPE support (#53), made CDH5 default



Expand Down
19 changes: 12 additions & 7 deletions json-serde/src/main/java/org/openx/data/jsonserde/JsonSerDe.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
import java.util.Properties;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
Expand All @@ -32,11 +31,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
Expand Down Expand Up @@ -159,7 +154,6 @@ public Object deserialize(Writable w) throws SerDeException {

// Try parsing row into JSON object
Object jObj = null;


try {
String txt = rowText.toString().trim();
Expand Down Expand Up @@ -335,6 +329,8 @@ Object serializeField(Object obj,
case STRUCT:
result = serializeStruct(obj, (StructObjectInspector)oi, null);
break;
case UNION:
result = serializeUnion(obj, (UnionObjectInspector)oi);
}
return result;
}
Expand Down Expand Up @@ -365,6 +361,15 @@ private JSONArray serializeList(Object obj, ListObjectInspector loi) {
return ar;
}

/**
 * Serializes a Hive union value.
 *
 * The union object inspector is asked which alternative the value holds
 * (its tag) and for the value stored in the union; that value is then
 * serialized with the object inspector registered for the tag.
 *
 * @param obj the union value to serialize, may be null
 * @param oi the object inspector describing the union
 * @return the serialized value held by the union, or null if obj is null
 */
private Object serializeUnion(Object obj, UnionObjectInspector oi) {
    if (obj == null) {
        return null;
    }

    ObjectInspector alternative = oi.getObjectInspectors().get(oi.getTag(obj));
    // Unwrap through getField() instead of passing obj straight on: the
    // alternative's inspector must see the value held by the union, not the
    // union object. JsonUnionObjectInspector.getField() returns its argument
    // unchanged, so nothing changes for JSON-backed rows.
    return serializeField(oi.getField(obj), alternative);
}

/**
* Serializes a Hive map&lt;&gt; using a JSONObject.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,15 @@
*======================================================================*/
package org.openx.data.jsonserde.objectinspector;

import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.*;
import org.openx.data.jsonserde.objectinspector.primitive.JavaStringByteObjectInspector;
import org.openx.data.jsonserde.objectinspector.primitive.JavaStringDoubleObjectInspector;
import org.openx.data.jsonserde.objectinspector.primitive.JavaStringFloatObjectInspector;
Expand Down Expand Up @@ -91,6 +86,17 @@ public static ObjectInspector getJsonObjectInspectorFromTypeInfo(
fieldObjectInspectors, options);
break;
}
case UNION:{
UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;

List<ObjectInspector> ois = new LinkedList<ObjectInspector>();
for( TypeInfo ti : ((UnionTypeInfo) typeInfo).getAllUnionObjectTypeInfos()) {
ois.add(getJsonObjectInspectorFromTypeInfo(ti, options));
}
result = getJsonUnionObjectInspector(ois, options);
break;
}

default: {
result = null;
}
Expand All @@ -100,12 +106,33 @@ public static ObjectInspector getJsonObjectInspectorFromTypeInfo(
return result;
}


/*
 * Caches Union Object Inspectors, keyed by the pair
 * (alternative object inspectors, options).
 */
static HashMap<ArrayList<Object>, JsonUnionObjectInspector> cachedJsonUnionObjectInspector
= new HashMap<ArrayList<Object>, JsonUnionObjectInspector>();

/**
 * Returns the union object inspector for the given alternatives and
 * options, creating and caching it on first request.
 *
 * @param ois object inspectors of the union alternatives, in tag order
 * @param options options handed to the created inspector
 * @return the cached inspector for this (ois, options) pair
 */
public static JsonUnionObjectInspector getJsonUnionObjectInspector(
        List<ObjectInspector> ois,
        JsonStructOIOptions options) {
    // Build the lookup key: both components take part in equals/hashCode.
    ArrayList<Object> cacheKey = new ArrayList<Object>();
    cacheKey.add(ois);
    cacheKey.add(options);

    JsonUnionObjectInspector cached = cachedJsonUnionObjectInspector.get(cacheKey);
    if (cached != null) {
        return cached;
    }

    JsonUnionObjectInspector created = new JsonUnionObjectInspector(ois, options);
    cachedJsonUnionObjectInspector.put(cacheKey, created);
    return created;
}

/*
* Caches Struct Object Inspectors
*/
static HashMap<ArrayList<Object>, JsonStructObjectInspector> cachedStandardStructObjectInspector
= new HashMap<ArrayList<Object>, JsonStructObjectInspector>();


public static JsonStructObjectInspector getJsonStructObjectInspector(
List<String> structFieldNames,
List<ObjectInspector> structFieldObjectInspectors,
Expand Down Expand Up @@ -171,12 +198,12 @@ public static JsonMapObjectInspector getJsonMapObjectInspector(
= new EnumMap<PrimitiveCategory, AbstractPrimitiveJavaObjectInspector>(PrimitiveCategory.class);

// Populates primitiveOICache with one inspector instance per registered
// primitive category. Each category is registered exactly once; a second
// put() for the same key would only replace the first instance.
static {
    primitiveOICache.put(PrimitiveCategory.BYTE, new JavaStringByteObjectInspector());
    primitiveOICache.put(PrimitiveCategory.SHORT, new JavaStringShortObjectInspector());
    primitiveOICache.put(PrimitiveCategory.INT, new JavaStringIntObjectInspector());
    primitiveOICache.put(PrimitiveCategory.LONG, new JavaStringLongObjectInspector());
    primitiveOICache.put(PrimitiveCategory.FLOAT, new JavaStringFloatObjectInspector());
    primitiveOICache.put(PrimitiveCategory.DOUBLE, new JavaStringDoubleObjectInspector());
    primitiveOICache.put(PrimitiveCategory.TIMESTAMP, new JavaStringTimestampObjectInspector());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.openx.data.jsonserde.objectinspector;

import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.openx.data.jsonserde.json.JSONArray;
import org.openx.data.jsonserde.json.JSONObject;

import java.util.List;

/**
 * Union object inspector for values coming out of the JSON parser.
 *
 * The parsed data carries no union tag, so the tag is derived from the value
 * itself by trying the alternative inspectors in list order (see
 * {@link #getTag(Object)}). The union value and the contained value are the
 * same object (see {@link #getField(Object)}).
 *
 * Created by rcongiu on 8/29/15.
 */
public class JsonUnionObjectInspector implements UnionObjectInspector {
    JsonStructOIOptions options;
    private List<ObjectInspector> ois;

    /**
     * @param ois object inspectors of the union alternatives, in tag order
     * @param opts options stored with this inspector
     */
    public JsonUnionObjectInspector(List<ObjectInspector> ois, JsonStructOIOptions opts) {
        this.ois = ois;
        this.options = opts;
    }

    /**
     * @return the alternative inspectors, exactly as passed to the constructor
     */
    @Override
    public List<ObjectInspector> getObjectInspectors() {
        return ois;
    }

    /**
     * Looks at the object and finds which object inspector should be used.
     *
     * Alternatives are tried in list order and the index of the first one
     * that accepts the object is returned. A null object yields tag 0.
     *
     * @param o the value to classify
     * @return index of the first compatible alternative
     * @throws Error if an alternative has an unsupported category, or if no
     *         alternative accepts the object
     */
    @Override
    public byte getTag(Object o) {
        if (o == null) {
            return 0;
        }

        for (byte tag = 0; tag < ois.size(); tag++) {
            if (accepts(ois.get(tag), o)) {
                return tag;
            }
        }

        throw new Error("No suitable Object Inspector found for object " + o.toString() + " of class " + o.getClass().getCanonicalName());
    }

    /**
     * Tells whether the given alternative can represent the object.
     *
     * Lists require a JSONArray, structs and maps a JSONObject, nested unions
     * accept anything, and primitives accept whatever their inspector can
     * convert without throwing an exception.
     */
    private static boolean accepts(ObjectInspector candidate, Object o) {
        switch (candidate.getCategory()) {
            case LIST:
                return o instanceof JSONArray;
            case STRUCT:
            case MAP:
                return o instanceof JSONObject;
            case UNION:
                return true;
            case PRIMITIVE: {
                PrimitiveObjectInspector primitive = (PrimitiveObjectInspector) candidate;
                try {
                    // A conversion that does not throw means the value fits.
                    primitive.getPrimitiveJavaObject(o);
                    return true;
                } catch (Exception ignored) {
                    // Not convertible by this alternative.
                    return false;
                }
            }
            default:
                throw new Error("Object Inspector " + candidate.toString() + " Not supported for object " + o.toString());
        }
    }

    /**
     * @return the object itself; no separate union wrapper is used
     */
    @Override
    public Object getField(Object o) {
        return o;
    }

    /**
     * @return the standard union type name computed by ObjectInspectorUtils
     */
    @Override
    public String getTypeName() {
        return ObjectInspectorUtils.getStandardUnionTypeName(this);
    }

    @Override
    public Category getCategory() {
        return Category.UNION;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
Expand Down Expand Up @@ -46,8 +46,8 @@ static public void initialize() throws Exception {
Configuration conf = null;
Properties tbl = new Properties();
// from google video API
tbl.setProperty(Constants.LIST_COLUMNS, "country,languages,religions");
tbl.setProperty(Constants.LIST_COLUMN_TYPES, "string,string,map<string,string>".toLowerCase());
tbl.setProperty(serdeConstants.LIST_COLUMNS, "country,languages,religions");
tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, "string,string,map<string,string>".toLowerCase());

instance.initialize(conf, tbl);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ public void testSerializeWithMapping() throws SerDeException, JSONException {
Object obj = serde.serialize(row, soi);

assertTrue(obj instanceof Text);
assertEquals("{\"timestamp\":7898,\"two\":43.2,\"one\":true,\"three\":[],\"four\":\"value1\"}", obj.toString());
assertEquals("{\"four\":\"value1\",\"one\":true,\"two\":43.2,\"three\":[],\"timestamp\":7898}", obj.toString());

System.out.println("Output object " + obj.toString());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package org.openx.data.jsonserde;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.junit.Before;
import org.junit.Test;
import org.openx.data.jsonserde.json.JSONArray;
import org.openx.data.jsonserde.json.JSONObject;

import java.util.Properties;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
 * Tests deserialization of a UNIONTYPE column: the field's object inspector
 * must be a union inspector and must report the expected tag for each kind
 * of JSON value.
 *
 * Created by rcongiu on 8/30/15.
 */
public class JsonUnionTest {
    static JsonSerDe instance;

    @Before
    public void setUp() throws Exception {
        initialize();
    }

    /**
     * Creates a serde for a two-column table whose {@code stuff} column is
     * {@code uniontype<int,double,array<string>,struct<a:int,b:string>,string>}.
     */
    static public void initialize() throws Exception {
        instance = new JsonSerDe();
        Configuration conf = null;
        Properties tbl = new Properties();
        // NOTE(review): another test in this change set moved from Constants
        // to serdeConstants; consider doing the same here.
        tbl.setProperty(Constants.LIST_COLUMNS, "country,stuff");
        tbl.setProperty(Constants.LIST_COLUMN_TYPES, "string,uniontype<int,double,array<string>,struct<a:int,b:string>,string>".toLowerCase());

        instance.initialize(conf, tbl);
    }

    @Test
    public void testDeSerialize() throws Exception {
        // Test that union values can be deserialized and mapped to a tag
        StructObjectInspector soi = (StructObjectInspector) instance.getObjectInspector();

        StructField sfr = soi.getStructFieldRef("stuff");

        // expected value first, so a failure message reads correctly
        assertEquals(ObjectInspector.Category.UNION, sfr.getFieldObjectInspector().getCategory());

        UnionObjectInspector uoi = (UnionObjectInspector) sfr.getFieldObjectInspector();

        // first, string
        Writable w = new Text("{\"country\":\"Switzerland\",\"stuff\":\"Italian\"}");
        JSONObject result = (JSONObject) instance.deserialize(w);
        Object val = soi.getStructFieldData(result, sfr);
        assertEquals("Italian", uoi.getField(val));

        // result is not checked; this only verifies the call does not throw
        uoi.getTypeName();

        // now, int (tag 0 is the first alternative in the uniontype)
        w = new Text("{\"country\":\"Switzerland\",\"stuff\":2}");
        result = (JSONObject) instance.deserialize(w);
        val = soi.getStructFieldData(result, sfr);
        assertEquals("2", val);
        assertEquals(0, uoi.getTag(val));

        // now, struct (tag 3)
        w = new Text("{\"country\":\"Switzerland\",\"stuff\": { \"a\": \"OK\" } }");
        result = (JSONObject) instance.deserialize(w);
        val = soi.getStructFieldData(result, sfr);
        assertTrue(val instanceof JSONObject);
        assertEquals(3, uoi.getTag(val));

        // now, array (tag 2)
        w = new Text("{\"country\":\"Switzerland\",\"stuff\": [ 1, 2 ] }");
        result = (JSONObject) instance.deserialize(w);
        val = soi.getStructFieldData(result, sfr);
        assertTrue(val instanceof JSONArray);
        assertEquals(2, uoi.getTag(val));
    }
}
Loading

0 comments on commit 8add4d0

Please sign in to comment.