1 # collectd - Collectd.pm
2 # Copyright (C) 2007, 2008 Sebastian Harl
3 #
4 # This program is free software; you can redistribute it and/or modify it
5 # under the terms of the GNU General Public License as published by the
6 # Free Software Foundation; only version 2 of the License is applicable.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 # General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 #
17 # Author:
18 # Sebastian Harl <sh at tokkee.org>
20 package Collectd;
22 use strict;
23 use warnings;
25 use Config;
27 use threads;
28 use threads::shared;
30 BEGIN {
31 if (! $Config{'useithreads'}) {
32 die "Perl does not support ithreads!";
33 }
34 }
36 require Exporter;
38 our @ISA = qw( Exporter );
40 our %EXPORT_TAGS = (
41 'plugin' => [ qw(
42 plugin_register
43 plugin_unregister
44 plugin_dispatch_values
45 plugin_flush
46 plugin_flush_one
47 plugin_flush_all
48 plugin_dispatch_notification
49 plugin_log
50 ) ],
51 'types' => [ qw(
52 TYPE_INIT
53 TYPE_READ
54 TYPE_WRITE
55 TYPE_SHUTDOWN
56 TYPE_LOG
57 TYPE_NOTIF
58 TYPE_FLUSH
59 TYPE_CONFIG
60 TYPE_DATASET
61 ) ],
62 'ds_types' => [ qw(
63 DS_TYPE_COUNTER
64 DS_TYPE_GAUGE
65 ) ],
66 'log' => [ qw(
67 ERROR
68 WARNING
69 NOTICE
70 INFO
71 DEBUG
72 LOG_ERR
73 LOG_WARNING
74 LOG_NOTICE
75 LOG_INFO
76 LOG_DEBUG
77 ) ],
78 'notif' => [ qw(
79 NOTIF_FAILURE
80 NOTIF_WARNING
81 NOTIF_OKAY
82 ) ],
83 'globals' => [ qw(
84 $hostname_g
85 $interval_g
86 ) ],
87 );
89 {
90 my %seen;
91 push @{$EXPORT_TAGS{'all'}}, grep {! $seen{$_}++ } @{$EXPORT_TAGS{$_}}
92 foreach keys %EXPORT_TAGS;
93 }
95 # global variables
96 our $hostname_g;
97 our $interval_g;
99 Exporter::export_ok_tags ('all');
101 my @plugins : shared = ();
102 my %cf_callbacks : shared = ();
104 my %types = (
105 TYPE_INIT, "init",
106 TYPE_READ, "read",
107 TYPE_WRITE, "write",
108 TYPE_SHUTDOWN, "shutdown",
109 TYPE_LOG, "log",
110 TYPE_NOTIF, "notify",
111 TYPE_FLUSH, "flush"
112 );
114 foreach my $type (keys %types) {
115 $plugins[$type] = &share ({});
116 }
118 sub _log {
119 my $caller = shift;
120 my $lvl = shift;
121 my $msg = shift;
123 if ("Collectd" eq $caller) {
124 $msg = "perl: $msg";
125 }
126 return plugin_log ($lvl, $msg);
127 }
129 sub ERROR { _log (scalar caller, LOG_ERR, shift); }
130 sub WARNING { _log (scalar caller, LOG_WARNING, shift); }
131 sub NOTICE { _log (scalar caller, LOG_NOTICE, shift); }
132 sub INFO { _log (scalar caller, LOG_INFO, shift); }
133 sub DEBUG { _log (scalar caller, LOG_DEBUG, shift); }
135 sub plugin_call_all {
136 my $type = shift;
138 my %plugins;
140 our $cb_name = undef;
142 if (! defined $type) {
143 return;
144 }
146 if (TYPE_LOG != $type) {
147 DEBUG ("Collectd::plugin_call: type = \"$type\", args=\"@_\"");
148 }
150 if (! defined $plugins[$type]) {
151 ERROR ("Collectd::plugin_call: unknown type \"$type\"");
152 return;
153 }
155 {
156 lock %{$plugins[$type]};
157 %plugins = %{$plugins[$type]};
158 }
160 foreach my $plugin (keys %plugins) {
161 my $p = $plugins{$plugin};
163 my $status = 0;
165 if ($p->{'wait_left'} > 0) {
166 $p->{'wait_left'} -= $interval_g;
167 }
169 next if ($p->{'wait_left'} > 0);
171 $cb_name = $p->{'cb_name'};
172 $status = call_by_name (@_);
174 if (! $status) {
175 my $err = undef;
177 if ($@) {
178 $err = $@;
179 }
180 else {
181 $err = "callback returned false";
182 }
184 if (TYPE_LOG != $type) {
185 ERROR ("Execution of callback \"$cb_name\" failed: $err");
186 }
188 $status = 0;
189 }
191 if ($status) {
192 $p->{'wait_left'} = 0;
193 $p->{'wait_time'} = $interval_g;
194 }
195 elsif (TYPE_READ == $type) {
196 if ($p->{'wait_time'} < $interval_g) {
197 $p->{'wait_time'} = $interval_g;
198 }
200 $p->{'wait_left'} = $p->{'wait_time'};
201 $p->{'wait_time'} *= 2;
203 if ($p->{'wait_time'} > 86400) {
204 $p->{'wait_time'} = 86400;
205 }
207 WARNING ("${plugin}->read() failed with status $status. "
208 . "Will suspend it for $p->{'wait_left'} seconds.");
209 }
210 elsif (TYPE_INIT == $type) {
211 ERROR ("${plugin}->init() failed with status $status. "
212 . "Plugin will be disabled.");
214 foreach my $type (keys %types) {
215 plugin_unregister ($type, $plugin);
216 }
217 }
218 elsif (TYPE_LOG != $type) {
219 WARNING ("${plugin}->$types{$type}() failed with status $status.");
220 }
221 }
222 return 1;
223 }
225 # Collectd::plugin_register (type, name, data).
226 #
227 # type:
228 # init, read, write, shutdown, data set
229 #
230 # name:
231 # name of the plugin
232 #
233 # data:
234 # reference to the plugin's subroutine that does the work or the data set
235 # definition
236 sub plugin_register {
237 my $type = shift;
238 my $name = shift;
239 my $data = shift;
241 DEBUG ("Collectd::plugin_register: "
242 . "type = \"$type\", name = \"$name\", data = \"$data\"");
244 if (! ((defined $type) && (defined $name) && (defined $data))) {
245 ERROR ("Usage: Collectd::plugin_register (type, name, data)");
246 return;
247 }
249 if ((! defined $plugins[$type]) && (TYPE_DATASET != $type)
250 && (TYPE_CONFIG != $type)) {
251 ERROR ("Collectd::plugin_register: Invalid type \"$type\"");
252 return;
253 }
255 if ((TYPE_DATASET == $type) && ("ARRAY" eq ref $data)) {
256 return plugin_register_data_set ($name, $data);
257 }
258 elsif ((TYPE_CONFIG == $type) && (! ref $data)) {
259 my $pkg = scalar caller;
261 if ($data !~ m/^$pkg\:\:/) {
262 $data = $pkg . "::" . $data;
263 }
265 lock %cf_callbacks;
266 $cf_callbacks{$name} = $data;
267 }
268 elsif ((TYPE_DATASET != $type) && (! ref $data)) {
269 my $pkg = scalar caller;
271 my %p : shared;
273 if ($data !~ m/^$pkg\:\:/) {
274 $data = $pkg . "::" . $data;
275 }
277 %p = (
278 wait_time => $interval_g,
279 wait_left => 0,
280 cb_name => $data,
281 );
283 lock %{$plugins[$type]};
284 $plugins[$type]->{$name} = \%p;
285 }
286 else {
287 ERROR ("Collectd::plugin_register: Invalid data.");
288 return;
289 }
290 return 1;
291 }
293 sub plugin_unregister {
294 my $type = shift;
295 my $name = shift;
297 DEBUG ("Collectd::plugin_unregister: type = \"$type\", name = \"$name\"");
299 if (! ((defined $type) && (defined $name))) {
300 ERROR ("Usage: Collectd::plugin_unregister (type, name)");
301 return;
302 }
304 if (TYPE_DATASET == $type) {
305 return plugin_unregister_data_set ($name);
306 }
307 elsif (TYPE_CONFIG == $type) {
308 lock %cf_callbacks;
309 delete $cf_callbacks{$name};
310 }
311 elsif (defined $plugins[$type]) {
312 lock %{$plugins[$type]};
313 delete $plugins[$type]->{$name};
314 }
315 else {
316 ERROR ("Collectd::plugin_unregister: Invalid type.");
317 return;
318 }
319 }
321 sub plugin_flush {
322 my %args = @_;
324 my $timeout = -1;
325 my @plugins = ();
326 my @ids = ();
328 DEBUG ("Collectd::plugin_flush:"
329 . (defined ($args{'timeout'}) ? " timeout = $args{'timeout'}" : "")
330 . (defined ($args{'plugins'}) ? " plugins = $args{'plugins'}" : "")
331 . (defined ($args{'identifiers'})
332 ? " identifiers = $args{'identifiers'}" : ""));
334 if (defined ($args{'timeout'}) && ($args{'timeout'} > 0)) {
335 $timeout = $args{'timeout'};
336 }
338 if (defined ($args{'plugins'})) {
339 if ("ARRAY" eq ref ($args{'plugins'})) {
340 @plugins = @{$args{'plugins'}};
341 }
342 else {
343 @plugins = ($args{'plugins'});
344 }
345 }
346 else {
347 @plugins = (undef);
348 }
350 if (defined ($args{'identifiers'})) {
351 if ("ARRAY" eq ref ($args{'identifiers'})) {
352 @ids = @{$args{'identifiers'}};
353 }
354 else {
355 @ids = ($args{'identifiers'});
356 }
357 }
358 else {
359 @ids = (undef);
360 }
362 foreach my $plugin (@plugins) {
363 foreach my $id (@ids) {
364 _plugin_flush($plugin, $timeout, $id);
365 }
366 }
367 }
369 sub plugin_flush_one {
370 my $timeout = shift;
371 my $name = shift;
373 WARNING ("Collectd::plugin_flush_one is deprecated - "
374 . "use Collectd::plugin_flush instead.");
376 if (! (defined ($timeout) && defined ($name))) {
377 ERROR ("Usage: Collectd::plugin_flush_one(timeout, name)");
378 return;
379 }
381 plugin_flush (plugins => $name, timeout => $timeout);
382 }
384 sub plugin_flush_all {
385 my $timeout = shift;
387 WARNING ("Collectd::plugin_flush_all is deprecated - "
388 . "use Collectd::plugin_flush instead.");
390 if (! defined ($timeout)) {
391 ERROR ("Usage: Collectd::plugin_flush_all(timeout)");
392 return;
393 }
395 plugin_flush (timeout => $timeout);
396 }
398 sub _plugin_dispatch_config {
399 my $plugin = shift;
400 my $config = shift;
402 our $cb_name = undef;
404 if (! (defined ($plugin) && defined ($config))) {
405 return;
406 }
408 if (! defined $cf_callbacks{$plugin}) {
409 WARNING ("Found a configuration for the \"$plugin\" plugin, but "
410 . "the plugin isn't loaded or didn't register "
411 . "a configuration callback.");
412 return;
413 }
415 {
416 lock %cf_callbacks;
417 $cb_name = $cf_callbacks{$plugin};
418 }
419 call_by_name ($config);
420 }
422 1;
424 # vim: set sw=4 ts=4 tw=78 noexpandtab :