This repository was archived by the owner on Mar 30, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 92
/
Copy pathDruidQueryGranularity.scala
128 lines (106 loc) · 4.33 KB
/
DruidQueryGranularity.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.sparklinedata.druid
import jodd.datetime.Period
import org.joda.time.chrono.ISOChronology
import org.joda.time.{DateTime, DateTimeZone, Interval}
import org.json4s.JsonAST.{JField, JObject, JString}
import org.json4s.{CustomSerializer, MappingException}
import scala.util.Try
import scala.language.postfixOps
/**
 * Time granularity of a Druid query.
 *
 * The hierarchy is sealed: every implementation lives in this file.
 */
sealed trait DruidQueryGranularity extends Serializable {

  /**
   * Computes a count for the given query intervals at this granularity
   * (the name presumably abbreviates "number of distinct values" - confirm
   * with callers).
   *
   * @param ins the query intervals
   * @return the count as a `Long`
   */
  def ndv(ins: List[Interval]): Long
}
/**
 * Factory for [[DruidQueryGranularity]] values.
 */
object DruidQueryGranularity {
  import Utils._
  import org.json4s.jackson.sparklinedata.SmileJsonMethods._

  /**
   * Builds a granularity from its textual form.
   *
   * The simple names NONE, ALL, SECOND, MINUTE, FIFTEEN_MINUTE,
   * THIRTY_MINUTE, HOUR and DAY are recognised case-insensitively. Any
   * other input is parsed as JSON and extracted first as a
   * [[PeriodGranularity]] and, if that fails, as a [[DurationGranularity]].
   *
   * @param s a simple granularity name or a JSON granularity specification
   * @return the corresponding granularity
   * @throws MappingException if `s` is not a known name and the JSON cannot
   *                          be extracted as either granularity type
   */
  @throws[MappingException]
  def apply(s : String) : DruidQueryGranularity =
    // Fold case once, with a fixed locale so the comparison does not depend
    // on the JVM default locale.
    s.toUpperCase(java.util.Locale.ROOT) match {
      case "NONE" => NoneGranularity
      case "ALL" => AllGranularity
      case "SECOND" => DurationGranularity(1000L)
      case "MINUTE" => DurationGranularity(60 * 1000L)
      case "FIFTEEN_MINUTE" => DurationGranularity(15 * 60 * 1000L)
      case "THIRTY_MINUTE" => DurationGranularity(30 * 60 * 1000L)
      case "HOUR" => DurationGranularity(3600 * 1000L)
      case "DAY" => DurationGranularity(24 * 3600 * 1000L)
      case _ =>
        // Parse the original (un-folded) text; JSON is case-sensitive.
        val jv = parse(s)
        Try(jv.extract[PeriodGranularity]).getOrElse(jv.extract[DurationGranularity])
    }
}
/**
 * The "all" granularity: `ndv` is always 1, whatever intervals are passed.
 */
object AllGranularity extends DruidQueryGranularity {

  /** @return the constant `1L`; `ins` is not inspected */
  def ndv(ins: List[Interval]): Long = 1L
}
/**
 * The "none" granularity: `ndv` is whatever `Utils.intervalsMillis`
 * reports for the intervals, with no further scaling.
 */
object NoneGranularity extends DruidQueryGranularity {

  /** @return the result of `Utils.intervalsMillis` applied to `ins` */
  def ndv(ins: List[Interval]): Long = {
    val millis = Utils.intervalsMillis(ins)
    millis
  }
}
/**
 * Granularity defined by a calendar period, an optional origin and an
 * optional time zone.
 *
 * @param period the bucket period
 * @param origin instant from which buckets are counted; may be null, in
 *               which case epoch 0 interpreted in the chronology's zone is used
 * @param tz     time zone; may be null, in which case UTC is used
 */
case class PeriodGranularity(
  period: Period,
  origin: DateTime,
  tz: DateTimeZone) extends DruidQueryGranularity {

  val chronology = if (tz == null) ISOChronology.getInstanceUTC else ISOChronology.getInstance(tz)

  /** Origin in epoch milliseconds. */
  lazy val origMillis = if (origin == null) {
    new DateTime(0, DateTimeZone.UTC).withZoneRetainFields(chronology.getZone).getMillis
  }
  else {
    origin.getMillis
  }

  // NOTE(review): this uses the period's getMilliseconds accessor as the
  // bucket length. Confirm it yields the total length of the period and not
  // just its millisecond field; a value of 0 would make ndv divide by zero.
  lazy val periodMillis = period.getMilliseconds

  /**
   * Divides the length of the given intervals, restricted to the part at or
   * after the origin, by the period length.
   *
   * @param ins the query intervals
   * @return `Utils.intervalsMillis` of the clamped intervals divided by `periodMillis`
   */
  def ndv(ins : List[Interval]) = {
    val boundedIns = ins.map { i =>
      // Clamp both ends up to the origin. The end is moved first: it can only
      // grow, so the intermediate interval never has its end before its
      // start (which Joda rejects), even when the interval lies entirely
      // before the origin.
      i.withEndMillis(Math.max(origMillis, i.getEndMillis)).
        withStartMillis(Math.max(origMillis, i.getStartMillis))
    }
    Utils.intervalsMillis(boundedIns) / periodMillis
  }
}
/**
 * Granularity defined by a fixed bucket length in milliseconds.
 *
 * @param duration bucket length in milliseconds; used as the divisor in `ndv`
 * @param origin   instant from which buckets are counted; defaults to null,
 *                 in which case epoch 0 is used
 */
case class DurationGranularity(
  duration: Long,
  origin: DateTime = null) extends DruidQueryGranularity {

  /** Origin in epoch milliseconds; 0 when no origin was given. */
  lazy val origMillis = if (origin == null) 0L else origin.getMillis

  /**
   * Divides the length of the given intervals, restricted to the part at or
   * after the origin, by `duration`.
   *
   * @param ins the query intervals
   * @return `Utils.intervalsMillis` of the clamped intervals divided by `duration`
   */
  def ndv(ins : List[Interval]) = {
    val boundedIns = ins.map { i =>
      // Clamp both ends up to the origin. The end is moved first: it can only
      // grow, so the intermediate interval never has its end before its
      // start (which Joda rejects), even when the interval lies entirely
      // before the origin.
      i.withEndMillis(Math.max(origMillis, i.getEndMillis)).
        withStartMillis(Math.max(origMillis, i.getStartMillis))
    }
    Utils.intervalsMillis(boundedIns) / duration
  }
}
/**
 * json4s custom serializer for [[DruidQueryGranularity]].
 *
 * Deserialization dispatches on a leading `"type"` field of a JSON object:
 * `"all"` and `"none"` map to the singleton granularities, while
 * `"duration"` and `"period"` are extracted into the matching case class.
 * The patterns only match when `"type"` is the first field of the object.
 *
 * Serialization is not supported and always throws a `RuntimeException`.
 */
class DruidQueryGranularitySerializer extends CustomSerializer[DruidQueryGranularity](format => {
  implicit val f = format
  (
    {
      case JObject(JField("type", JString("all")) :: _) => AllGranularity
      case JObject(JField("type", JString("none")) :: _) => NoneGranularity
      case json @ JObject(JField("type", JString("duration")) :: _) =>
        json.extract[DurationGranularity]
      case json @ JObject(JField("type", JString("period")) :: _) =>
        json.extract[PeriodGranularity]
    },
    {
      case _: DruidQueryGranularity =>
        throw new RuntimeException("DruidQueryGranularity serialization not supported.")
    }
  )
})