1 #! /usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # vim: fileencoding=utf-8
4 #
5 # Copyright © 2009 Adrian Perez <aperez@igalia.com>
6 #
7 # Distributed under terms of the GPLv2 license or newer.
8 #
9 # Frank Marien (frank@apsu.be) 6 Sep 2012
10 # - quick fixes for 5.1 binary protocol
11 # - updated to python 3
12 # - fixed for larger packet sizes (possible on lo interface)
13 # - fixed comment typo (decode_network_string decodes a string)
15 """
16 Collectd network protocol implementation.
17 """
19 import socket,struct,sys
20 import platform
21 if platform.python_version() < '2.8.0':
22 # Python 2.7 and below io.StringIO does not like unicode
23 from StringIO import StringIO
24 else:
25 try:
26 from io import StringIO
27 except ImportError:
28 from cStringIO import StringIO
30 from datetime import datetime
31 from copy import deepcopy
34 DEFAULT_PORT = 25826
35 """Default port"""
37 DEFAULT_IPv4_GROUP = "239.192.74.66"
38 """Default IPv4 multicast group"""
40 DEFAULT_IPv6_GROUP = "ff18::efc0:4a42"
41 """Default IPv6 multicast group"""
43 HR_TIME_DIV = (2.0**30)
45 # Message kinds
46 TYPE_HOST = 0x0000
47 TYPE_TIME = 0x0001
48 TYPE_TIME_HR = 0x0008
49 TYPE_PLUGIN = 0x0002
50 TYPE_PLUGIN_INSTANCE = 0x0003
51 TYPE_TYPE = 0x0004
52 TYPE_TYPE_INSTANCE = 0x0005
53 TYPE_VALUES = 0x0006
54 TYPE_INTERVAL = 0x0007
55 TYPE_INTERVAL_HR = 0x0009
57 # For notifications
58 TYPE_MESSAGE = 0x0100
59 TYPE_SEVERITY = 0x0101
61 # DS kinds
62 DS_TYPE_COUNTER = 0
63 DS_TYPE_GAUGE = 1
64 DS_TYPE_DERIVE = 2
65 DS_TYPE_ABSOLUTE = 3
67 header = struct.Struct("!2H")
68 number = struct.Struct("!Q")
69 short = struct.Struct("!H")
70 double = struct.Struct("<d")
72 def decode_network_values(ptype, plen, buf):
73 """Decodes a list of DS values in collectd network format
74 """
75 nvalues = short.unpack_from(buf, header.size)[0]
76 off = header.size + short.size + nvalues
77 valskip = double.size
79 # Check whether our expected packet size is the reported one
80 assert ((valskip + 1) * nvalues + short.size + header.size) == plen
81 assert double.size == number.size
83 result = []
84 for dstype in [ord(x) for x in buf[header.size+short.size:off]]:
85 if dstype == DS_TYPE_COUNTER:
86 result.append((dstype, number.unpack_from(buf, off)[0]))
87 off += valskip
88 elif dstype == DS_TYPE_GAUGE:
89 result.append((dstype, double.unpack_from(buf, off)[0]))
90 off += valskip
91 elif dstype == DS_TYPE_DERIVE:
92 result.append((dstype, number.unpack_from(buf, off)[0]))
93 off += valskip
94 elif dstype == DS_TYPE_ABSOLUTE:
95 result.append((dstype, number.unpack_from(buf, off)[0]))
96 off += valskip
97 else:
98 raise ValueError("DS type %i unsupported" % dstype)
100 return result
103 def decode_network_number(ptype, plen, buf):
104 """Decodes a number (64-bit unsigned) from collectd network format.
105 """
106 return number.unpack_from(buf, header.size)[0]
109 def decode_network_string(msgtype, plen, buf):
110 """Decodes a string from collectd network format.
111 """
112 return buf[header.size:plen-1]
115 # Mapping of message types to decoding functions.
116 _decoders = {
117 TYPE_VALUES : decode_network_values,
118 TYPE_TIME : decode_network_number,
119 TYPE_TIME_HR : decode_network_number,
120 TYPE_INTERVAL : decode_network_number,
121 TYPE_INTERVAL_HR : decode_network_number,
122 TYPE_HOST : decode_network_string,
123 TYPE_PLUGIN : decode_network_string,
124 TYPE_PLUGIN_INSTANCE: decode_network_string,
125 TYPE_TYPE : decode_network_string,
126 TYPE_TYPE_INSTANCE : decode_network_string,
127 TYPE_MESSAGE : decode_network_string,
128 TYPE_SEVERITY : decode_network_number,
129 }
132 def decode_network_packet(buf):
133 """Decodes a network packet in collectd format.
134 """
135 off = 0
136 blen = len(buf)
138 while off < blen:
139 ptype, plen = header.unpack_from(buf, off)
141 if plen > blen - off:
142 raise ValueError("Packet longer than amount of data in buffer")
144 if ptype not in _decoders:
145 raise ValueError("Message type %i not recognized" % ptype)
147 yield ptype, _decoders[ptype](ptype, plen, buf[off:])
148 off += plen
151 class Data(object):
152 time = 0
153 host = None
154 plugin = None
155 plugininstance = None
156 type = None
157 typeinstance = None
159 def __init__(self, **kw):
160 [setattr(self, k, v) for k, v in kw.items()]
162 @property
163 def datetime(self):
164 return datetime.fromtimestamp(self.time)
166 @property
167 def source(self):
168 buf = StringIO()
169 if self.host:
170 buf.write(str(self.host))
171 if self.plugin:
172 buf.write("/")
173 buf.write(str(self.plugin))
174 if self.plugininstance:
175 buf.write("/")
176 buf.write(str(self.plugininstance))
177 if self.type:
178 buf.write("/")
179 buf.write(str(self.type))
180 if self.typeinstance:
181 buf.write("/")
182 buf.write(str(self.typeinstance))
183 return buf.getvalue()
185 def __str__(self):
186 return "[%i] %s" % (self.time, self.source)
190 class Notification(Data):
191 FAILURE = 1
192 WARNING = 2
193 OKAY = 4
195 SEVERITY = {
196 FAILURE: "FAILURE",
197 WARNING: "WARNING",
198 OKAY : "OKAY",
199 }
201 __severity = 0
202 message = ""
204 def __set_severity(self, value):
205 if value in (self.FAILURE, self.WARNING, self.OKAY):
206 self.__severity = value
208 severity = property(lambda self: self.__severity, __set_severity)
210 @property
211 def severitystring(self):
212 return self.SEVERITY.get(self.severity, "UNKNOWN")
214 def __str__(self):
215 return "%s [%s] %s" % (
216 super(Notification, self).__str__(),
217 self.severitystring,
218 self.message)
222 class Values(Data, list):
223 def __str__(self):
224 return "%s %s" % (Data.__str__(self), list.__str__(self))
228 def interpret_opcodes(iterable):
229 vl = Values()
230 nt = Notification()
232 for kind, data in iterable:
233 if kind == TYPE_TIME:
234 vl.time = nt.time = data
235 elif kind == TYPE_TIME_HR:
236 vl.time = nt.time = data / HR_TIME_DIV
237 elif kind == TYPE_INTERVAL:
238 vl.interval = data
239 elif kind == TYPE_INTERVAL_HR:
240 vl.interval = data / HR_TIME_DIV
241 elif kind == TYPE_HOST:
242 vl.host = nt.host = data
243 elif kind == TYPE_PLUGIN:
244 vl.plugin = nt.plugin = data
245 elif kind == TYPE_PLUGIN_INSTANCE:
246 vl.plugininstance = nt.plugininstance = data
247 elif kind == TYPE_TYPE:
248 vl.type = nt.type = data
249 elif kind == TYPE_TYPE_INSTANCE:
250 vl.typeinstance = nt.typeinstance = data
251 elif kind == TYPE_SEVERITY:
252 nt.severity = data
253 elif kind == TYPE_MESSAGE:
254 nt.message = data
255 yield deepcopy(nt)
256 elif kind == TYPE_VALUES:
257 vl[:] = data
258 yield deepcopy(vl)
262 class Reader(object):
263 """Network reader for collectd data.
265 Listens on the network in a given address, which can be a multicast
266 group address, and handles reading data when it arrives.
267 """
268 addr = None
269 host = None
270 port = DEFAULT_PORT
272 BUFFER_SIZE = 16384
275 def __init__(self, host=None, port=DEFAULT_PORT, multicast=False):
276 if host is None:
277 multicast = True
278 host = DEFAULT_IPv4_GROUP
280 self.host, self.port = host, port
281 self.ipv6 = ":" in self.host
283 family, socktype, proto, canonname, sockaddr = socket.getaddrinfo(
284 None if multicast else self.host, self.port,
285 socket.AF_INET6 if self.ipv6 else socket.AF_UNSPEC,
286 socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)[0]
288 self._sock = socket.socket(family, socktype, proto)
289 self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
290 self._sock.bind(sockaddr)
292 if multicast:
293 if hasattr(socket, "SO_REUSEPORT"):
294 self._sock.setsockopt(
295 socket.SOL_SOCKET,
296 socket.SO_REUSEPORT, 1)
298 val = None
299 if family == socket.AF_INET:
300 assert "." in self.host
301 val = struct.pack("4sl",
302 socket.inet_aton(self.host), socket.INADDR_ANY)
303 elif family == socket.AF_INET6:
304 raise NotImplementedError("IPv6 support not ready yet")
305 else:
306 raise ValueError("Unsupported network address family")
308 self._sock.setsockopt(
309 socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
310 socket.IP_ADD_MEMBERSHIP, val)
311 self._sock.setsockopt(
312 socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
313 socket.IP_MULTICAST_LOOP, 0)
316 def receive(self):
317 """Receives a single raw collect network packet.
318 """
319 return self._sock.recv(self.BUFFER_SIZE)
322 def decode(self, buf=None):
323 """Decodes a given buffer or the next received packet.
324 """
325 if buf is None:
326 buf = self.receive()
327 return decode_network_packet(buf)
330 def interpret(self, iterable=None):
331 """Interprets a sequence
332 """
333 if iterable is None:
334 iterable = self.decode()
335 if isinstance(iterable, str):
336 iterable = self.decode(iterable)
337 return interpret_opcodes(iterable)