replace fix_datapoints with fill(null) in influx query #14

Closed
Dieterbe opened this issue Jul 30, 2014 · 2 comments

@Dieterbe
Contributor

Once influxdata/influxdb#426 is fixed, we can drop all the fix_datapoints logic and just query like:

select time, mean(value) from foo group by time(60s) fill(null) where time > 1406230368s and time < 1406231207s

This will simplify the code and should be faster too.
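For illustration, a minimal sketch of what fetch() could then collapse to, assuming the 0.8-era influxdb-python client used in this repo, whose query() returns a list of series dicts with a 'points' key. The standalone function signature here is hypothetical, not the actual patch:

```python
# Minimal sketch, not the actual patch: fetch() once fill(null) works.
# Assumes client.query() returns [{'name': ..., 'columns': [...],
# 'points': [[time, mean], ...]}] as in the 0.8-era influxdb-python client.
def fetch(client, path, step, start_time, end_time):
    # influx doesn't support <= and >= yet, hence the + 1 on end_time
    data = client.query(
        'select time, mean(value) from "%s" group by time(%ds) fill(null) '
        'where time > %ds and time < %ds order asc'
        % (path, step, start_time, end_time + 1))
    try:
        points = data[0]['points']
    except Exception:
        points = []
    # fill(null) emits one row per interval, so the values line up with
    # graphite's quantized buckets without any client-side realignment.
    datapoints = [p[1] for p in points]
    return (start_time, end_time, step), datapoints
```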

@Dieterbe
Contributor Author

Unfortunately, this makes things slower.

In the patch below, we get rid of fix_datapoints and fix_datapoints_multi and instead use "group by time(60s) fill(null)".
The difference in timings is shown further down.

diff --git a/graphite_influxdb.py b/graphite_influxdb.py
index a71da7d..b4ccad0 100644
--- a/graphite_influxdb.py
+++ b/graphite_influxdb.py
@@ -94,96 +94,21 @@ class InfluxdbReader(object):
         # influx doesn't support <= and >= yet, hence the add.
         logger.debug(caller="fetch()", start_time=start_time, end_time=end_time, step=self.step, debug_key=self.path)
         with statsd.timer('service=graphite-api.ext_service=influxdb.target_type=gauge.unit=ms.what=query_individual_duration'):
-            data = self.client.query('select time, value from "%s" where time > %ds '
-                                     'and time < %ds order asc' % (
-                                         self.path, start_time, end_time + 1))
+            data = self.client.query('select time, mean(value) from "%s" group by time(%ds) fill(null) '
+                                     'where time > %ds and time < %ds order asc' % (
+                                         self.path, self.step, start_time, end_time + 1))

         logger.debug(caller="fetch()", returned_data=data, debug_key=self.path)

         try:
-            known_points = data[0]['points']
+            datapoints = data[0]['points']
         except Exception:
             logger.debug(caller="fetch()", msg="COULDN'T READ POINTS. SETTING TO EMPTY LIST", debug_key=self.path)
-            known_points = []
-        logger.debug(caller="fetch()", msg="invoking fix_datapoints()", debug_key=self.path)
-        datapoints = InfluxdbReader.fix_datapoints(known_points, start_time, end_time, self.step, self.path)
+            datapoints = []

         time_info = start_time, end_time, self.step
         return time_info, datapoints

-    @staticmethod
-    def fix_datapoints_multi(data, start_time, end_time, step):
-        out = {}
-        """
-        data looks like:
-        [{u'columns': [u'time', u'sequence_number', u'value'],
-          u'name': u'stats.timers.dfvimeoplayproxy3.varnish.miss.410.count_ps',
-            u'points': [[1402928319, 1, 0.133333],
-            ....
-        """
-        for seriesdata in data:
-            logger.debug(caller="fix_datapoints_multi", msg="invoking fix_datapoints()", debug_key=seriesdata['name'])
-            datapoints = InfluxdbReader.fix_datapoints(seriesdata['points'], start_time, end_time, step, seriesdata['name'])
-            out[seriesdata['name']] = datapoints
-        return out
-
-    @staticmethod
-    def fix_datapoints(known_points, start_time, end_time, step, debug_key):
-        """
-        points is a list of known points (potentially empty)
-        """
-        logger.debug(caller='fix_datapoints', len_known_points=len(known_points), debug_key=debug_key)
-        if len(known_points) == 1:
-            logger.debug(caller='fix_datapoints', only_known_point=known_points[0], debug_key=debug_key)
-        elif len(known_points) > 1:
-            logger.debug(caller='fix_datapoints', first_known_point=known_points[0], debug_key=debug_key)
-            logger.debug(caller='fix_datapoints', last_known_point=known_points[-1], debug_key=debug_key)
-
-        datapoints = []
-        steps = int(round((end_time - start_time) * 1.0 / step))
-        # if we have 3 datapoints: at 0, at 60 and 120, then step is 60, steps = 2 and should have 3 points
-        # note that graphite assumes data at quantized intervals, whereas in influx they can be stored at like 07, 67, etc.
-        ratio = len(known_points) * 1.0 / (steps + 1)
-        statsd.timer('service=graphite-api.target_type=gauge.unit=none.what=known_points/needed_points', ratio)
-
-        if len(known_points) == steps + 1:
-            logger.debug(action="No steps missing", debug_key=debug_key)
-            datapoints = [p[2] for p in known_points]
-        else:
-            amount = steps + 1 - len(known_points)
-            logger.debug(action="Fill missing steps with None values", amount=amount, debug_key=debug_key)
-            next_point = 0
-            for s in range(0, steps + 1):
-                # if we have no more known points, fill with None's
-                # even ininitially when next_point = 0, len(known_points) might be == 0
-                if next_point >= len(known_points):
-                    datapoints.append(None)
-                    continue
-
-                # if points are not evenly spaced. i.e. they should be a minute apart but sometimes they are 55 or 65 seconds,
-                # and if they are all about step/2 away from the target timestamps, then sometimes a target point has 2 candidates, and
-                # sometimes 0. So a point might be more than step/2 older.  in that case, since points are sorted, we can just forward the pointer
-                # influxdb's fill(null) will make this cleaner and stop us from having to worry about this.
-
-                should_be_near = start_time + step * s
-                diff = known_points[next_point][0] - should_be_near
-                while next_point + 1 < len(known_points) and diff < (step / 2) * -1:
-                    next_point += 1
-                    diff = known_points[next_point][0] - should_be_near
-
-                # use this point if it's within step/2 from our target
-                if abs(diff) <= step / 2:
-                    datapoints.append(known_points[next_point][2])
-                    next_point += 1  # note: might go out of bounds, which we use as signal
-
-                else:
-                    datapoints.append(None)
-
-        logger.debug(caller='fix_datapoints', len_known_points=len(known_points), len_datapoints=len(datapoints), debug_key=debug_key)
-        logger.debug(caller='fix_datapoints', first_returned_point=datapoints[0], debug_key=debug_key)
-        logger.debug(caller='fix_datapoints', last_returned_point=datapoints[-1], debug_key=debug_key)
-        return datapoints
-
     def get_intervals(self):
             now = int(time.time())
             return IntervalSet([Interval(1, now)])
@@ -263,7 +188,7 @@ class InfluxdbFinder(object):
             for name in series:
                 if regex.match(name) is not None:
                     logger.debug("found leaf", name=name)
-                    res = 10
+                    res = 60
                     for (rule_patt, rule_res) in self.schemas:
                         if rule_patt.match(name):
                             res = rule_res
@@ -307,21 +232,19 @@ class InfluxdbFinder(object):
     def fetch_multi(self, nodes, start_time, end_time):
         series = ', '.join(['"%s"' % node.path for node in nodes])
         step = 60  # TODO: this is not ideal in all cases. for one thing, don't hardcode, for another.. how to deal with multiple steps?
-        query = 'select time, value from %s where time > %ds and time < %ds order asc' % (
-                series, start_time, end_time + 1)
+        query = 'select time, mean(value) from %s group by time (%ds) fill(null) where time > %ds and time < %ds order asc' % (
+                series, step, start_time, end_time + 1)
         logger.debug(caller='fetch_multi', query=query)
         logger.debug(caller='fetch_multi', start_time=print_time(start_time), end_time=print_time(end_time), step=step)
         with statsd.timer('service=graphite-api.ext_service=influxdb.target_type=gauge.unit=ms.action=select_datapoints'):
             data = self.client.query(query)
         logger.debug(caller='fetch_multi', returned_data=data)
+        datapoints = {}
+        for seriesdata in data:
+            datapoints[seriesdata['name']] = [p[1] for p in seriesdata['points']]
         if not len(data):
-            data = [{'name': node.path, 'points': []} for node in nodes]
-            logger.debug(caller='fetch_multi', FIXING_DATA_TO=data)
-        logger.debug(caller='fetch_multi', len_datapoints_before_fixing=len(data))
-
-        with statsd.timer('service=graphite-api.action=fix_datapoints_multi.target_type=gauge.unit=ms'):
-            logger.debug(caller='fetch_multi', action='invoking fix_datapoints_multi()')
-            datapoints = InfluxdbReader.fix_datapoints_multi(data, start_time, end_time, step)
+            for node in nodes:
+                datapoints[node.path] = []

         time_info = start_time, end_time, step
         return time_info, datapoints

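For reference, the quantization that the removed fix_datapoints performed client-side, and which fill(null) pushes into influx, boils down to this bucket arithmetic (a standalone worked example, not code from the patch):

```python
# Points at t=0, 60 and 120 with step=60 give steps=2, i.e. 3 buckets.
start_time, end_time, step = 0, 120, 60
steps = int(round((end_time - start_time) * 1.0 / step))  # -> 2
buckets = steps + 1                                        # -> 3
# A raw point stored at e.g. t=67 had to be snapped to the t=60 bucket
# because |67 - 60| <= step / 2; with fill(null) influx does this itself.
```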
The change was deployed around 6:14.
This is for requesting a bunch of graphs that go 6 hours back, at minutely resolution.
Notably, the worst-case select_datapoints (i.e. the influx query itself) takes a hit of several seconds, whereas the worst-case fix_datapoints was still under 250 ms.
The second graph is a zoom-in of the first one.

[graphs: timings and timings-zoomed]

Also, for one graph (note to self: Requests on health page) the data was incorrect (rendered as a square with a peak @ 1).

@Dieterbe
Contributor Author

Maybe @pauldix and friends can make group by time(%ds) fill(null) faster at some point,
but for now we'd better stick with the Python version :(
