1 #! /usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # vim: fileencoding=utf-8
4 #
5 # Copyright © 2009 Adrian Perez <aperez@igalia.com>
6 #
7 # Distributed under terms of the GPLv2 license.
9 """
10 Collectd network protocol implementation.
11 """
13 import socket
14 import struct
16 try:
17 from cStringIO import StringIO
18 except ImportError:
19 from StringIO import StringIO
21 from datetime import datetime
22 from copy import deepcopy
25 DEFAULT_PORT = 25826
26 """Default port"""
28 DEFAULT_IPv4_GROUP = "239.192.74.66"
29 """Default IPv4 multicast group"""
31 DEFAULT_IPv6_GROUP = "ff18::efc0:4a42"
32 """Default IPv6 multicast group"""
36 # Message kinds
37 TYPE_HOST = 0x0000
38 TYPE_TIME = 0x0001
39 TYPE_PLUGIN = 0x0002
40 TYPE_PLUGIN_INSTANCE = 0x0003
41 TYPE_TYPE = 0x0004
42 TYPE_TYPE_INSTANCE = 0x0005
43 TYPE_VALUES = 0x0006
44 TYPE_INTERVAL = 0x0007
46 # For notifications
47 TYPE_MESSAGE = 0x0100
48 TYPE_SEVERITY = 0x0101
50 # DS kinds
51 DS_TYPE_COUNTER = 0
52 DS_TYPE_GAUGE = 1
55 header = struct.Struct("!2H")
56 number = struct.Struct("!Q")
57 short = struct.Struct("!H")
58 double = struct.Struct("<d")
61 def decode_network_values(ptype, plen, buf):
62 """Decodes a list of DS values in collectd network format
63 """
64 nvalues = short.unpack_from(buf, header.size)[0]
65 off = header.size + short.size + nvalues
66 valskip = double.size
68 # Check whether our expected packet size is the reported one
69 assert ((valskip + 1) * nvalues + short.size + header.size) == plen
70 assert double.size == number.size
72 result = []
73 for dstype in map(ord, buf[header.size+short.size:off]):
74 if dstype == DS_TYPE_COUNTER:
75 result.append((dstype, number.unpack_from(buf, off)[0]))
76 off += valskip
77 elif dstype == DS_TYPE_GAUGE:
78 result.append((dstype, double.unpack_from(buf, off)[0]))
79 off += valskip
80 else:
81 raise ValueError("DS type %i unsupported" % dstype)
83 return result
86 def decode_network_number(ptype, plen, buf):
87 """Decodes a number (64-bit unsigned) in collectd network format.
88 """
89 return number.unpack_from(buf, header.size)[0]
92 def decode_network_string(msgtype, plen, buf):
93 """Decodes a floating point number (64-bit) in collectd network format.
94 """
95 return buf[header.size:plen-1]
98 # Mapping of message types to decoding functions.
99 _decoders = {
100 TYPE_VALUES : decode_network_values,
101 TYPE_TIME : decode_network_number,
102 TYPE_INTERVAL : decode_network_number,
103 TYPE_HOST : decode_network_string,
104 TYPE_PLUGIN : decode_network_string,
105 TYPE_PLUGIN_INSTANCE: decode_network_string,
106 TYPE_TYPE : decode_network_string,
107 TYPE_TYPE_INSTANCE : decode_network_string,
108 TYPE_MESSAGE : decode_network_string,
109 TYPE_SEVERITY : decode_network_number,
110 }
113 def decode_network_packet(buf):
114 """Decodes a network packet in collectd format.
115 """
116 off = 0
117 blen = len(buf)
118 while off < blen:
119 ptype, plen = header.unpack_from(buf, off)
121 if plen > blen - off:
122 raise ValueError("Packet longer than amount of data in buffer")
124 if ptype not in _decoders:
125 raise ValueError("Message type %i not recognized" % ptype)
127 yield ptype, _decoders[ptype](ptype, plen, buf[off:])
128 off += plen
134 class Data(object):
135 time = 0
136 host = None
137 plugin = None
138 plugininstance = None
139 type = None
140 typeinstance = None
142 def __init__(self, **kw):
143 [setattr(self, k, v) for k, v in kw.iteritems()]
145 @property
146 def datetime(self):
147 return datetime.fromtimestamp(self.time)
149 @property
150 def source(self):
151 buf = StringIO()
152 if self.host:
153 buf.write(self.host)
154 if self.plugin:
155 buf.write("/")
156 buf.write(self.plugin)
157 if self.plugininstance:
158 buf.write("/")
159 buf.write(self.plugininstance)
160 if self.type:
161 buf.write("/")
162 buf.write(self.type)
163 if self.typeinstance:
164 buf.write("/")
165 buf.write(self.typeinstance)
166 return buf.getvalue()
168 def __str__(self):
169 return "[%i] %s" % (self.time, self.source)
173 class Notification(Data):
174 FAILURE = 1
175 WARNING = 2
176 OKAY = 4
178 SEVERITY = {
179 FAILURE: "FAILURE",
180 WARNING: "WARNING",
181 OKAY : "OKAY",
182 }
184 __severity = 0
185 message = ""
187 def __set_severity(self, value):
188 if value in (self.FAILURE, self.WARNING, self.OKAY):
189 self.__severity = value
191 severity = property(lambda self: self.__severity, __set_severity)
193 @property
194 def severitystring(self):
195 return self.SEVERITY.get(self.severity, "UNKNOWN")
197 def __str__(self):
198 return "%s [%s] %s" % (
199 super(Notification, self).__str__(),
200 self.severitystring,
201 self.message)
205 class Values(Data, list):
206 def __str__(self):
207 return "%s %s" % (Data.__str__(self), list.__str__(self))
211 def interpret_opcodes(iterable):
212 vl = Values()
213 nt = Notification()
215 for kind, data in iterable:
216 if kind == TYPE_TIME:
217 vl.time = nt.time = data
218 elif kind == TYPE_INTERVAL:
219 vl.interval = data
220 elif kind == TYPE_HOST:
221 vl.host = nt.host = data
222 elif kind == TYPE_PLUGIN:
223 vl.plugin = nt.plugin = data
224 elif kind == TYPE_PLUGIN_INSTANCE:
225 vl.plugininstance = nt.plugininstance = data
226 elif kind == TYPE_TYPE:
227 vl.type = nt.type = data
228 elif kind == TYPE_TYPE_INSTANCE:
229 vl.typeinstance = nt.typeinstance = data
230 elif kind == TYPE_SEVERITY:
231 nt.severity = data
232 elif kind == TYPE_MESSAGE:
233 nt.message = data
234 yield deepcopy(nt)
235 elif kind == TYPE_VALUES:
236 vl[:] = data
237 yield deepcopy(vl)
241 class Reader(object):
242 """Network reader for collectd data.
244 Listens on the network in a given address, which can be a multicast
245 group address, and handles reading data when it arrives.
246 """
247 addr = None
248 host = None
249 port = DEFAULT_PORT
251 BUFFER_SIZE = 1024
254 def __init__(self, host=None, port=DEFAULT_PORT, multicast=False):
255 if host is None:
256 multicast = True
257 host = DEFAULT_IPv4_GROUP
259 self.host, self.port = host, port
260 self.ipv6 = ":" in self.host
262 family, socktype, proto, canonname, sockaddr = socket.getaddrinfo(
263 None if multicast else self.host, self.port,
264 socket.AF_INET6 if self.ipv6 else socket.AF_UNSPEC,
265 socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)[0]
267 self._sock = socket.socket(family, socktype, proto)
268 self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
269 self._sock.bind(sockaddr)
271 if multicast:
272 if hasattr(socket, "SO_REUSEPORT"):
273 self._sock.setsockopt(
274 socket.SOL_SOCKET,
275 socket.SO_REUSEPORT, 1)
277 val = None
278 if family == socket.AF_INET:
279 assert "." in self.host
280 val = struct.pack("4sl",
281 socket.inet_aton(self.host), socket.INADDR_ANY)
282 elif family == socket.AF_INET6:
283 raise NotImplementedError("IPv6 support not ready yet")
284 else:
285 raise ValueError("Unsupported network address family")
287 self._sock.setsockopt(
288 socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
289 socket.IP_ADD_MEMBERSHIP, val)
290 self._sock.setsockopt(
291 socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
292 socket.IP_MULTICAST_LOOP, 0)
295 def receive(self):
296 """Receives a single raw collect network packet.
297 """
298 return self._sock.recv(self.BUFFER_SIZE)
301 def decode(self, buf=None):
302 """Decodes a given buffer or the next received packet.
303 """
304 if buf is None:
305 buf = self.receive()
306 return decode_network_packet(buf)
309 def interpret(self, iterable=None):
310 """Interprets a sequence
311 """
312 if iterable is None:
313 iterable = self.decode()
314 if isinstance(iterable, basestring):
315 iterable = self.decode(iterable)
316 return interpret_opcodes(iterable)