1 # collectd - Collectd.pm
2 # Copyright (C) 2007-2009 Sebastian Harl
3 #
4 # This program is free software; you can redistribute it and/or modify it
5 # under the terms of the GNU General Public License as published by the
6 # Free Software Foundation; only version 2 of the License is applicable.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 # General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 #
17 # Author:
18 # Sebastian Harl <sh at tokkee.org>
20 package Collectd;
22 use strict;
23 use warnings;
25 use Config;
27 use threads;
28 use threads::shared;
30 BEGIN {
31 if (! $Config{'useithreads'}) {
32 die "Perl does not support ithreads!";
33 }
34 }
36 require Exporter;
38 our @ISA = qw( Exporter );
40 our %EXPORT_TAGS = (
41 'plugin' => [ qw(
42 plugin_register
43 plugin_unregister
44 plugin_dispatch_values
45 plugin_write
46 plugin_flush
47 plugin_flush_one
48 plugin_flush_all
49 plugin_dispatch_notification
50 plugin_log
51 ) ],
52 'types' => [ qw(
53 TYPE_INIT
54 TYPE_READ
55 TYPE_WRITE
56 TYPE_SHUTDOWN
57 TYPE_LOG
58 TYPE_NOTIF
59 TYPE_FLUSH
60 TYPE_CONFIG
61 TYPE_DATASET
62 ) ],
63 'ds_types' => [ qw(
64 DS_TYPE_COUNTER
65 DS_TYPE_GAUGE
66 ) ],
67 'log' => [ qw(
68 ERROR
69 WARNING
70 NOTICE
71 INFO
72 DEBUG
73 LOG_ERR
74 LOG_WARNING
75 LOG_NOTICE
76 LOG_INFO
77 LOG_DEBUG
78 ) ],
79 'filter_chain' => [ qw(
80 fc_register
81 FC_MATCH_NO_MATCH
82 FC_MATCH_MATCHES
83 FC_TARGET_CONTINUE
84 FC_TARGET_STOP
85 FC_TARGET_RETURN
86 ) ],
87 'fc_types' => [ qw(
88 FC_MATCH
89 FC_TARGET
90 ) ],
91 'notif' => [ qw(
92 NOTIF_FAILURE
93 NOTIF_WARNING
94 NOTIF_OKAY
95 ) ],
96 'globals' => [ qw(
97 $hostname_g
98 $interval_g
99 ) ],
100 );
102 {
103 my %seen;
104 push @{$EXPORT_TAGS{'all'}}, grep {! $seen{$_}++ } @{$EXPORT_TAGS{$_}}
105 foreach keys %EXPORT_TAGS;
106 }
108 # global variables
109 our $hostname_g;
110 our $interval_g;
112 Exporter::export_ok_tags ('all');
114 my @plugins : shared = ();
115 my @fc_plugins : shared = ();
116 my %cf_callbacks : shared = ();
118 my %types = (
119 TYPE_INIT, "init",
120 TYPE_READ, "read",
121 TYPE_WRITE, "write",
122 TYPE_SHUTDOWN, "shutdown",
123 TYPE_LOG, "log",
124 TYPE_NOTIF, "notify",
125 TYPE_FLUSH, "flush"
126 );
128 my %fc_types = (
129 FC_MATCH, "match",
130 FC_TARGET, "target"
131 );
133 my %fc_exec_names = (
134 FC_MATCH, "match",
135 FC_TARGET, "invoke"
136 );
138 foreach my $type (keys %types) {
139 $plugins[$type] = &share ({});
140 }
142 foreach my $type (keys %fc_types) {
143 $fc_plugins[$type] = &share ({});
144 }
146 sub _log {
147 my $caller = shift;
148 my $lvl = shift;
149 my $msg = shift;
151 if ("Collectd" eq $caller) {
152 $msg = "perl: $msg";
153 }
154 return plugin_log ($lvl, $msg);
155 }
157 sub ERROR { _log (scalar caller, LOG_ERR, shift); }
158 sub WARNING { _log (scalar caller, LOG_WARNING, shift); }
159 sub NOTICE { _log (scalar caller, LOG_NOTICE, shift); }
160 sub INFO { _log (scalar caller, LOG_INFO, shift); }
161 sub DEBUG { _log (scalar caller, LOG_DEBUG, shift); }
163 sub plugin_call_all {
164 my $type = shift;
166 my %plugins;
168 our $cb_name = undef;
170 if (! defined $type) {
171 return;
172 }
174 if (TYPE_LOG != $type) {
175 DEBUG ("Collectd::plugin_call: type = \"$type\", args=\"@_\"");
176 }
178 if (! defined $plugins[$type]) {
179 ERROR ("Collectd::plugin_call: unknown type \"$type\"");
180 return;
181 }
183 {
184 lock %{$plugins[$type]};
185 %plugins = %{$plugins[$type]};
186 }
188 foreach my $plugin (keys %plugins) {
189 my $p = $plugins{$plugin};
191 my $status = 0;
193 if ($p->{'wait_left'} > 0) {
194 $p->{'wait_left'} -= $interval_g;
195 }
197 next if ($p->{'wait_left'} > 0);
199 $cb_name = $p->{'cb_name'};
200 $status = call_by_name (@_);
202 if (! $status) {
203 my $err = undef;
205 if ($@) {
206 $err = $@;
207 }
208 else {
209 $err = "callback returned false";
210 }
212 if (TYPE_LOG != $type) {
213 ERROR ("Execution of callback \"$cb_name\" failed: $err");
214 }
216 $status = 0;
217 }
219 if ($status) {
220 $p->{'wait_left'} = 0;
221 $p->{'wait_time'} = $interval_g;
222 }
223 elsif (TYPE_READ == $type) {
224 if ($p->{'wait_time'} < $interval_g) {
225 $p->{'wait_time'} = $interval_g;
226 }
228 $p->{'wait_left'} = $p->{'wait_time'};
229 $p->{'wait_time'} *= 2;
231 if ($p->{'wait_time'} > 86400) {
232 $p->{'wait_time'} = 86400;
233 }
235 WARNING ("${plugin}->read() failed with status $status. "
236 . "Will suspend it for $p->{'wait_left'} seconds.");
237 }
238 elsif (TYPE_INIT == $type) {
239 ERROR ("${plugin}->init() failed with status $status. "
240 . "Plugin will be disabled.");
242 foreach my $type (keys %types) {
243 plugin_unregister ($type, $plugin);
244 }
245 }
246 elsif (TYPE_LOG != $type) {
247 WARNING ("${plugin}->$types{$type}() failed with status $status.");
248 }
249 }
250 return 1;
251 }
253 # Collectd::plugin_register (type, name, data).
254 #
255 # type:
256 # init, read, write, shutdown, data set
257 #
258 # name:
259 # name of the plugin
260 #
261 # data:
262 # reference to the plugin's subroutine that does the work or the data set
263 # definition
264 sub plugin_register {
265 my $type = shift;
266 my $name = shift;
267 my $data = shift;
269 DEBUG ("Collectd::plugin_register: "
270 . "type = \"$type\", name = \"$name\", data = \"$data\"");
272 if (! ((defined $type) && (defined $name) && (defined $data))) {
273 ERROR ("Usage: Collectd::plugin_register (type, name, data)");
274 return;
275 }
277 if ((! defined $plugins[$type]) && (TYPE_DATASET != $type)
278 && (TYPE_CONFIG != $type)) {
279 ERROR ("Collectd::plugin_register: Invalid type \"$type\"");
280 return;
281 }
283 if ((TYPE_DATASET == $type) && ("ARRAY" eq ref $data)) {
284 return plugin_register_data_set ($name, $data);
285 }
286 elsif ((TYPE_CONFIG == $type) && (! ref $data)) {
287 my $pkg = scalar caller;
289 if ($data !~ m/^$pkg\:\:/) {
290 $data = $pkg . "::" . $data;
291 }
293 lock %cf_callbacks;
294 $cf_callbacks{$name} = $data;
295 }
296 elsif ((TYPE_DATASET != $type) && (! ref $data)) {
297 my $pkg = scalar caller;
299 my %p : shared;
301 if ($data !~ m/^$pkg\:\:/) {
302 $data = $pkg . "::" . $data;
303 }
305 %p = (
306 wait_time => $interval_g,
307 wait_left => 0,
308 cb_name => $data,
309 );
311 lock %{$plugins[$type]};
312 $plugins[$type]->{$name} = \%p;
313 }
314 else {
315 ERROR ("Collectd::plugin_register: Invalid data.");
316 return;
317 }
318 return 1;
319 }
321 sub plugin_unregister {
322 my $type = shift;
323 my $name = shift;
325 DEBUG ("Collectd::plugin_unregister: type = \"$type\", name = \"$name\"");
327 if (! ((defined $type) && (defined $name))) {
328 ERROR ("Usage: Collectd::plugin_unregister (type, name)");
329 return;
330 }
332 if (TYPE_DATASET == $type) {
333 return plugin_unregister_data_set ($name);
334 }
335 elsif (TYPE_CONFIG == $type) {
336 lock %cf_callbacks;
337 delete $cf_callbacks{$name};
338 }
339 elsif (defined $plugins[$type]) {
340 lock %{$plugins[$type]};
341 delete $plugins[$type]->{$name};
342 }
343 else {
344 ERROR ("Collectd::plugin_unregister: Invalid type.");
345 return;
346 }
347 }
349 sub plugin_write {
350 my %args = @_;
352 my @plugins = ();
353 my @datasets = ();
354 my @valuelists = ();
356 if (! defined $args{'valuelists'}) {
357 ERROR ("Collectd::plugin_write: Missing 'valuelists' argument.");
358 return;
359 }
361 DEBUG ("Collectd::plugin_write:"
362 . (defined ($args{'plugins'}) ? " plugins = $args{'plugins'}" : "")
363 . (defined ($args{'datasets'}) ? " datasets = $args{'datasets'}" : "")
364 . " valueslists = $args{'valuelists'}");
366 if (defined ($args{'plugins'})) {
367 if ("ARRAY" eq ref ($args{'plugins'})) {
368 @plugins = @{$args{'plugins'}};
369 }
370 else {
371 @plugins = ($args{'plugins'});
372 }
373 }
374 else {
375 @plugins = (undef);
376 }
378 if ("ARRAY" eq ref ($args{'valuelists'})) {
379 @valuelists = @{$args{'valuelists'}};
380 }
381 else {
382 @valuelists = ($args{'valuelists'});
383 }
385 if (defined ($args{'datasets'})) {
386 if ("ARRAY" eq ref ($args{'datasets'})) {
387 @datasets = @{$args{'datasets'}};
388 }
389 else {
390 @datasets = ($args{'datasets'});
391 }
392 }
393 else {
394 @datasets = (undef) x scalar (@valuelists);
395 }
397 if ($#datasets != $#valuelists) {
398 ERROR ("Collectd::plugin_write: Invalid number of datasets.");
399 return;
400 }
402 foreach my $plugin (@plugins) {
403 for (my $i = 0; $i < scalar (@valuelists); ++$i) {
404 _plugin_write ($plugin, $datasets[$i], $valuelists[$i]);
405 }
406 }
407 }
409 sub plugin_flush {
410 my %args = @_;
412 my $timeout = -1;
413 my @plugins = ();
414 my @ids = ();
416 DEBUG ("Collectd::plugin_flush:"
417 . (defined ($args{'timeout'}) ? " timeout = $args{'timeout'}" : "")
418 . (defined ($args{'plugins'}) ? " plugins = $args{'plugins'}" : "")
419 . (defined ($args{'identifiers'})
420 ? " identifiers = $args{'identifiers'}" : ""));
422 if (defined ($args{'timeout'}) && ($args{'timeout'} > 0)) {
423 $timeout = $args{'timeout'};
424 }
426 if (defined ($args{'plugins'})) {
427 if ("ARRAY" eq ref ($args{'plugins'})) {
428 @plugins = @{$args{'plugins'}};
429 }
430 else {
431 @plugins = ($args{'plugins'});
432 }
433 }
434 else {
435 @plugins = (undef);
436 }
438 if (defined ($args{'identifiers'})) {
439 if ("ARRAY" eq ref ($args{'identifiers'})) {
440 @ids = @{$args{'identifiers'}};
441 }
442 else {
443 @ids = ($args{'identifiers'});
444 }
445 }
446 else {
447 @ids = (undef);
448 }
450 foreach my $plugin (@plugins) {
451 foreach my $id (@ids) {
452 _plugin_flush($plugin, $timeout, $id);
453 }
454 }
455 }
457 sub plugin_flush_one {
458 my $timeout = shift;
459 my $name = shift;
461 WARNING ("Collectd::plugin_flush_one is deprecated - "
462 . "use Collectd::plugin_flush instead.");
464 if (! (defined ($timeout) && defined ($name))) {
465 ERROR ("Usage: Collectd::plugin_flush_one(timeout, name)");
466 return;
467 }
469 plugin_flush (plugins => $name, timeout => $timeout);
470 }
472 sub plugin_flush_all {
473 my $timeout = shift;
475 WARNING ("Collectd::plugin_flush_all is deprecated - "
476 . "use Collectd::plugin_flush instead.");
478 if (! defined ($timeout)) {
479 ERROR ("Usage: Collectd::plugin_flush_all(timeout)");
480 return;
481 }
483 plugin_flush (timeout => $timeout);
484 }
486 sub fc_call {
487 my $type = shift;
488 my $name = shift;
489 my $cb_type = shift;
491 my %proc;
493 our $cb_name = undef;
494 my $status;
496 if (! ((defined $type) && (defined $name) && (defined $cb_type))) {
497 ERROR ("Usage: Collectd::fc_call(type, name, cb_type, ...)");
498 return;
499 }
501 if (! defined $fc_plugins[$type]) {
502 ERROR ("Collectd::fc_call: Invalid type \"$type\"");
503 return;
504 }
506 if (! defined $fc_plugins[$type]->{$name}) {
507 ERROR ("Collectd::fc_call: Unknown "
508 . ($type == FC_MATCH ? "match" : "target")
509 . " \"$name\"");
510 return;
511 }
513 DEBUG ("Collectd::fc_call: "
514 . "type = \"$type\", name = \"$name\", cb_type = \"$cb_type\"");
516 {
517 lock %{$fc_plugins[$type]};
518 %proc = %{$fc_plugins[$type]->{$name}};
519 }
521 if (FC_CB_EXEC == $cb_type) {
522 $cb_name = $proc{$fc_exec_names{$type}};
523 }
524 elsif (FC_CB_CREATE == $cb_type) {
525 if (defined $proc{'create'}) {
526 $cb_name = $proc{'create'};
527 }
528 else {
529 return 1;
530 }
531 }
532 elsif (FC_CB_DESTROY == $cb_type) {
533 if (defined $proc{'destroy'}) {
534 $cb_name = $proc{'destroy'};
535 }
536 else {
537 return 1;
538 }
539 }
541 $status = call_by_name (@_);
543 if ($status < 0) {
544 my $err = undef;
546 if ($@) {
547 $err = $@;
548 }
549 else {
550 $err = "callback returned false";
551 }
553 ERROR ("Execution of fc callback \"$cb_name\" failed: $err");
554 return;
555 }
556 return $status;
557 }
559 sub fc_register {
560 my $type = shift;
561 my $name = shift;
562 my $proc = shift;
564 my %fc : shared;
566 DEBUG ("Collectd::fc_register: "
567 . "type = \"$type\", name = \"$name\", proc = \"$proc\"");
569 if (! ((defined $type) && (defined $name) && (defined $proc))) {
570 ERROR ("Usage: Collectd::fc_register(type, name, proc)");
571 return;
572 }
574 if (! defined $fc_plugins[$type]) {
575 ERROR ("Collectd::fc_register: Invalid type \"$type\"");
576 return;
577 }
579 if (("HASH" ne ref ($proc)) || (! defined $proc->{$fc_exec_names{$type}})
580 || ("" ne ref ($proc->{$fc_exec_names{$type}}))) {
581 ERROR ("Collectd::fc_register: Invalid proc.");
582 return;
583 }
585 for my $p (qw( create destroy )) {
586 if ((defined $proc->{$p}) && ("" ne ref ($proc->{$p}))) {
587 ERROR ("Collectd::fc_register: Invalid proc.");
588 return;
589 }
590 }
592 %fc = %$proc;
594 foreach my $p (keys %fc) {
595 my $pkg = scalar caller;
597 if ($p !~ m/^(create|destroy|$fc_exec_names{$type})$/) {
598 next;
599 }
601 if ($fc{$p} !~ m/^$pkg\:\:/) {
602 $fc{$p} = $pkg . "::" . $fc{$p};
603 }
604 }
606 lock %{$fc_plugins[$type]};
607 if (defined $fc_plugins[$type]->{$name}) {
608 WARNING ("Collectd::fc_register: Overwriting previous "
609 . "definition of match \"$name\".");
610 }
612 if (! _fc_register ($type, $name)) {
613 ERROR ("Collectd::fc_register: Failed to register \"$name\".");
614 return;
615 }
617 $fc_plugins[$type]->{$name} = \%fc;
618 return 1;
619 }
621 sub _plugin_dispatch_config {
622 my $plugin = shift;
623 my $config = shift;
625 our $cb_name = undef;
627 if (! (defined ($plugin) && defined ($config))) {
628 return;
629 }
631 if (! defined $cf_callbacks{$plugin}) {
632 WARNING ("Found a configuration for the \"$plugin\" plugin, but "
633 . "the plugin isn't loaded or didn't register "
634 . "a configuration callback.");
635 return;
636 }
638 {
639 lock %cf_callbacks;
640 $cb_name = $cf_callbacks{$plugin};
641 }
642 call_by_name ($config);
643 }
645 1;
647 # vim: set sw=4 ts=4 tw=78 noexpandtab :