diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8b34402 --- /dev/null +++ b/.gitignore @@ -0,0 +1,16 @@ +*.log +*.pot +*.pyc +*.swp + +.vagrant +.grunt + +# extra +.idea +.DS_Store +._* +*~ +*.egg-info +build +dist diff --git a/README.md b/README.md index a4f2ee0..2e49832 100644 --- a/README.md +++ b/README.md @@ -5,12 +5,97 @@ Python implementation of zabbix_sender. This is a module that allows you to send data to a [Zabbix] server using Python. You don't need the zabbix_sender binary anymore. +Users of the [Twisted] library can use an asynchronous version of the sender. + Has been tested with Python 2.5.1 and 2.7 Python 2.5.1 doesn't have a json module (needed to implement zabbix protocol), so you can use [simplejson] instead. -Source code contains samples and comments to allows you start using it in no time. Here's a small example: +Installation +------------ + +Install the package using a pip from the original repository: +```bash +pip install "git+git://github.com/kmomberg/pyZabbixSender.git" +``` +or from one of the mirrors, like: +```bash +pip install "git+git://github.com/baseride/pyZabbixSender.git" +``` + +Usage +----- + +Here's a small example (synchronous code): + ```python +from pyZabbixSender.sy import syZabbixSender + +# Creating a sender object +z = syZabbixSender(server="zabbix-server", port=10051) + +# Adding data (without timestamp) +z.addData(hostname="test_host", key="test_trap_1", value="12") +z.addData("test_host", "test_trap_2", "2.43") + +# Adding data (with timestamp) +z.addData("test_host", "test_trap_2", "2.43", 1365787627) + +# Ready to send your data? +results = z.sendData() + +# Check if everything was sent as expected +if not results[0][0]: + print "oops! Sending data has been failed" +elif results[0][1]['info']['processed'] != 3: + print "oops! Zabbix doesn't recognize passed identities" + +# Clear internal data to start populating again +z.clearData() + +# Wants to send a single data point right now? +z.sendSingle("test_host","test_trap","12") +``` + +The asynchronous code looks mostly the same, except asynchronous calls to send...() functions: + +```python +from pyZabbixSender.tx import txZabbixSender +from twisted.internet import reactor, defer + +@defer.inlineCallbacks +def test(): + # Creating a sender object + z = txZabbixSender(server="zabbix-server", port=10051) + + # Adding data (without timestamp) + z.addData(hostname="test_host", key="test_trap_1", value="12") + z.addData("test_host", "test_trap_2", "2.43") + + # Adding data (with timestamp) + z.addData("test_host", "test_trap_2", "2.43", 1365787627) + + # Ready to send your data? + results = yield z.sendData() # NOTE an asynchronous call + + # Check if everything was sent as expected + if not results[0][0]: + print "oops! Sending data has been failed" + elif results[0][1]['info']['processed'] != 3: + print "oops! Zabbix doesn't recognize passed identities" + + # Clear internal data to start populating again + z.clearData() + + # Wants to send a single data point right now? + yield z.sendSingle("test_host","test_trap","12") # NOTE an asynchronous call +``` + +The backward-compatible code looks mostly the same, except return value processing: + +```python +from pyZabbixSender import pyZabbixSender + # Creating a sender object z = pyZabbixSender(server="zabbix-server", port=10051) @@ -35,8 +120,8 @@ z.clearData() z.sendSingle("test_host","test_trap","12") ``` -There are some more options, so take a look and discover how easy is to use it ;) +There are some more options, so take a look at the [wiki] page and discover how easy is to use it ;) License ---- @@ -46,3 +131,4 @@ GNU GPLv2 [Zabbix]:http://www.zabbix.com/ [simplejson]:https://simplejson.readthedocs.org/en/latest/ [wiki]:https://github.com/kmomberg/pyZabbixSender/wiki +[Twisted]:https://twistedmatrix.com diff --git a/pyZabbixSender/__init__.py b/pyZabbixSender/__init__.py new file mode 100644 index 0000000..02a7152 --- /dev/null +++ b/pyZabbixSender/__init__.py @@ -0,0 +1 @@ +from pyZabbixSender import pyZabbixSender diff --git a/pyZabbixSender.py b/pyZabbixSender/pyZabbixSender.py similarity index 66% rename from pyZabbixSender.py rename to pyZabbixSender/pyZabbixSender.py index 2588f2a..477483c 100644 --- a/pyZabbixSender.py +++ b/pyZabbixSender/pyZabbixSender.py @@ -11,19 +11,13 @@ import sys import re -# If you're using an old version of python that don't have json available, -# you can use simplejson instead: https://simplejson.readthedocs.org/en/latest/ -#import simplejson as json -import json +from pyZabbixSenderBase import * - -class pyZabbixSender: +class pyZabbixSender(pyZabbixSenderBase): ''' This class allows you to send data to a Zabbix server, using the same protocol used by the zabbix_server binary distributed by Zabbix. ''' - ZABBIX_SERVER = "127.0.0.1" - ZABBIX_PORT = 10051 # Return codes when sending data: RC_OK = 0 # Everything ok @@ -32,50 +26,6 @@ class pyZabbixSender: RC_ERR_CONN = 255 # Error talking to the server RC_ERR_INV_RESP = 254 # Invalid response from server - - def __init__(self, server=ZABBIX_SERVER, port=ZABBIX_PORT, verbose=False): - ''' - #####Description: - This is the constructor, to obtain an object of type pyZabbixSender, linked to work with a specific server/port. - - #####Parameters: - * **server**: [in] [string] [optional] This is the server domain name or IP. *Default value: "127.0.0.1"* - * **port**: [in] [integer] [optional] This is the port open in the server to receive zabbix traps. *Default value: 10051* - * **verbose**: [in] [boolean] [optional] This is to allow the library to write some output to stderr when finds an error. *Default value: False* - - **Note: The "verbose" parameter will be revisited and could be removed/replaced in the future** - - #####Return: - It returns a pyZabbixSender object. - ''' - self.zserver = server - self.zport = port - self.verbose = verbose - self.timeout = 5 # Socket connection timeout. - self.__data = [] # This is to store data to be sent later. - - - def __str__(self): - ''' - This allows you to obtain a string representation of the internal data - ''' - return str(self.__data) - - - def __createDataPoint(self, host, key, value, clock=None): - ''' - Creates a dictionary using provided parameters, as needed for sending this data. - ''' - obj = { - 'host': host, - 'key': key, - 'value': value, - } - if clock: - obj['clock'] = clock - return obj - - def __send(self, mydata): ''' This is the method that actually sends the data to the zabbix server. @@ -95,7 +45,7 @@ def __send(self, mydata): response_header = sock.recv(5) if not response_header == 'ZBXD\1': - err_message = u'Invalid response from server. Malformed data?\n---\n%s\n---\n' % str(mydata) + err_message = u'Invalid response from server [%s]. Malformed data?\n---\n%s\n---\n' % (repr(response_header),str(mydata)) sys.stderr.write(err_message) return self.RC_ERR_INV_RESP, err_message @@ -119,99 +69,6 @@ def __send(self, mydata): return self.RC_ERR_FAIL_SEND, response return self.RC_OK, response - - def addData(self, host, key, value, clock=None): - ''' - #####Description: - Adds host, key, value and optionally clock to the internal list of data to be sent later, when calling one of the methods to actually send the data to the server. - - #####Parameters: - * **host**: [in] [string] [mandatory] The host which the data is associated to. - * **key**: [in] [string] [mandatory] The name of the trap associated to the host in the Zabbix server. - * **value**: [in] [any] [mandatory] The value you want to send. Please note that you need to take care about the type, as it needs to match key definition in the Zabbix server. Numeric types can be specified as number (for example: 12) or text (for example: "12"). - * **clock**: [in] [integer] [optional] Here you can specify the Unix timestamp associated to your measurement. For example, you can process a log or a data file produced an hour ago, and you want to send the data with the timestamp when the data was produced, not when it was processed by you. If you don't specify this parameter, zabbix server will assign a timestamp when it receives the data. - - You can create a timestamp compatible with "clock" parameter using this code: - int(round(time.time())) - - *Default value: None* - - #####Return: - This method doesn't have a return. - ''' - obj = self.__createDataPoint(host, key, value, clock) - self.__data.append(obj) - - - def clearData(self): - ''' - #####Description: - This method removes all data from internal storage. You need to specify when it's done, as it's not automatically done after a data send operation. - - #####Parameters: - None - - #####Return: - None - ''' - self.__data = [] - - - def getData(self): - ''' - #####Description: - This method is used to obtain a copy of the internal data stored in the object. - - Please note you will **NOT** get the internal data object, but a copy of it, so no matter what you do with your copy, internal data will remain safe. - - #####Parameters: - None - - #####Return: - A copy of the internal data you added using the method *addData* (an array of dicts). - ''' - copy_of_data = [] - for data_point in self.__data: - copy_of_data.append(data_point.copy()) - return copy_of_data - - - def printData(self): - ''' - #####Description: - Print stored data (to stdout), so you can see what will be sent if "sendData" is called. This is useful for debugging purposes. - - #####Parameters: - None - - #####Return: - None - ''' - for elem in self.__data: - print str(elem) - print 'Count: %d' % len(self.__data) - - - def removeDataPoint(self, data_point): - ''' - #####Description: - This method delete one data point from the internal stored data. - - It's main purpose is to narrow the internal data to keep only those failed data points (those that were not received/processed by the server) so you can identify/retry them. Data points can be obtained from *sendDataOneByOne* return, or from *getData* return. - - #####Parameters: - * **data_point**: [in] [dict] [mandatory] This is a dictionary as returned by *sendDataOneByOne()* or *getData* methods. - - #####Return: - It returns True if data_point was found and deleted, and False if not. - ''' - if data_point in self.__data: - self.__data.remove(data_point) - return True - - return False - - def sendData(self, packet_clock=None, max_data_per_conn=None): ''' #####Description: @@ -226,24 +83,24 @@ def sendData(self, packet_clock=None, max_data_per_conn=None): int(round(time.time())) *Default value: None* - + * **max_data_per_conn**: [in] [integer] [optional] Allows the user to limit the number of data points sent in one single connection, as some times a too big number can produce problems over slow connections. Several "sends" will be automatically performed until all data is sent. If omitted, all data points will be sent in one single connection. *Default value: None* - + Please note that **internal data is not deleted after *sendData* is executed**. You need to call *clearData* after sending it, if you want to remove currently stored data. #####Return: A list of *(return_code, msg_from_server)* associated to each "send" operation. ''' - if not max_data_per_conn or max_data_per_conn > len(self.__data): - max_data_per_conn = len(self.__data) + if not max_data_per_conn or max_data_per_conn > len(self._data): + max_data_per_conn = len(self._data) responses = [] i = 0 - while i*max_data_per_conn < len(self.__data): + while i*max_data_per_conn < len(self._data): sender_data = { "request": "sender data", @@ -252,7 +109,7 @@ def sendData(self, packet_clock=None, max_data_per_conn=None): if packet_clock: sender_data['clock'] = packet_clock - sender_data['data'] = self.__data[i*max_data_per_conn:(i+1)*max_data_per_conn] + sender_data['data'] = self._data[i*max_data_per_conn:(i+1)*max_data_per_conn] to_send = json.dumps(sender_data) response = self.__send(to_send) @@ -280,7 +137,7 @@ def sendDataOneByOne(self): It returns an array of return codes (one for each individual "send") and the data sent: \[\[code\_1, data\_point\_1], \[code\_2, data\_point\_2\]\] ''' retarray = [] - for i in self.__data: + for i in self._data: if 'clock' in i: (retcode, retstring) = self.sendSingle(i['host'], i['key'], i['value'], i['clock']) else: @@ -304,7 +161,7 @@ def sendSingle(self, host, key, value, clock=None): You can create a timestamp compatible with "clock" parameter using this code: int(round(time.time())) - + *Default value: None* #####Return: @@ -315,7 +172,7 @@ def sendSingle(self, host, key, value, clock=None): "data": [], } - obj = self.__createDataPoint(host, key, value, clock) + obj = self._createDataPoint(host, key, value, clock) sender_data['data'].append(obj) to_send = json.dumps(sender_data) return self.__send(to_send) @@ -326,7 +183,7 @@ def sendSingleLikeProxy(self, host, key, value, clock=None, proxy=None): #####Description: Use this method to put the data for host monitored by proxy server. This method emulates proxy protocol and data will be accepted by Zabbix server even if they were send not actually from proxy. - + #####Parameters: * **host**: [in] [string] [mandatory] The host which the data is associated to. * **key**: [in] [string] [mandatory] The name of the trap associated to the host in the Zabbix server. @@ -335,29 +192,28 @@ def sendSingleLikeProxy(self, host, key, value, clock=None, proxy=None): You can create a timestamp compatible with "clock" parameter using this code: int(round(time.time())) - + *Default value: None* + * **proxy**: [in] [string] [optional] The name of the proxy to be recognized by the Zabbix server. If proxy is not specified, a normal "sendSingle" operation will be performed. *Default value: None* - #####Return: A list containing the return code and the message returned by the server. ''' # Proxy was not specified, so we'll do a "normal" sendSingle operation if proxy is None: return sendSingle(host, key, value, clock) - + sender_data = { "request": "history data", "host": proxy, "data": [], } - obj = self.__createDataPoint(host, key, value, clock) + obj = self._createDataPoint(host, key, value, clock) sender_data['data'].append(obj) to_send = json.dumps(sender_data) return self.__send(to_send) - ##################################### # --- Examples of usage --- ##################################### diff --git a/pyZabbixSender/pyZabbixSenderBase.py b/pyZabbixSender/pyZabbixSenderBase.py new file mode 100644 index 0000000..c0f61a0 --- /dev/null +++ b/pyZabbixSender/pyZabbixSenderBase.py @@ -0,0 +1,185 @@ +# -*- coding: utf-8 +# Copyright 2015 Kurt Momberg +# > Based on work by Klimenko Artyem +# >> Based on work by Rob Cherry +# >>> Based on work by Enrico Tröger +# License: GNU GPLv2 + +import struct +import time +import sys +import re + +# If you're using an old version of python that don't have json available, +# you can use simplejson instead: https://simplejson.readthedocs.org/en/latest/ +try: + import json +except ImportError: + import simplejson as json + +class InvalidResponse(Exception): + pass + +class pyZabbixSenderBase: + ''' + This class creates network-agnostic data structures to send data to a Zabbix server + ''' + ZABBIX_SERVER = "127.0.0.1" + ZABBIX_PORT = 10051 + + def __init__(self, server=ZABBIX_SERVER, port=ZABBIX_PORT, verbose=False): + ''' + #####Description: + This is the constructor, to obtain an object of type pyZabbixSender, linked to work with a specific server/port. + + #####Parameters: + * **server**: [in] [string] [optional] This is the server domain name or IP. *Default value: "127.0.0.1"* + * **port**: [in] [integer] [optional] This is the port open in the server to receive zabbix traps. *Default value: 10051* + * **verbose**: [in] [boolean] [optional] This is to allow the library to write some output to stderr when finds an error. *Default value: False* + + **Note: The "verbose" parameter will be revisited and could be removed/replaced in the future** + + #####Return: + It returns a pyZabbixSender object. + ''' + self.zserver = server + self.zport = port + self.verbose = verbose + self.timeout = 5 # Socket connection timeout. + self._data = [] # This is to store data to be sent later. + + + def __str__(self): + ''' + This allows you to obtain a string representation of the internal data + ''' + return str(self._data) + + + def _createDataPoint(self, host, key, value, clock=None): + ''' + Creates a dictionary using provided parameters, as needed for sending this data. + ''' + obj = { + 'host': host, + 'key': key, + 'value': value, + } + if clock: + obj['clock'] = clock + return obj + + def addData(self, host, key, value, clock=None): + ''' + #####Description: + Adds host, key, value and optionally clock to the internal list of data to be sent later, when calling one of the methods to actually send the data to the server. + + #####Parameters: + * **host**: [in] [string] [mandatory] The host which the data is associated to. + * **key**: [in] [string] [mandatory] The name of the trap associated to the host in the Zabbix server. + * **value**: [in] [any] [mandatory] The value you want to send. Please note that you need to take care about the type, as it needs to match key definition in the Zabbix server. Numeric types can be specified as number (for example: 12) or text (for example: "12"). + * **clock**: [in] [integer] [optional] Here you can specify the Unix timestamp associated to your measurement. For example, you can process a log or a data file produced an hour ago, and you want to send the data with the timestamp when the data was produced, not when it was processed by you. If you don't specify this parameter, zabbix server will assign a timestamp when it receives the data. + + You can create a timestamp compatible with "clock" parameter using this code: + int(round(time.time())) + + *Default value: None* + + #####Return: + This method doesn't have a return. + ''' + obj = self._createDataPoint(host, key, value, clock) + self._data.append(obj) + + + def clearData(self): + ''' + #####Description: + This method removes all data from internal storage. You need to specify when it's done, as it's not automatically done after a data send operation. + + #####Parameters: + None + + #####Return: + None + ''' + self._data = [] + + + def getData(self): + ''' + #####Description: + This method is used to obtain a copy of the internal data stored in the object. + + Please note you will **NOT** get the internal data object, but a copy of it, so no matter what you do with your copy, internal data will remain safe. + + #####Parameters: + None + + #####Return: + A copy of the internal data you added using the method *addData* (an array of dicts). + ''' + copy_of_data = [] + for data_point in self._data: + copy_of_data.append(data_point.copy()) + return copy_of_data + + + def printData(self): + ''' + #####Description: + Print stored data (to stdout), so you can see what will be sent if "sendData" is called. This is useful for debugging purposes. + + #####Parameters: + None + + #####Return: + None + ''' + for elem in self._data: + print str(elem) + print 'Count: %d' % len(self._data) + + + def removeDataPoint(self, data_point): + ''' + #####Description: + This method delete one data point from the internal stored data. + + It's main purpose is to narrow the internal data to keep only those failed data points (those that were not received/processed by the server) so you can identify/retry them. Data points can be obtained from *sendDataOneByOne* return, or from *getData* return. + + #####Parameters: + * **data_point**: [in] [dict] [mandatory] This is a dictionary as returned by *sendDataOneByOne()* or *getData* methods. + + #####Return: + It returns True if data_point was found and deleted, and False if not. + ''' + if data_point in self._data: + self._data.remove(data_point) + return True + + return False + +def recognize_response_raw(response_raw): + return recognize_response(json.loads(response_raw)) + +FAILED_COUNTER = re.compile('^.*failed.+?(\d+).*$') +PROCESSED_COUNTER = re.compile('^.*processed.+?(\d+).*$') +SECONDS_SPENT = re.compile('^.*seconds spent.+?((-|\+|\d|\.|e|E)+).*$') + +def recognize_response(response): + failed = FAILED_COUNTER.match(response['info'].lower() if 'info' in response else '') + processed = PROCESSED_COUNTER.match(response['info'].lower() if 'info' in response else '') + seconds_spent = SECONDS_SPENT.match(response['info'].lower() if 'info' in response else '') + + if failed is None or processed is None: + raise InvalidResponse('Unable to parse server response',packet,response_raw) + failed = int(failed.group(1)) + processed = int(processed.group(1)) + seconds_spent = float(seconds_spent.group(1)) if seconds_spent else None + response['info'] = { + 'failed':failed, + 'processed':processed, + 'seconds spent':seconds_spent + } + return response diff --git a/pyZabbixSender/sy.py b/pyZabbixSender/sy.py new file mode 100644 index 0000000..6af0f26 --- /dev/null +++ b/pyZabbixSender/sy.py @@ -0,0 +1,191 @@ +# -*- coding: utf-8 +# Copyleft 2016 Vsevolod Novikov +# > Based on work by Kurt Momberg +# >> Based on work by Klimenko Artyem +# >>> Based on work by Rob Cherry +# >>>> Based on work by Enrico Tröger +# License: GNU GPLv2 + +import socket +import struct +import time +import sys +import re + +from pyZabbixSenderBase import * + +class syZabbixSender(pyZabbixSenderBase): + ''' + This class allows you to send data to a Zabbix server, using the same + protocol used by the zabbix_server binary distributed by Zabbix. + + It uses exceptions to report errors. + ''' + + def send_packet(self, packet): + ''' + This is the method that actually sends the data to the zabbix server. + ''' + mydata = json.dumps(packet) + socket.setdefaulttimeout(self.timeout) + data_length = len(mydata) + data_header = str(struct.pack('q', data_length)) + data_to_send = 'ZBXD\1' + str(data_header) + str(mydata) + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((self.zserver, self.zport)) + sock.send(data_to_send) + + response_header = sock.recv(5) + if not response_header == 'ZBXD\1': + raise InvalidResponse('Wrong magic: %s' % response_header) + + response_data_header = sock.recv(8) + response_data_header = response_data_header[:4] + response_len = struct.unpack('i', response_data_header)[0] + response_raw = sock.recv(response_len) + sock.close() + return recognize_response_raw(response_raw) + + def sendData(self, packet_clock=None, max_data_per_conn=None): + ''' + #####Description: + Sends data stored using *addData* method, to the Zabbix server. + + #####Parameters: + * **packet_clock**: [in] [integer] [optional] Zabbix server uses the "clock" parameter in the packet to associate that timestamp to all data values not containing their own clock timestamp. Then: + * If packet_clock is specified, zabbix server will associate it to all data values not containing their own clock. + * If packet_clock is **NOT** specified, zabbix server will use the time when it received the packet as packet clock. + + You can create a timestamp compatible with "clock" or "packet_clock" parameters using this code: + + int(round(time.time())) + *Default value: None* + + * **max_data_per_conn**: [in] [integer] [optional] Allows the user to limit the number of data points sent in one single connection, as some times a too big number can produce problems over slow connections. + + Several "sends" will be automatically performed until all data is sent. + + If omitted, all data points will be sent in one single connection. *Default value: None* + + Please note that **internal data is not deleted after *sendData* is executed**. You need to call *clearData* after sending it, if you want to remove currently stored data. + + #####Return: + A list of *(result, msg)* associated to each "send" operation, where *result* is a boolean meaning success of the operation, + and *msg* is a message from the server in case of success, or exception in case of error. + + In case of success, the server returns a message which is parsed by the function. The server message + contains counters for *processed* and *failed* (ignored) data items. Note that even if processed + data counter is 0 and all data items have been failed, it does not mean the error condition. + ''' + if not max_data_per_conn or max_data_per_conn > len(self._data): + max_data_per_conn = len(self._data) + + responses = [] + i = 0 + while i*max_data_per_conn < len(self._data): + + sender_data = { + "request": "sender data", + "data": [], + } + if packet_clock: + sender_data['clock'] = packet_clock + + sender_data['data'] = self._data[i*max_data_per_conn:(i+1)*max_data_per_conn] + try: + response = self.send_packet(sender_data) + except Exception,ex: + responses.append((False,ex)) + else: + responses.append((True,response)) + i += 1 + + return responses + + def sendDataOneByOne(self): + ''' + #####Description: + You can use this method to send all stored data, one by one, to determine which traps are not being handled correctly by the server. + + Using this method you'll be able to detect things like: + * hosts not defined in the server + * traps not defined in some particular host + + This is primarily intended for debugging purposes. + + #####Parameters: + None + + #####Return: + A list of *(result, msg)* associated to each "send" operation, where *result* is a boolean meaning success of the operation, + and *msg* is a message from the server in case of success, or exception in case of error. + + In case of success, the server returns a message which is parsed by the function. The server message + contains counters for *processed* and *failed* (ignored) data items. Note that even if processed + data counter is 0 and all data items have been failed, it does not mean the error condition. + ''' + return self.sendData(max_data_per_conn=1) + + def sendSingle(self, host, key, value, clock=None): + ''' + #####Description: + Instead of storing data for sending later, you can use this method to send specific values right now. + + #####Parameters: + It shares the same parameters as the *addData* method. + * **host**: [in] [string] [mandatory] The host which the data is associated to. + * **key**: [in] [string] [mandatory] The name of the trap associated to the host in the Zabbix server. + * **value**: [in] [any] [mandatory] The value you want to send. Please note that you need to take care about the type, as it needs to match key definition in the Zabbix server. Numeric types can be specified as number (for example: 12) or text (for example: "12"). + * **clock**: [in] [integer] [optional] Here you can specify the Unix timestamp associated to your measurement. For example, you can process a log or a data file produced an hour ago, and you want to send the data with the timestamp when the data was produced, not when it was processed by you. If you don't specify this parameter, zabbix server will assign a timestamp when it receives the data. + + You can create a timestamp compatible with "clock" parameter using this code: + int(round(time.time())) + + *Default value: None* + + #####Return: + A message returned by the server. + ''' + sender_data = { + "request": "sender data", + "data": [], + } + + obj = self._createDataPoint(host, key, value, clock) + sender_data['data'].append(obj) + return self.send_packet(sender_data) + + def sendSingleLikeProxy(self, host, key, value, clock=None, proxy=None): + ''' + #####Description: + Use this method to put the data for host monitored by proxy server. This method emulates proxy protocol and data will be accepted by Zabbix server + even if they were send not actually from proxy. + + #####Parameters: + * **host**: [in] [string] [mandatory] The host which the data is associated to. + * **key**: [in] [string] [mandatory] The name of the trap associated to the host in the Zabbix server. + * **value**: [in] [any] [mandatory] The value you want to send. Please note that you need to take care about the type, as it needs to match key definition in the Zabbix server. Numeric types can be specified as number (for example: 12) or text (for example: "12"). + * **clock**: [in] [integer] [optional] Here you can specify the Unix timestamp associated to your measurement. For example, you can process a log or a data file produced an hour ago, and you want to send the data with the timestamp when the data was produced, not when it was processed by you. If you don't specify this parameter, zabbix server will assign a timestamp when it receives the data. + + You can create a timestamp compatible with "clock" parameter using this code: + int(round(time.time())) + + *Default value: None* + + * **proxy**: [in] [string] [optional] The name of the proxy to be recognized by the Zabbix server. If proxy is not specified, a normal "sendSingle" operation will be performed. *Default value: None* + #####Return: + A message returned by the server. + ''' + # Proxy was not specified, so we'll do a "normal" sendSingle operation + if proxy is None: + return sendSingle(host, key, value, clock) + + sender_data = { + "request": "history data", + "host": proxy, + "data": [], + } + + obj = self._createDataPoint(host, key, value, clock) + sender_data['data'].append(obj) + return self.send_packet(sender_data) diff --git a/pyZabbixSender/tx.py b/pyZabbixSender/tx.py new file mode 100644 index 0000000..81ce5ac --- /dev/null +++ b/pyZabbixSender/tx.py @@ -0,0 +1,312 @@ +# -*- coding: utf-8 +# Copyleft 2016 Vsevolod Novikov +# > Based on work by Kurt Momberg +# >> Based on work by Klimenko Artyem +# >>> Based on work by Rob Cherry +# >>>> Based on work by Enrico Tröger +# License: GNU GPLv2 + +from twisted.internet.endpoints import TCP4ServerEndpoint +from twisted.internet import reactor,address,defer + +from twisted.python import log, failure + +from twisted.internet import protocol, reactor +from zope.interface import implements +from twisted.internet import interfaces,error + +import struct +import time +import sys +import re + +from pyZabbixSenderBase import * + +class SenderProtocol(protocol.Protocol): + def __init__(self,factory): + self.factory = factory + self.reset() + + def reset(self): + self.tail = '' + self.state = 'magic' + + def dataReceived(self, data): + log.msg("RECEIVED DATA: %s" % len(data)) + while len(data): + l = self.parseData(data) + log.msg("PARSED DATA: %s" % l) + if l: + data = data[l:] + else: + self.error_happens(failure.Failure(InvalidResponse("Unknown data chunk"))) + self.transport.loseConnection() + break + + def parseData(self,data): + d = self.tail + data + l = self._expected_length() + if len(d) < l: + self.tail = d + return len(data) + + self._expected_parse(d[0:l]) + taill = len(self.tail) + self.tail = '' + return l - taill + + def _expected_length(self): + m = getattr(self,'_expected_length_'+self.state) + return m() + + def _expected_parse(self,data): + m = getattr(self,'_expected_parse_'+self.state) + return m(data) + + def _expected_length_magic(self): + return 5 + + def _expected_parse_magic(self,data): + if not data == 'ZBXD\1': + self.error_happens(failure.Failure(InvalidResponse("Wrong magic: %s" % data))) + self.transport.loseConnection() + return + self.state = 'header' + + def _expected_length_header(self): + return 8 + + def _expected_parse_header(self,data): + self._data_length, = struct.unpack('i',data[:4]) + log.msg("Received length: %s" % self._data_length) + self.state = 'data' + + def _expected_length_data(self): + return self._data_length + + def _expected_parse_data(self,data): + packet = {} + self.state = 'header' + try: + packet = json.loads(data) + except Exception,ex: + f = failure.Failure() + self.error_happens(f) + self.transport.loseConnection() + return + log.msg("Received packet: %s" % packet) + try: + self.packet_received(packet) + except Exception,ex: + f = failure.Failure() + self.error_happens(f) + self.transport.loseConnection() + return + self.state = 'done' + self.transport.loseConnection() # Normally the Zabbix expects closing connection from the sender + + def packet_received(self,packet): + raise NotImplemented() + + def error_happens(self,fail): + log.err(fail) + + def send_packet(self,packet): + '''sends a packet in form of json''' + log.msg("Sending a packet: %s" % packet) + try: + data = json.dumps(packet) + except Exception,ex: + f = failure.Failure() + self.error_happens(f) + self.transport.loseConnection() + return + data_length = len(data) + data_header = str(struct.pack('q', data_length)) + data_to_send = 'ZBXD\1' + str(data_header) + data + self.transport.write(data_to_send) + log.msg("Packet sent: %s bytes" % len(data_to_send)) + +class SenderProcessor(SenderProtocol): + def __init__(self,factory,packet,deferred): + SenderProtocol.__init__(self,factory) + self.deferred = deferred + self.packet = packet + + def connectionMade(self): + self.send_packet(self.packet) + def packet_received(self,packet): + response = recognize_response(packet) + self.deferred.callback(response) + def error_happens(self,fail): + self.deferred.errback(fail) + +class SenderFactory(protocol.ClientFactory): + def __init__(self,packet,deferred): + self.deferred = deferred + self.packet = packet + def buildProtocol(self,addr): + return SenderProcessor(self,self.packet,self.deferred) + + def clientConnectionFailed(self, connector, reason): + if not self.deferred.called: + log.err("ERROR: connecting has been failed because of:%s, sending data has been skipped" % reason) + self.deferred.errback(reason) + + def clientConnectionLost(self, connector, reason): + if not isinstance(reason.value,error.ConnectionDone): + if not self.deferred.called: + log.err("ERROR: connecting has been lost because of:%s, sending data has been skipped" % reason) + self.deferred.errback(reason) + +class txZabbixSender(pyZabbixSenderBase): + ''' + This class allows you to send data to a Zabbix server asynchronously, using the same + protocol used by the zabbix_server binary distributed by Zabbix. + ''' + + def _send(self,packet): + '''This method creates a connection, sends data and returns deferred to get a result''' + deferred = defer.Deferred() + connection = reactor.connectTCP(self.zserver,self.zport,SenderFactory(packet,deferred),self.timeout) + return deferred + + def sendData(self, packet_clock=None, max_data_per_conn=None): + ''' + #####Description: + Sends data stored using *addData* method, to the Zabbix server. + + #####Parameters: + * **packet_clock**: [in] [integer] [optional] Zabbix server uses the "clock" parameter in the packet to associate that timestamp to all data values not containing their own clock timestamp. Then: + * If packet_clock is specified, zabbix server will associate it to all data values not containing their own clock. + * If packet_clock is **NOT** specified, zabbix server will use the time when it received the packet as packet clock. + + You can create a timestamp compatible with "clock" or "packet_clock" parameters using this code: + + int(round(time.time())) + *Default value: None* + + * **max_data_per_conn**: [in] [integer] [optional] Allows the user to limit the number of data points sent in one single connection, as some times a too big number can produce problems over slow connections. + + Several "sends" will be automatically performed until all data is sent. + + If omitted, all data points will be sent in one single connection. *Default value: None* + + Please note that **internal data is not deleted after *sendData* is executed**. You need to call *clearData* after sending it, if you want to remove currently stored data. + + #####Return: + A deferred list of each "send" operation results. + ''' + if not max_data_per_conn or max_data_per_conn > len(self._data): + max_data_per_conn = len(self._data) + + responses = [] + i = 0 + while i*max_data_per_conn < len(self._data): + + sender_data = { + "request": "sender data", + "data": [], + } + if packet_clock: + sender_data['clock'] = packet_clock + + sender_data['data'] = self._data[i*max_data_per_conn:(i+1)*max_data_per_conn] + response = self._send(sender_data) + responses.append(response) + i += 1 + + return defer.DeferredList(responses) + + def sendDataOneByOne(self): + ''' + #####Description: + You can use this method to send all stored data, one by one, to determine which traps are not being handled correctly by the server. + + Using this method you'll be able to detect things like: + * hosts not defined in the server + * traps not defined in some particular host + + This is primarily intended for debugging purposes. + + #####Parameters: + None + + #####Return: + A deferred list of each "send" operation results. + ''' + retarray = [] + for i in self._data: + if 'clock' in i: + d = self.sendSingle(i['host'], i['key'], i['value'], i['clock']) + else: + d = self.sendSingle(i['host'], i['key'], i['value']) + + retarray.append(d) + return defer.DeferredList(retarray) + + + def sendSingle(self, host, key, value, clock=None): + ''' + #####Description: + Instead of storing data for sending later, you can use this method to send specific values right now. + + #####Parameters: + It shares the same parameters as the *addData* method. + * **host**: [in] [string] [mandatory] The host which the data is associated to. + * **key**: [in] [string] [mandatory] The name of the trap associated to the host in the Zabbix server. + * **value**: [in] [any] [mandatory] The value you want to send. Please note that you need to take care about the type, as it needs to match key definition in the Zabbix server. Numeric types can be specified as number (for example: 12) or text (for example: "12"). + * **clock**: [in] [integer] [optional] Here you can specify the Unix timestamp associated to your measurement. For example, you can process a log or a data file produced an hour ago, and you want to send the data with the timestamp when the data was produced, not when it was processed by you. If you don't specify this parameter, zabbix server will assign a timestamp when it receives the data. + + You can create a timestamp compatible with "clock" parameter using this code: + int(round(time.time())) + + *Default value: None* + + #####Return: + A deferred for the operation results. + ''' + sender_data = { + "request": "sender data", + "data": [], + } + + obj = self._createDataPoint(host, key, value, clock) + sender_data['data'].append(obj) + return self._send(sender_data) + + + def sendSingleLikeProxy(self, host, key, value, clock=None, proxy=None): + ''' + #####Description: + Use this method to put the data for host monitored by proxy server. This method emulates proxy protocol and data will be accepted by Zabbix server + even if they were send not actually from proxy. + + #####Parameters: + * **host**: [in] [string] [mandatory] The host which the data is associated to. + * **key**: [in] [string] [mandatory] The name of the trap associated to the host in the Zabbix server. + * **value**: [in] [any] [mandatory] The value you want to send. Please note that you need to take care about the type, as it needs to match key definition in the Zabbix server. Numeric types can be specified as number (for example: 12) or text (for example: "12"). + * **clock**: [in] [integer] [optional] Here you can specify the Unix timestamp associated to your measurement. For example, you can process a log or a data file produced an hour ago, and you want to send the data with the timestamp when the data was produced, not when it was processed by you. If you don't specify this parameter, zabbix server will assign a timestamp when it receives the data. + + You can create a timestamp compatible with "clock" parameter using this code: + int(round(time.time())) + + *Default value: None* + + * **proxy**: [in] [string] [optional] The name of the proxy to be recognized by the Zabbix server. If proxy is not specified, a normal "sendSingle" operation will be performed. *Default value: None* + #####Return: + A deferred for the operation results. + ''' + # Proxy was not specified, so we'll do a "normal" sendSingle operation + if proxy is None: + return sendSingle(host, key, value, clock) + + sender_data = { + "request": "history data", + "host": proxy, + "data": [], + } + + obj = self._createDataPoint(host, key, value, clock) + sender_data['data'].append(obj) + return self._send(sender_data) diff --git a/setup.py b/setup.py new file mode 100755 index 0000000..93a774b --- /dev/null +++ b/setup.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python + +import os.path +from setuptools import setup, find_packages + +def read(fname): + return open(os.path.join(os.path.dirname(__file__), fname)).read() + +setupconf = dict( + name="pyZabbixSender", + author="Kurt Momberg", + author_email="kurtqm@yahoo.com.ar", + description="Python implementation of zabbix_sender.", + long_description = read('README.md'), + url="https://github.com/kmomberg/pyZabbixSender", + version="0.2", + license = "GNU GPL v2", + packages = find_packages(), +# install_requires = [???], + classifiers = [ + "Operating System :: OS Independent", + "Development Status :: 4 - Beta", + "Environment :: Library", + "Framework :: Zabbix", + "Intended Audience :: Developers", + "Programming Language :: Python", + "Programming Language :: Python :: 2.5.1", + "Programming Language :: Python :: 2.7", + "Topic :: Software Development :: Libraries :: Python Modules", + "Topic :: System :: Networking", + "Topic :: System :: Monitoring", + ], + zip_safe=False, +) + +if __name__ == '__main__': + setup(**setupconf)