1 # collectd - Collectd.pm
2 # Copyright (C) 2007-2009 Sebastian Harl
3 #
4 # This program is free software; you can redistribute it and/or modify it
5 # under the terms of the GNU General Public License as published by the
6 # Free Software Foundation; only version 2 of the License is applicable.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 # General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 #
17 # Author:
18 # Sebastian Harl <sh at tokkee.org>
20 package Collectd;
22 use strict;
23 use warnings;
25 use Config;
27 use threads;
28 use threads::shared;
# Abort at compile time unless this perl was built with interpreter
# threads, i.e. unless %Config reports a true 'useithreads' value.
BEGIN {
	die "Perl does not support ithreads!"
		unless $Config{'useithreads'};
}
# Exporter setup.  No symbol is exported by default; everything listed
# below is pushed onto @EXPORT_OK by the export_ok_tags() call at the end
# of this section and can be requested by name or by tag.
require Exporter;

our @ISA = ('Exporter');

our %EXPORT_TAGS = (
	'plugin' => [ qw(
		plugin_register plugin_unregister
		plugin_dispatch_values plugin_write
		plugin_flush plugin_flush_one plugin_flush_all
		plugin_dispatch_notification
		plugin_log
	) ],
	'types' => [ qw(
		TYPE_INIT TYPE_READ TYPE_WRITE TYPE_SHUTDOWN
		TYPE_LOG TYPE_NOTIF TYPE_FLUSH TYPE_CONFIG TYPE_DATASET
	) ],
	'ds_types' => [ qw( DS_TYPE_COUNTER DS_TYPE_GAUGE ) ],
	'log' => [ qw(
		ERROR WARNING NOTICE INFO DEBUG
		LOG_ERR LOG_WARNING LOG_NOTICE LOG_INFO LOG_DEBUG
	) ],
	'filter_chain' => [ qw(
		fc_register
		FC_MATCH_NO_MATCH FC_MATCH_MATCHES
		FC_TARGET_CONTINUE FC_TARGET_STOP FC_TARGET_RETURN
	) ],
	'fc_types' => [ qw( FC_MATCH FC_TARGET ) ],
	'notif' => [ qw( NOTIF_FAILURE NOTIF_WARNING NOTIF_OKAY ) ],
	'globals' => [ qw( $hostname_g $interval_g ) ],
);

# Build the 'all' tag: the union of all tags above, each symbol once.
{
	my %seen;
	my @all;

	foreach my $tag (keys %EXPORT_TAGS) {
		foreach my $symbol (@{$EXPORT_TAGS{$tag}}) {
			push @all, $symbol unless $seen{$symbol}++;
		}
	}

	$EXPORT_TAGS{'all'} = \@all;
}

# global variables
our $hostname_g;
our $interval_g;

Exporter::export_ok_tags ('all');
# Callback registries, declared shared (threads::shared):
#   @plugins      one slot per callback type, each holding a hash that
#                 maps a plugin name to its callback record
#   @fc_plugins   one slot per filter chain type, each holding a hash that
#                 maps a match/target name to its callback table
#   %cf_callbacks plugin name => name of its configuration callback
my (@plugins, @fc_plugins, %cf_callbacks) : shared;

# Human readable names of the callback types.  The constants are called
# with explicit parentheses because a bare word in front of "=>" would be
# quoted and end up as the literal string instead of the constant's value.
my %types = (
	TYPE_CONFIG()   => "config",
	TYPE_INIT()     => "init",
	TYPE_READ()     => "read",
	TYPE_WRITE()    => "write",
	TYPE_SHUTDOWN() => "shutdown",
	TYPE_LOG()      => "log",
	TYPE_NOTIF()    => "notify",
	TYPE_FLUSH()    => "flush",
);

# Human readable names of the filter chain types.
my %fc_types = (
	FC_MATCH()  => "match",
	FC_TARGET() => "target",
);

# Key under which a filter chain plugin stores its "exec" callback.
my %fc_exec_names = (
	FC_MATCH()  => "match",
	FC_TARGET() => "invoke",
);

# Human readable names of the filter chain callback types.
my %fc_cb_types = (
	FC_CB_EXEC()    => "exec",
	FC_CB_CREATE()  => "create",
	FC_CB_DESTROY() => "destroy",
);

# Create an empty shared registry for every known type.  The leading "&"
# bypasses share()'s prototype, which is required to share an anonymous
# hash reference.
$plugins[$_] = &share ({}) foreach keys %types;

$fc_plugins[$_] = &share ({}) foreach keys %fc_types;
# _log (caller, lvl, msg).
#
# Hands msg to plugin_log() with log level lvl and returns whatever
# plugin_log() returns.  Messages whose caller is the "Collectd" package
# itself get a "perl: " prefix.
sub _log {
	my ($caller, $lvl, $msg) = @_;

	$msg = "perl: $msg" if ("Collectd" eq $caller);

	return plugin_log ($lvl, $msg);
}
# Logging shortcuts, one per log level.  Each passes its first argument to
# _log() along with the name of the package it was called from.
sub ERROR {
	my ($msg) = @_;
	return _log (scalar (caller), LOG_ERR, $msg);
}

sub WARNING {
	my ($msg) = @_;
	return _log (scalar (caller), LOG_WARNING, $msg);
}

sub NOTICE {
	my ($msg) = @_;
	return _log (scalar (caller), LOG_NOTICE, $msg);
}

sub INFO {
	my ($msg) = @_;
	return _log (scalar (caller), LOG_INFO, $msg);
}

sub DEBUG {
	my ($msg) = @_;
	return _log (scalar (caller), LOG_DEBUG, $msg);
}
# Collectd::plugin_call_all (type, ...).
#
# Runs every callback registered for the given type; the remaining
# arguments are handed on to call_by_name().
#
# type:
#   index into @plugins (one of the TYPE_* constants)
#
# Returns 1 after all callbacks of that type have been processed and
# nothing if type is undefined or has no registry.
#
# A callback that returns a false value counts as failed:
#   - a read callback is suspended; the suspension starts at $interval_g
#     seconds, doubles with every further failure and is capped at 86400
#   - a failed init callback unregisters the plugin for every type listed
#     in %types
#   - any other type except log only triggers a warning
sub plugin_call_all {
	my $type = shift;

	my %plugins;

	# Name of the callback to run; presumably picked up by
	# call_by_name(), which is not given the name as an argument.
	our $cb_name = undef;

	if (! defined $type) {
		return;
	}

	# Log callbacks are not traced (presumably because DEBUG itself ends
	# up in the log callbacks).
	if (TYPE_LOG != $type) {
		# $types{$type} does not exist for an unknown type; do not
		# interpolate undef into the message.
		my $type_name = defined $types{$type} ? $types{$type} : "unknown";

		DEBUG ("Collectd::plugin_call: type = \"$type\" ("
			. $type_name . "), args=\""
			. join(', ', map { defined($_) ? $_ : '<undef>' } @_) . "\"");
	}

	if (! defined $plugins[$type]) {
		ERROR ("Collectd::plugin_call: unknown type \"$type\"");
		return;
	}

	# Work on a snapshot of the registry so that the lock is not held
	# while the callbacks run.  The records themselves are still the
	# shared ones, so the wait_* updates below are seen by everyone.
	{
		lock %{$plugins[$type]};
		%plugins = %{$plugins[$type]};
	}

	foreach my $plugin (keys %plugins) {
		my $p = $plugins{$plugin};

		my $status = 0;

		# A suspended callback is skipped until its remaining wait
		# time has been used up, one interval per invocation.
		if ($p->{'wait_left'} > 0) {
			$p->{'wait_left'} -= $interval_g;
		}

		next if ($p->{'wait_left'} > 0);

		$cb_name = $p->{'cb_name'};
		$status = call_by_name (@_);

		if (! $status) {
			my $err = undef;

			if ($@) {
				$err = $@;
			}
			else {
				$err = "callback returned false";
			}

			if (TYPE_LOG != $type) {
				ERROR ("Execution of callback \"$cb_name\" failed: $err");
			}

			$status = 0;
		}

		if ($status) {
			# Success resets the back-off.
			$p->{'wait_left'} = 0;
			$p->{'wait_time'} = $interval_g;
		}
		elsif (TYPE_READ == $type) {
			if ($p->{'wait_time'} < $interval_g) {
				$p->{'wait_time'} = $interval_g;
			}

			$p->{'wait_left'} = $p->{'wait_time'};
			$p->{'wait_time'} *= 2;

			if ($p->{'wait_time'} > 86400) {
				$p->{'wait_time'} = 86400;
			}

			WARNING ("${plugin}->read() failed with status $status. "
				. "Will suspend it for $p->{'wait_left'} seconds.");
		}
		elsif (TYPE_INIT == $type) {
			ERROR ("${plugin}->init() failed with status $status. "
				. "Plugin will be disabled.");

			foreach my $registered_type (keys %types) {
				plugin_unregister ($registered_type, $plugin);
			}
		}
		elsif (TYPE_LOG != $type) {
			WARNING ("${plugin}->$types{$type}() failed with status $status.");
		}
	}
	return 1;
}
# Collectd::plugin_register (type, name, data).
#
# type:
#   one of the TYPE_* constants: a callback type (config, init, read,
#   write, shutdown, log, notify, flush) or TYPE_DATASET
#
# name:
#   name of the plugin (name of the data set for TYPE_DATASET)
#
# data:
#   for TYPE_DATASET an array reference with the data set definition;
#   for every other type the name of the callback subroutine as a plain
#   string.  A name that does not start with the caller's package name is
#   qualified with it.
#
# Returns 1 on success (for data sets: the return value of
# plugin_register_data_set) and nothing on error.
sub plugin_register {
	my $type = shift;
	my $name = shift;
	my $data = shift;

	# Package of the code that registers the callback.
	my $pkg = scalar caller;

	# Check the arguments before they are interpolated into any message.
	if (! ((defined $type) && (defined $name) && (defined $data))) {
		ERROR ("Usage: Collectd::plugin_register (type, name, data)");
		return;
	}

	# %types has no entry for TYPE_DATASET (nor for an invalid type).
	my $type_name = $types{$type};
	if (! defined $type_name) {
		$type_name = (TYPE_DATASET == $type) ? "dataset" : "unknown";
	}

	DEBUG ("Collectd::plugin_register: "
		. "type = \"$type\" (" . $type_name
		. "), name = \"$name\", data = \"$data\"");

	if ((! defined $plugins[$type]) && (TYPE_DATASET != $type)
			&& (TYPE_CONFIG != $type)) {
		ERROR ("Collectd::plugin_register: Invalid type \"$type\"");
		return;
	}

	# Qualify a plain callback name with the caller's package.  \Q...\E
	# makes sure the package name is matched literally.
	if ((! ref $data) && ($data !~ m/^\Q$pkg\E::/)) {
		$data = $pkg . "::" . $data;
	}

	if ((TYPE_DATASET == $type) && ("ARRAY" eq ref $data)) {
		return plugin_register_data_set ($name, $data);
	}
	elsif ((TYPE_CONFIG == $type) && (! ref $data)) {
		lock %cf_callbacks;
		$cf_callbacks{$name} = $data;
	}
	elsif ((TYPE_DATASET != $type) && (! ref $data)) {
		# The record has to be shared itself to be stored in the
		# shared registry.
		my %p : shared;

		%p = (
			wait_time => $interval_g,
			wait_left => 0,
			cb_name   => $data,
		);

		lock %{$plugins[$type]};
		$plugins[$type]->{$name} = \%p;
	}
	else {
		ERROR ("Collectd::plugin_register: Invalid data.");
		return;
	}
	return 1;
}
# Collectd::plugin_unregister (type, name).
#
# type:
#   one of the TYPE_* constants (callback type or TYPE_DATASET)
#
# name:
#   name of the plugin (name of the data set for TYPE_DATASET)
#
# Returns the return value of plugin_unregister_data_set for data sets,
# the entry that was removed from the registry for callbacks (undef if
# name was not registered) and nothing on error.
sub plugin_unregister {
	my $type = shift;
	my $name = shift;

	# Check the arguments before they are interpolated into any message.
	if (! ((defined $type) && (defined $name))) {
		ERROR ("Usage: Collectd::plugin_unregister (type, name)");
		return;
	}

	# %types has no entry for TYPE_DATASET (nor for an invalid type).
	my $type_name = $types{$type};
	if (! defined $type_name) {
		$type_name = (TYPE_DATASET == $type) ? "dataset" : "unknown";
	}

	DEBUG ("Collectd::plugin_unregister: type = \"$type\" ("
		. $type_name . "), name = \"$name\"");

	if (TYPE_DATASET == $type) {
		return plugin_unregister_data_set ($name);
	}
	elsif (TYPE_CONFIG == $type) {
		lock %cf_callbacks;
		return delete $cf_callbacks{$name};
	}
	elsif (defined $plugins[$type]) {
		lock %{$plugins[$type]};
		return delete $plugins[$type]->{$name};
	}
	else {
		ERROR ("Collectd::plugin_unregister: Invalid type.");
		return;
	}
}
# Collectd::plugin_write (valuelists => ..., [plugins => ...],
#                         [datasets => ...]).
#
# valuelists:
#   a single value list or an array reference of value lists (mandatory)
#
# plugins:
#   a single plugin name or an array reference of names; if missing,
#   undef is handed to _plugin_write as the plugin
#
# datasets:
#   a single data set or an array reference of data sets, one per value
#   list; if missing, undef is used for every value list
#
# Calls _plugin_write (plugin, dataset, valuelist) for every combination
# of plugin and value list.  Returns nothing if valuelists is missing or
# the number of data sets does not match the number of value lists.
sub plugin_write {
	my %args = @_;

	if (! defined $args{'valuelists'}) {
		ERROR ("Collectd::plugin_write: Missing 'valuelists' argument.");
		return;
	}

	my $trace = "Collectd::plugin_write:";
	$trace .= " plugins = $args{'plugins'}"   if (defined ($args{'plugins'}));
	$trace .= " datasets = $args{'datasets'}" if (defined ($args{'datasets'}));
	$trace .= " valueslists = $args{'valuelists'}";
	DEBUG ($trace);

	# Every argument may be a single item or a reference to a list of
	# items; flatten them into plain lists.
	my @targets
		= (! defined ($args{'plugins'}))           ? (undef)
		: ("ARRAY" eq ref ($args{'plugins'}))      ? @{$args{'plugins'}}
		:                                            ($args{'plugins'});

	my @valuelists
		= ("ARRAY" eq ref ($args{'valuelists'}))   ? @{$args{'valuelists'}}
		:                                            ($args{'valuelists'});

	my @datasets
		= (! defined ($args{'datasets'}))          ? ((undef) x scalar (@valuelists))
		: ("ARRAY" eq ref ($args{'datasets'}))     ? @{$args{'datasets'}}
		:                                            ($args{'datasets'});

	if (scalar (@datasets) != scalar (@valuelists)) {
		ERROR ("Collectd::plugin_write: Invalid number of datasets.");
		return;
	}

	foreach my $plugin (@targets) {
		foreach my $i (0 .. $#valuelists) {
			_plugin_write ($plugin, $datasets[$i], $valuelists[$i]);
		}
	}
}
# Collectd::plugin_flush ([timeout => ...], [plugins => ...],
#                         [identifiers => ...]).
#
# timeout:
#   only a value greater than zero is used; otherwise -1 is handed on
#
# plugins, identifiers:
#   a single item or an array reference of items; if missing, undef is
#   handed on instead
#
# Calls _plugin_flush (plugin, timeout, identifier) for every combination
# of plugin and identifier.
sub plugin_flush {
	my %args = @_;

	# Turns "nothing", a single item or an array reference into a list.
	my $as_list = sub {
		my ($value) = @_;

		return (undef)    if (! defined ($value));
		return @{$value}  if ("ARRAY" eq ref ($value));
		return ($value);
	};

	my $trace = "Collectd::plugin_flush:";
	$trace .= " timeout = $args{'timeout'}" if (defined ($args{'timeout'}));
	$trace .= " plugins = $args{'plugins'}" if (defined ($args{'plugins'}));
	$trace .= " identifiers = $args{'identifiers'}"
		if (defined ($args{'identifiers'}));
	DEBUG ($trace);

	my $timeout = -1;
	$timeout = $args{'timeout'}
		if (defined ($args{'timeout'}) && ($args{'timeout'} > 0));

	my @targets = $as_list->($args{'plugins'});
	my @ids     = $as_list->($args{'identifiers'});

	foreach my $plugin (@targets) {
		foreach my $id (@ids) {
			_plugin_flush($plugin, $timeout, $id);
		}
	}
}
# Collectd::plugin_flush_one (timeout, name).
#
# Deprecated wrapper around plugin_flush: logs a deprecation warning and
# flushes the plugin called name with the given timeout.  Both arguments
# are mandatory; nothing is returned if one of them is missing.
sub plugin_flush_one {
	my ($timeout, $name) = @_;

	WARNING ("Collectd::plugin_flush_one is deprecated - "
		. "use Collectd::plugin_flush instead.");

	unless (defined ($timeout) && defined ($name)) {
		ERROR ("Usage: Collectd::plugin_flush_one(timeout, name)");
		return;
	}

	return plugin_flush (plugins => $name, timeout => $timeout);
}
# Collectd::plugin_flush_all (timeout).
#
# Deprecated wrapper around plugin_flush: logs a deprecation warning and
# calls plugin_flush with the given timeout and no plugin restriction.
# The timeout is mandatory; nothing is returned if it is missing.
sub plugin_flush_all {
	my ($timeout) = @_;

	WARNING ("Collectd::plugin_flush_all is deprecated - "
		. "use Collectd::plugin_flush instead.");

	unless (defined ($timeout)) {
		ERROR ("Usage: Collectd::plugin_flush_all(timeout)");
		return;
	}

	return plugin_flush (timeout => $timeout);
}
# Collectd::fc_call (type, name, cb_type, ...).
#
# Runs one callback of a registered filter chain match or target; the
# remaining arguments are handed on to call_by_name().
#
# type:
#   FC_MATCH or FC_TARGET
#
# name:
#   name the match/target was registered under (see fc_register)
#
# cb_type:
#   FC_CB_EXEC, FC_CB_CREATE or FC_CB_DESTROY.  The create and destroy
#   callbacks are optional; if the plugin did not provide one, 1 is
#   returned without calling anything.
#
# Returns the status of the callback.  Nothing is returned (after logging
# an error) if the arguments are invalid or if the callback yields an
# undefined or negative status.
sub fc_call {
	my $type    = shift;
	my $name    = shift;
	my $cb_type = shift;

	my %proc;

	# Name of the callback to run; presumably picked up by
	# call_by_name(), which is not given the name as an argument.
	our $cb_name = undef;
	my $status;

	if (! ((defined $type) && (defined $name) && (defined $cb_type))) {
		ERROR ("Usage: Collectd::fc_call(type, name, cb_type, ...)");
		return;
	}

	if (! defined $fc_plugins[$type]) {
		ERROR ("Collectd::fc_call: Invalid type \"$type\"");
		return;
	}

	if (! defined $fc_plugins[$type]->{$name}) {
		ERROR ("Collectd::fc_call: Unknown "
			. ($type == FC_MATCH ? "match" : "target")
			. " \"$name\"");
		return;
	}

	# Neither lookup is guaranteed to succeed; do not interpolate undef.
	my $type_name = defined $fc_types{$type}
		? $fc_types{$type} : "unknown";
	my $cb_type_name = defined $fc_cb_types{$cb_type}
		? $fc_cb_types{$cb_type} : "unknown";

	DEBUG ("Collectd::fc_call: "
		. "type = \"$type\" (" . $type_name
		. "), name = \"$name\", cb_type = \"$cb_type\" ("
		. $cb_type_name . ")");

	# Copy the callback table so the lock is not held during the call.
	{
		lock %{$fc_plugins[$type]};
		%proc = %{$fc_plugins[$type]->{$name}};
	}

	if (FC_CB_EXEC == $cb_type) {
		$cb_name = $proc{$fc_exec_names{$type}};
	}
	elsif (FC_CB_CREATE == $cb_type) {
		if (defined $proc{'create'}) {
			$cb_name = $proc{'create'};
		}
		else {
			return 1;
		}
	}
	elsif (FC_CB_DESTROY == $cb_type) {
		if (defined $proc{'destroy'}) {
			$cb_name = $proc{'destroy'};
		}
		else {
			return 1;
		}
	}
	else {
		# Without this branch call_by_name() would be invoked with
		# $cb_name still unset.
		ERROR ("Collectd::fc_call: Invalid callback type \"$cb_type\"");
		return;
	}

	$status = call_by_name (@_);

	# An undefined status is a failure as well: it is not a valid
	# result and must not reach the numeric comparison.
	if ((! defined $status) || ($status < 0)) {
		my $err = undef;

		if ($@) {
			$err = $@;
		}
		else {
			$err = "callback returned false";
		}

		ERROR ("Execution of fc callback \"$cb_name\" failed: $err");
		return;
	}
	return $status;
}
# Collectd::fc_register (type, name, proc).
#
# type:
#   FC_MATCH or FC_TARGET
#
# name:
#   name of the match/target
#
# proc:
#   hash reference with the names of the callback subroutines as plain
#   strings.  The key "match" (for FC_MATCH) or "invoke" (for FC_TARGET)
#   is mandatory, "create" and "destroy" are optional.  Names that do not
#   start with the caller's package name are qualified with it.
#
# Returns 1 on success and nothing on error.  An existing registration
# with the same name is overwritten (a warning is logged).
sub fc_register {
	my $type = shift;
	my $name = shift;
	my $proc = shift;

	my %fc : shared;

	# Package of the code that registers the match/target.
	my $pkg = scalar caller;

	# Check the arguments before they are interpolated into any message.
	if (! ((defined $type) && (defined $name) && (defined $proc))) {
		ERROR ("Usage: Collectd::fc_register(type, name, proc)");
		return;
	}

	if (! defined $fc_plugins[$type]) {
		ERROR ("Collectd::fc_register: Invalid type \"$type\"");
		return;
	}

	DEBUG ("Collectd::fc_register: "
		. "type = \"$type\" ("
		. (defined $fc_types{$type} ? $fc_types{$type} : "unknown")
		. "), name = \"$name\", proc = \"$proc\"");

	if (("HASH" ne ref ($proc)) || (! defined $proc->{$fc_exec_names{$type}})
			|| ("" ne ref ($proc->{$fc_exec_names{$type}}))) {
		ERROR ("Collectd::fc_register: Invalid proc.");
		return;
	}

	for my $p (qw( create destroy )) {
		if ((defined $proc->{$p}) && ("" ne ref ($proc->{$p}))) {
			ERROR ("Collectd::fc_register: Invalid proc.");
			return;
		}
	}

	%fc = %$proc;

	# Qualify the callback names with the caller's package.  Entries
	# without a value are left alone; otherwise an explicit
	# "create => undef" would be turned into the bogus name "Pkg::".
	foreach my $key ("create", "destroy", $fc_exec_names{$type}) {
		next if (! defined $fc{$key});

		# \Q...\E: match the package name literally.
		if ($fc{$key} !~ m/^\Q$pkg\E::/) {
			$fc{$key} = $pkg . "::" . $fc{$key};
		}
	}

	# The registry stays locked until this sub returns.
	lock %{$fc_plugins[$type]};
	if (defined $fc_plugins[$type]->{$name}) {
		WARNING ("Collectd::fc_register: Overwriting previous "
			. "definition of "
			. ($type == FC_MATCH ? "match" : "target")
			. " \"$name\".");
	}

	if (! _fc_register ($type, $name)) {
		ERROR ("Collectd::fc_register: Failed to register \"$name\".");
		return;
	}

	$fc_plugins[$type]->{$name} = \%fc;
	return 1;
}
# _plugin_dispatch_config (plugin, config).
#
# Looks up the configuration callback registered for plugin in
# %cf_callbacks and runs it through call_by_name() with config as its
# only argument.  Returns the result of call_by_name(); nothing is
# returned if an argument is missing or no callback has been registered
# (the latter is logged as a warning).
sub _plugin_dispatch_config {
	my ($plugin, $config) = @_;

	# Name of the callback to run; presumably picked up by
	# call_by_name(), which is not given the name as an argument.
	our $cb_name = undef;

	return unless (defined ($plugin) && defined ($config));

	unless (defined $cf_callbacks{$plugin}) {
		WARNING ("Found a configuration for the \"$plugin\" plugin, but "
			. "the plugin isn't loaded or didn't register "
			. "a configuration callback.");
		return;
	}

	# Only the lookup is done under the lock, not the call itself.
	{
		lock %cf_callbacks;
		$cb_name = $cf_callbacks{$plugin};
	}

	return call_by_name ($config);
}
659 1;
661 # vim: set sw=4 ts=4 tw=78 noexpandtab :