0222cdbdb3b3d526e14f3f3f08ffda4b866aba07
1 #! /usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # vim: fileencoding=utf-8
4 #
5 # Copyright © 2009 Adrian Perez <aperez@igalia.com>
6 #
7 # Distributed under terms of the GPLv2 license.
8 #
9 # Frank Marien (frank@apsu.be) 4 Sep 2012
10 # - quick fixes for 5.1 binary protocol
11 # - updated for python 3
12 # - fixed for larger packet sizes (possible on lo interface)
14 """
15 Collectd network protocol implementation.
16 """
18 import socket
19 import struct
20 try:
21 from io import StringIO
22 except ImportError:
23 from cStringIO import StringIO
25 from datetime import datetime
26 from copy import deepcopy
#: Default port
DEFAULT_PORT = 25826

#: Default IPv4 multicast group
DEFAULT_IPv4_GROUP = "239.192.74.66"

#: Default IPv6 multicast group
DEFAULT_IPv6_GROUP = "ff18::efc0:4a42"


# Message kinds, listed in ascending order of their numeric code.
TYPE_HOST = 0x0000
TYPE_TIME = 0x0001
TYPE_PLUGIN = 0x0002
TYPE_PLUGIN_INSTANCE = 0x0003
TYPE_TYPE = 0x0004
TYPE_TYPE_INSTANCE = 0x0005
TYPE_VALUES = 0x0006
TYPE_INTERVAL = 0x0007
TYPE_TIME_HR = 0x0008
TYPE_INTERVAL_HR = 0x0009

# Message kinds used for notifications.
TYPE_MESSAGE = 0x0100
TYPE_SEVERITY = 0x0101
# DS kinds
DS_TYPE_COUNTER = 0
DS_TYPE_GAUGE = 1
DS_TYPE_DERIVE = 2
DS_TYPE_ABSOLUTE = 3


# Wire-format codecs.  Everything is big-endian ("network" order) except
# gauge values, which are little-endian doubles.
header = struct.Struct("!2H")
number = struct.Struct("!Q")
short = struct.Struct("!H")
double = struct.Struct("<d")
# DERIVE values are signed on the wire, unlike COUNTER and ABSOLUTE.
_signed_number = struct.Struct("!q")

# Codec used for the value of each DS kind; every codec is 8 bytes wide.
_value_codecs = {
    DS_TYPE_COUNTER: number,
    DS_TYPE_GAUGE: double,
    DS_TYPE_DERIVE: _signed_number,
    DS_TYPE_ABSOLUTE: number,
}


def decode_network_values(ptype, plen, buf):
    """Decodes a list of DS values in collectd network format.

    The part is laid out as: part header, 16-bit value count, one type
    byte per value, then one 8-byte value per entry in the same order.

    :param ptype: Part type code (unused; kept for the decoder signature).
    :param plen: Length of the part as declared in its header.
    :param buf: Buffer that starts at the part header.
    :return: List of ``(ds_type, value)`` tuples.  Counter and absolute
        values are unsigned integers, derive values are signed integers
        and gauge values are floats.
    :raises ValueError: If the part is truncated, if the declared length
        does not match the value count, or if a DS type is unknown.
    """
    types_start = header.size + short.size
    if len(buf) < types_start:
        raise ValueError("Values part is too short to hold a value count")

    nvalues = short.unpack_from(buf, header.size)[0]
    values_start = types_start + nvalues

    # Check whether our expected packet size is the reported one.  This is
    # a real check rather than an assert so it survives "python -O".
    expected = values_start + nvalues * double.size
    if expected != plen:
        raise ValueError(
            "Values part declares %i bytes but %i values need %i bytes"
            % (plen, nvalues, expected))
    if len(buf) < plen:
        raise ValueError("Values part is longer than the data in the buffer")

    result = []
    off = values_start
    # bytearray() yields integer type codes on both Python 2 and Python 3.
    for dstype in bytearray(buf[types_start:values_start]):
        codec = _value_codecs.get(dstype)
        if codec is None:
            raise ValueError("DS type %i unsupported" % dstype)
        result.append((dstype, codec.unpack_from(buf, off)[0]))
        off += codec.size

    return result
def decode_network_number(ptype, plen, buf):
    """Decodes a 64-bit unsigned number part in collectd network format.

    The number is read from ``buf`` immediately after the part header;
    ``ptype`` and ``plen`` are accepted only to match the common decoder
    signature.
    """
    (value,) = number.unpack_from(buf, header.size)
    return value
def decode_network_string(msgtype, plen, buf):
    """Decodes a string part in collectd network format.

    :param msgtype: Part type code (unused; kept for the decoder signature).
    :param plen: Length of the part as declared in its header.
    :param buf: Buffer that starts at the part header.
    :return: The slice of ``buf`` between the part header and the last
        byte of the part.  The slice has the same type as ``buf``; it is
        not decoded to text.
    """
    # The final byte of the part is dropped; it is expected to be the
    # string's NUL terminator.
    payload_end = plen - 1
    return buf[header.size:payload_end]
# Mapping of message types to decoding functions, built group by group so
# that each decoder is named only once.
_decoders = {TYPE_VALUES: decode_network_values}
_decoders.update(dict.fromkeys(
    (TYPE_TIME, TYPE_TIME_HR, TYPE_INTERVAL, TYPE_INTERVAL_HR),
    decode_network_number))
_decoders.update(dict.fromkeys(
    (TYPE_HOST, TYPE_PLUGIN, TYPE_PLUGIN_INSTANCE,
     TYPE_TYPE, TYPE_TYPE_INSTANCE, TYPE_MESSAGE),
    decode_network_string))
_decoders[TYPE_SEVERITY] = decode_network_number
def decode_network_packet(buf):
    """Decodes a network packet in collectd format.

    This is a generator: the packet is walked part by part and nothing is
    validated until the result is iterated.

    :param buf: Raw packet contents.
    :return: Yields one ``(part_type, decoded_value)`` tuple per part, in
        packet order.
    :raises ValueError: If a part header is truncated, if a part declares
        a length shorter than its own header or longer than the remaining
        data, or if a part type has no registered decoder.
    """
    off = 0
    blen = len(buf)

    while off < blen:
        if blen - off < header.size:
            raise ValueError("Truncated part header at end of packet")

        ptype, plen = header.unpack_from(buf, off)

        # A part shorter than its header (in particular length 0) would
        # never advance the offset and the loop would spin forever.
        if plen < header.size:
            raise ValueError(
                "Part length %i is shorter than the part header" % plen)

        if plen > blen - off:
            raise ValueError("Packet longer than amount of data in buffer")

        decoder = _decoders.get(ptype)
        if decoder is None:
            raise ValueError("Message type %i not recognized" % ptype)

        # Hand the decoder only this part instead of copying the whole
        # remainder of the buffer for every part.
        yield ptype, decoder(ptype, plen, buf[off:off + plen])
        off += plen
class Data(object):
    """Identification fields common to values and notifications.

    Every field has a class-level default and can be overridden through
    keyword arguments to the constructor.
    """
    time = 0
    host = None
    plugin = None
    plugininstance = None
    type = None
    typeinstance = None

    def __init__(self, **kw):
        """Sets each keyword argument as an attribute of the instance."""
        for name, value in kw.items():
            setattr(self, name, value)

    @staticmethod
    def _as_text(value):
        """Returns ``value`` as a native string.

        Byte strings are decoded as UTF-8 (undecodable bytes are
        replaced) so that they do not render as ``b'...'`` on Python 3.
        Anything else goes through ``str()``.
        """
        if isinstance(value, (bytes, bytearray)) and not isinstance(value, str):
            return value.decode("utf-8", "replace")
        return str(value)

    @property
    def datetime(self):
        """The ``time`` field converted with ``datetime.fromtimestamp``."""
        return datetime.fromtimestamp(self.time)

    @property
    def source(self):
        """Identifier built from host, plugin, plugin instance, type and
        type instance.

        Unset (falsy) fields are skipped.  Every field after the host is
        preceded by a "/", so the result starts with "/" when the host is
        unset but a later field is present.
        """
        pieces = []
        if self.host:
            pieces.append(self._as_text(self.host))
        for field in (self.plugin, self.plugininstance,
                      self.type, self.typeinstance):
            if field:
                pieces.append("/" + self._as_text(field))
        return "".join(pieces)

    def __str__(self):
        return "[%i] %s" % (self.time, self.source)
class Notification(Data):
    """A notification: the common ``Data`` fields plus a severity and a
    message.
    """
    FAILURE = 1
    WARNING = 2
    OKAY = 4

    # Display name for each known severity code.
    SEVERITY = {
        FAILURE: "FAILURE",
        WARNING: "WARNING",
        OKAY: "OKAY",
    }

    __severity = 0
    message = ""

    def __set_severity(self, value):
        # Values other than FAILURE, WARNING and OKAY are ignored, which
        # leaves the previously stored severity in place.
        if value in (self.FAILURE, self.WARNING, self.OKAY):
            self.__severity = value

    severity = property(lambda self: self.__severity, __set_severity)

    @property
    def severitystring(self):
        """Display name of the severity, or "UNKNOWN" if it was never set
        to a known code."""
        return self.SEVERITY.get(self.severity, "UNKNOWN")

    def __str__(self):
        message = self.message
        # A byte-string message would otherwise render as b'...' on
        # Python 3; decode it leniently for display.
        if isinstance(message, (bytes, bytearray)) and not isinstance(message, str):
            message = message.decode("utf-8", "replace")
        return "%s [%s] %s" % (
            super(Notification, self).__str__(),
            self.severitystring,
            message)
class Values(Data, list):
    """A list of decoded values that also carries the ``Data`` fields."""

    def __str__(self):
        # Identification prefix from Data, then the plain list rendering.
        prefix = Data.__str__(self)
        listing = list.__str__(self)
        return " ".join((prefix, listing))
def interpret_opcodes(iterable):
    """Turns a stream of decoded parts into Values and Notification objects.

    Identification parts (host, plugin, type, time, ...) update running
    state.  Each values part yields a copy of the current ``Values`` and
    each message part yields a copy of the current ``Notification``.

    High-resolution time and interval parts count units of 2**-30
    seconds; they are converted to (float) seconds so that ``time`` and
    ``interval`` mean the same thing whichever part kind carried them.

    :param iterable: Iterable of ``(kind, data)`` tuples, as produced by
        ``decode_network_packet``.
    :return: Yields deep copies of ``Values`` and ``Notification`` objects.
    """
    hr_units_per_second = 2.0 ** 30

    # Parts whose payload is stored unchanged on both running objects.
    shared_fields = {
        TYPE_HOST: "host",
        TYPE_PLUGIN: "plugin",
        TYPE_PLUGIN_INSTANCE: "plugininstance",
        TYPE_TYPE: "type",
        TYPE_TYPE_INSTANCE: "typeinstance",
    }

    vl = Values()
    nt = Notification()

    for kind, data in iterable:
        if kind in shared_fields:
            setattr(vl, shared_fields[kind], data)
            setattr(nt, shared_fields[kind], data)
        elif kind == TYPE_TIME:
            vl.time = nt.time = data
        elif kind == TYPE_TIME_HR:
            vl.time = nt.time = data / hr_units_per_second
        elif kind == TYPE_INTERVAL:
            vl.interval = data
        elif kind == TYPE_INTERVAL_HR:
            vl.interval = data / hr_units_per_second
        elif kind == TYPE_SEVERITY:
            nt.severity = data
        elif kind == TYPE_MESSAGE:
            nt.message = data
            # Copy, because the running object keeps being mutated.
            yield deepcopy(nt)
        elif kind == TYPE_VALUES:
            vl[:] = data
            yield deepcopy(vl)
class Reader(object):
    """Network reader for collectd data.

    Listens on the network in a given address, which can be a multicast
    group address, and handles reading data when it arrives.
    """
    addr = None
    host = None
    port = DEFAULT_PORT

    # Maximum number of bytes read per received packet.
    BUFFER_SIZE = 16384

    def __init__(self, host=None, port=DEFAULT_PORT, multicast=False):
        """Creates and binds the listening UDP socket.

        :param host: Address to listen on, or the multicast group to join.
            When omitted, the default IPv4 multicast group is joined.
        :param port: UDP port to bind.
        :param multicast: Whether ``host`` is a multicast group to join.
        :raises NotImplementedError: For IPv6 multicast groups.
        :raises ValueError: If the multicast group is not a dotted IPv4
            address or the address family is unsupported.
        """
        if host is None:
            multicast = True
            host = DEFAULT_IPv4_GROUP

        self.host, self.port = host, port
        self.ipv6 = ":" in self.host

        family, socktype, proto, _canonname, sockaddr = socket.getaddrinfo(
            None if multicast else self.host, self.port,
            socket.AF_INET6 if self.ipv6 else socket.AF_UNSPEC,
            socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)[0]

        self._sock = socket.socket(family, socktype, proto)
        try:
            self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # Port sharing has to be requested before bind() to have any
            # effect on this socket.
            if multicast and hasattr(socket, "SO_REUSEPORT"):
                self._sock.setsockopt(
                    socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            self._sock.bind(sockaddr)

            if multicast:
                self._join_group(family)
        except Exception:
            # Do not leak the descriptor when setup fails half-way.
            self._sock.close()
            raise

    def _join_group(self, family):
        """Joins the multicast group ``self.host`` on the bound socket."""
        if family == socket.AF_INET:
            if "." not in self.host:
                raise ValueError(
                    "IPv4 multicast group must be a dotted-quad address")
            # "=" selects standard sizes without padding, so this is the
            # 8-byte (group, interface) request on every platform; native
            # "4sl" grows to 16 bytes where C long is 64 bits wide.
            val = struct.pack(
                "=4sl", socket.inet_aton(self.host), socket.INADDR_ANY)
        elif family == socket.AF_INET6:
            raise NotImplementedError("IPv6 support not ready yet")
        else:
            raise ValueError("Unsupported network address family")

        # Only IPv4 reaches this point, so the IPv4 option level applies.
        self._sock.setsockopt(
            socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, val)
        self._sock.setsockopt(
            socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 0)

    def close(self):
        """Closes the underlying socket."""
        self._sock.close()

    def receive(self):
        """Receives a single raw collect network packet.
        """
        return self._sock.recv(self.BUFFER_SIZE)

    def decode(self, buf=None):
        """Decodes a given buffer or the next received packet.
        """
        if buf is None:
            buf = self.receive()
        return decode_network_packet(buf)

    def interpret(self, iterable=None):
        """Interprets a sequence of decoded parts.

        ``iterable`` may be an iterable of ``(kind, data)`` tuples or a
        raw packet buffer, which is decoded first.  When omitted, the
        next packet is received and decoded.
        """
        if iterable is None:
            iterable = self.decode()
        # Raw packets are bytes on Python 3 and str on Python 2.
        if isinstance(iterable, (bytes, bytearray, str)):
            iterable = self.decode(iterable)
        return interpret_opcodes(iterable)