5a96dddd96fd3d4c6df25eb7e27dfa8b4563f1cc
1 # collectd - Collectd.pm
2 # Copyright (C) 2007-2009 Sebastian Harl
3 #
4 # This program is free software; you can redistribute it and/or modify it
5 # under the terms of the GNU General Public License as published by the
6 # Free Software Foundation; only version 2 of the License is applicable.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 # General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 #
17 # Author:
18 # Sebastian Harl <sh at tokkee.org>
20 package Collectd;
22 use strict;
23 use warnings;
25 use Config;
27 use threads;
28 use threads::shared;
30 BEGIN {
31 if (! $Config{'useithreads'}) {
32 die "Perl does not support ithreads!";
33 }
34 }
36 require Exporter;
38 our @ISA = qw( Exporter );
40 our %EXPORT_TAGS = (
41 'plugin' => [ qw(
42 plugin_register
43 plugin_unregister
44 plugin_dispatch_values
45 plugin_write
46 plugin_flush
47 plugin_flush_one
48 plugin_flush_all
49 plugin_dispatch_notification
50 plugin_log
51 ) ],
52 'types' => [ qw(
53 TYPE_INIT
54 TYPE_READ
55 TYPE_WRITE
56 TYPE_SHUTDOWN
57 TYPE_LOG
58 TYPE_NOTIF
59 TYPE_FLUSH
60 TYPE_CONFIG
61 TYPE_DATASET
62 ) ],
63 'ds_types' => [ qw(
64 DS_TYPE_COUNTER
65 DS_TYPE_GAUGE
66 ) ],
67 'log' => [ qw(
68 ERROR
69 WARNING
70 NOTICE
71 INFO
72 DEBUG
73 LOG_ERR
74 LOG_WARNING
75 LOG_NOTICE
76 LOG_INFO
77 LOG_DEBUG
78 ) ],
79 'filter_chain' => [ qw(
80 fc_register
81 FC_MATCH_NO_MATCH
82 FC_MATCH_MATCHES
83 FC_TARGET_CONTINUE
84 FC_TARGET_STOP
85 FC_TARGET_RETURN
86 ) ],
87 'fc_types' => [ qw(
88 FC_MATCH
89 FC_TARGET
90 ) ],
91 'notif' => [ qw(
92 NOTIF_FAILURE
93 NOTIF_WARNING
94 NOTIF_OKAY
95 ) ],
96 'globals' => [ qw(
97 $hostname_g
98 $interval_g
99 ) ],
100 );
102 {
103 my %seen;
104 push @{$EXPORT_TAGS{'all'}}, grep {! $seen{$_}++ } @{$EXPORT_TAGS{$_}}
105 foreach keys %EXPORT_TAGS;
106 }
108 # global variables
109 our $hostname_g;
110 our $interval_g;
112 Exporter::export_ok_tags ('all');
114 my @plugins : shared = ();
115 my @fc_plugins : shared = ();
116 my %cf_callbacks : shared = ();
118 my %types = (
119 TYPE_DATASET, "dataset",
120 TYPE_CONFIG, "config",
121 TYPE_INIT, "init",
122 TYPE_READ, "read",
123 TYPE_WRITE, "write",
124 TYPE_SHUTDOWN, "shutdown",
125 TYPE_LOG, "log",
126 TYPE_NOTIF, "notify",
127 TYPE_FLUSH, "flush"
128 );
130 my %fc_types = (
131 FC_MATCH, "match",
132 FC_TARGET, "target"
133 );
135 my %fc_exec_names = (
136 FC_MATCH, "match",
137 FC_TARGET, "invoke"
138 );
140 my %fc_cb_types = (
141 FC_CB_EXEC, "exec",
142 FC_CB_CREATE, "create",
143 FC_CB_DESTROY, "destroy"
144 );
146 foreach my $type (keys %types) {
147 $plugins[$type] = &share ({});
148 }
150 foreach my $type (keys %fc_types) {
151 $fc_plugins[$type] = &share ({});
152 }
154 sub _log {
155 my $caller = shift;
156 my $lvl = shift;
157 my $msg = shift;
159 if ("Collectd" eq $caller) {
160 $msg = "perl: $msg";
161 }
162 return plugin_log ($lvl, $msg);
163 }
165 sub ERROR { _log (scalar caller, LOG_ERR, shift); }
166 sub WARNING { _log (scalar caller, LOG_WARNING, shift); }
167 sub NOTICE { _log (scalar caller, LOG_NOTICE, shift); }
168 sub INFO { _log (scalar caller, LOG_INFO, shift); }
169 sub DEBUG { _log (scalar caller, LOG_DEBUG, shift); }
171 sub plugin_call_all {
172 my $type = shift;
174 my %plugins;
176 our $cb_name = undef;
178 if (! defined $type) {
179 return;
180 }
182 if (TYPE_LOG != $type) {
183 DEBUG ("Collectd::plugin_call: type = \"$type\" ("
184 . $types{$type} . "), args=\""
185 . join(', ', map { defined($_) ? $_ : '<undef>' } @_) . "\"");
186 }
188 if (! defined $plugins[$type]) {
189 ERROR ("Collectd::plugin_call: unknown type \"$type\"");
190 return;
191 }
193 {
194 lock %{$plugins[$type]};
195 %plugins = %{$plugins[$type]};
196 }
198 foreach my $plugin (keys %plugins) {
199 my $p = $plugins{$plugin};
201 my $status = 0;
203 if ($p->{'wait_left'} > 0) {
204 $p->{'wait_left'} -= $interval_g;
205 }
207 next if ($p->{'wait_left'} > 0);
209 $cb_name = $p->{'cb_name'};
210 $status = call_by_name (@_);
212 if (! $status) {
213 my $err = undef;
215 if ($@) {
216 $err = $@;
217 }
218 else {
219 $err = "callback returned false";
220 }
222 if (TYPE_LOG != $type) {
223 ERROR ("Execution of callback \"$cb_name\" failed: $err");
224 }
226 $status = 0;
227 }
229 if ($status) {
230 $p->{'wait_left'} = 0;
231 $p->{'wait_time'} = $interval_g;
232 }
233 elsif (TYPE_READ == $type) {
234 if ($p->{'wait_time'} < $interval_g) {
235 $p->{'wait_time'} = $interval_g;
236 }
238 $p->{'wait_left'} = $p->{'wait_time'};
239 $p->{'wait_time'} *= 2;
241 if ($p->{'wait_time'} > 86400) {
242 $p->{'wait_time'} = 86400;
243 }
245 WARNING ("${plugin}->read() failed with status $status. "
246 . "Will suspend it for $p->{'wait_left'} seconds.");
247 }
248 elsif (TYPE_INIT == $type) {
249 ERROR ("${plugin}->init() failed with status $status. "
250 . "Plugin will be disabled.");
252 foreach my $type (keys %types) {
253 plugin_unregister ($type, $plugin);
254 }
255 }
256 elsif (TYPE_LOG != $type) {
257 WARNING ("${plugin}->$types{$type}() failed with status $status.");
258 }
259 }
260 return 1;
261 }
263 # Collectd::plugin_register (type, name, data).
264 #
265 # type:
266 # init, read, write, shutdown, data set
267 #
268 # name:
269 # name of the plugin
270 #
271 # data:
272 # reference to the plugin's subroutine that does the work or the data set
273 # definition
274 sub plugin_register {
275 my $type = shift;
276 my $name = shift;
277 my $data = shift;
279 DEBUG ("Collectd::plugin_register: "
280 . "type = \"$type\" (" . $types{$type}
281 . "), name = \"$name\", data = \"$data\"");
283 if (! ((defined $type) && (defined $name) && (defined $data))) {
284 ERROR ("Usage: Collectd::plugin_register (type, name, data)");
285 return;
286 }
288 if ((! defined $plugins[$type]) && (TYPE_DATASET != $type)
289 && (TYPE_CONFIG != $type)) {
290 ERROR ("Collectd::plugin_register: Invalid type \"$type\"");
291 return;
292 }
294 if ((TYPE_DATASET == $type) && ("ARRAY" eq ref $data)) {
295 return plugin_register_data_set ($name, $data);
296 }
297 elsif ((TYPE_CONFIG == $type) && (! ref $data)) {
298 my $pkg = scalar caller;
300 if ($data !~ m/^$pkg\:\:/) {
301 $data = $pkg . "::" . $data;
302 }
304 lock %cf_callbacks;
305 $cf_callbacks{$name} = $data;
306 }
307 elsif ((TYPE_DATASET != $type) && (! ref $data)) {
308 my $pkg = scalar caller;
310 my %p : shared;
312 if ($data !~ m/^$pkg\:\:/) {
313 $data = $pkg . "::" . $data;
314 }
316 %p = (
317 wait_time => $interval_g,
318 wait_left => 0,
319 cb_name => $data,
320 );
322 lock %{$plugins[$type]};
323 $plugins[$type]->{$name} = \%p;
324 }
325 else {
326 ERROR ("Collectd::plugin_register: Invalid data.");
327 return;
328 }
329 return 1;
330 }
332 sub plugin_unregister {
333 my $type = shift;
334 my $name = shift;
336 DEBUG ("Collectd::plugin_unregister: type = \"$type\" ("
337 . $types{$type} . "), name = \"$name\"");
339 if (! ((defined $type) && (defined $name))) {
340 ERROR ("Usage: Collectd::plugin_unregister (type, name)");
341 return;
342 }
344 if (TYPE_DATASET == $type) {
345 return plugin_unregister_data_set ($name);
346 }
347 elsif (TYPE_CONFIG == $type) {
348 lock %cf_callbacks;
349 delete $cf_callbacks{$name};
350 }
351 elsif (defined $plugins[$type]) {
352 lock %{$plugins[$type]};
353 delete $plugins[$type]->{$name};
354 }
355 else {
356 ERROR ("Collectd::plugin_unregister: Invalid type.");
357 return;
358 }
359 }
361 sub plugin_write {
362 my %args = @_;
364 my @plugins = ();
365 my @datasets = ();
366 my @valuelists = ();
368 if (! defined $args{'valuelists'}) {
369 ERROR ("Collectd::plugin_write: Missing 'valuelists' argument.");
370 return;
371 }
373 DEBUG ("Collectd::plugin_write:"
374 . (defined ($args{'plugins'}) ? " plugins = $args{'plugins'}" : "")
375 . (defined ($args{'datasets'}) ? " datasets = $args{'datasets'}" : "")
376 . " valueslists = $args{'valuelists'}");
378 if (defined ($args{'plugins'})) {
379 if ("ARRAY" eq ref ($args{'plugins'})) {
380 @plugins = @{$args{'plugins'}};
381 }
382 else {
383 @plugins = ($args{'plugins'});
384 }
385 }
386 else {
387 @plugins = (undef);
388 }
390 if ("ARRAY" eq ref ($args{'valuelists'})) {
391 @valuelists = @{$args{'valuelists'}};
392 }
393 else {
394 @valuelists = ($args{'valuelists'});
395 }
397 if (defined ($args{'datasets'})) {
398 if ("ARRAY" eq ref ($args{'datasets'})) {
399 @datasets = @{$args{'datasets'}};
400 }
401 else {
402 @datasets = ($args{'datasets'});
403 }
404 }
405 else {
406 @datasets = (undef) x scalar (@valuelists);
407 }
409 if ($#datasets != $#valuelists) {
410 ERROR ("Collectd::plugin_write: Invalid number of datasets.");
411 return;
412 }
414 foreach my $plugin (@plugins) {
415 for (my $i = 0; $i < scalar (@valuelists); ++$i) {
416 _plugin_write ($plugin, $datasets[$i], $valuelists[$i]);
417 }
418 }
419 }
421 sub plugin_flush {
422 my %args = @_;
424 my $timeout = -1;
425 my @plugins = ();
426 my @ids = ();
428 DEBUG ("Collectd::plugin_flush:"
429 . (defined ($args{'timeout'}) ? " timeout = $args{'timeout'}" : "")
430 . (defined ($args{'plugins'}) ? " plugins = $args{'plugins'}" : "")
431 . (defined ($args{'identifiers'})
432 ? " identifiers = $args{'identifiers'}" : ""));
434 if (defined ($args{'timeout'}) && ($args{'timeout'} > 0)) {
435 $timeout = $args{'timeout'};
436 }
438 if (defined ($args{'plugins'})) {
439 if ("ARRAY" eq ref ($args{'plugins'})) {
440 @plugins = @{$args{'plugins'}};
441 }
442 else {
443 @plugins = ($args{'plugins'});
444 }
445 }
446 else {
447 @plugins = (undef);
448 }
450 if (defined ($args{'identifiers'})) {
451 if ("ARRAY" eq ref ($args{'identifiers'})) {
452 @ids = @{$args{'identifiers'}};
453 }
454 else {
455 @ids = ($args{'identifiers'});
456 }
457 }
458 else {
459 @ids = (undef);
460 }
462 foreach my $plugin (@plugins) {
463 foreach my $id (@ids) {
464 _plugin_flush($plugin, $timeout, $id);
465 }
466 }
467 }
469 sub plugin_flush_one {
470 my $timeout = shift;
471 my $name = shift;
473 WARNING ("Collectd::plugin_flush_one is deprecated - "
474 . "use Collectd::plugin_flush instead.");
476 if (! (defined ($timeout) && defined ($name))) {
477 ERROR ("Usage: Collectd::plugin_flush_one(timeout, name)");
478 return;
479 }
481 plugin_flush (plugins => $name, timeout => $timeout);
482 }
484 sub plugin_flush_all {
485 my $timeout = shift;
487 WARNING ("Collectd::plugin_flush_all is deprecated - "
488 . "use Collectd::plugin_flush instead.");
490 if (! defined ($timeout)) {
491 ERROR ("Usage: Collectd::plugin_flush_all(timeout)");
492 return;
493 }
495 plugin_flush (timeout => $timeout);
496 }
498 sub fc_call {
499 my $type = shift;
500 my $name = shift;
501 my $cb_type = shift;
503 my %proc;
505 our $cb_name = undef;
506 my $status;
508 if (! ((defined $type) && (defined $name) && (defined $cb_type))) {
509 ERROR ("Usage: Collectd::fc_call(type, name, cb_type, ...)");
510 return;
511 }
513 if (! defined $fc_plugins[$type]) {
514 ERROR ("Collectd::fc_call: Invalid type \"$type\"");
515 return;
516 }
518 if (! defined $fc_plugins[$type]->{$name}) {
519 ERROR ("Collectd::fc_call: Unknown "
520 . ($type == FC_MATCH ? "match" : "target")
521 . " \"$name\"");
522 return;
523 }
525 DEBUG ("Collectd::fc_call: "
526 . "type = \"$type\" (" . $fc_types{$type}
527 . "), name = \"$name\", cb_type = \"$cb_type\" ("
528 . $fc_cb_types{$cb_type} . ")");
530 {
531 lock %{$fc_plugins[$type]};
532 %proc = %{$fc_plugins[$type]->{$name}};
533 }
535 if (FC_CB_EXEC == $cb_type) {
536 $cb_name = $proc{$fc_exec_names{$type}};
537 }
538 elsif (FC_CB_CREATE == $cb_type) {
539 if (defined $proc{'create'}) {
540 $cb_name = $proc{'create'};
541 }
542 else {
543 return 1;
544 }
545 }
546 elsif (FC_CB_DESTROY == $cb_type) {
547 if (defined $proc{'destroy'}) {
548 $cb_name = $proc{'destroy'};
549 }
550 else {
551 return 1;
552 }
553 }
555 $status = call_by_name (@_);
557 if ($status < 0) {
558 my $err = undef;
560 if ($@) {
561 $err = $@;
562 }
563 else {
564 $err = "callback returned false";
565 }
567 ERROR ("Execution of fc callback \"$cb_name\" failed: $err");
568 return;
569 }
570 return $status;
571 }
573 sub fc_register {
574 my $type = shift;
575 my $name = shift;
576 my $proc = shift;
578 my %fc : shared;
580 DEBUG ("Collectd::fc_register: "
581 . "type = \"$type\" (" . $fc_types{$type}
582 . "), name = \"$name\", proc = \"$proc\"");
584 if (! ((defined $type) && (defined $name) && (defined $proc))) {
585 ERROR ("Usage: Collectd::fc_register(type, name, proc)");
586 return;
587 }
589 if (! defined $fc_plugins[$type]) {
590 ERROR ("Collectd::fc_register: Invalid type \"$type\"");
591 return;
592 }
594 if (("HASH" ne ref ($proc)) || (! defined $proc->{$fc_exec_names{$type}})
595 || ("" ne ref ($proc->{$fc_exec_names{$type}}))) {
596 ERROR ("Collectd::fc_register: Invalid proc.");
597 return;
598 }
600 for my $p (qw( create destroy )) {
601 if ((defined $proc->{$p}) && ("" ne ref ($proc->{$p}))) {
602 ERROR ("Collectd::fc_register: Invalid proc.");
603 return;
604 }
605 }
607 %fc = %$proc;
609 foreach my $p (keys %fc) {
610 my $pkg = scalar caller;
612 if ($p !~ m/^(create|destroy|$fc_exec_names{$type})$/) {
613 next;
614 }
616 if ($fc{$p} !~ m/^$pkg\:\:/) {
617 $fc{$p} = $pkg . "::" . $fc{$p};
618 }
619 }
621 lock %{$fc_plugins[$type]};
622 if (defined $fc_plugins[$type]->{$name}) {
623 WARNING ("Collectd::fc_register: Overwriting previous "
624 . "definition of match \"$name\".");
625 }
627 if (! _fc_register ($type, $name)) {
628 ERROR ("Collectd::fc_register: Failed to register \"$name\".");
629 return;
630 }
632 $fc_plugins[$type]->{$name} = \%fc;
633 return 1;
634 }
636 sub _plugin_dispatch_config {
637 my $plugin = shift;
638 my $config = shift;
640 our $cb_name = undef;
642 if (! (defined ($plugin) && defined ($config))) {
643 return;
644 }
646 if (! defined $cf_callbacks{$plugin}) {
647 WARNING ("Found a configuration for the \"$plugin\" plugin, but "
648 . "the plugin isn't loaded or didn't register "
649 . "a configuration callback.");
650 return;
651 }
653 {
654 lock %cf_callbacks;
655 $cb_name = $cf_callbacks{$plugin};
656 }
657 call_by_name ($config);
658 }
660 1;
662 # vim: set sw=4 ts=4 tw=78 noexpandtab :