1 # collectd - Collectd.pm
2 # Copyright (C) 2007-2009 Sebastian Harl
3 #
4 # This program is free software; you can redistribute it and/or modify it
5 # under the terms of the GNU General Public License as published by the
6 # Free Software Foundation; only version 2 of the License is applicable.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 # General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 #
17 # Author:
18 # Sebastian Harl <sh at tokkee.org>
20 package Collectd;
22 use strict;
23 use warnings;
25 use Config;
27 use threads;
28 use threads::shared;
30 BEGIN {
31 if (! $Config{'useithreads'}) {
32 die "Perl does not support ithreads!";
33 }
34 }
36 require Exporter;
38 our @ISA = qw( Exporter );
40 our %EXPORT_TAGS = (
41 'plugin' => [ qw(
42 plugin_register
43 plugin_unregister
44 plugin_dispatch_values
45 plugin_write
46 plugin_flush
47 plugin_flush_one
48 plugin_flush_all
49 plugin_dispatch_notification
50 plugin_log
51 ) ],
52 'types' => [ qw(
53 TYPE_INIT
54 TYPE_READ
55 TYPE_WRITE
56 TYPE_SHUTDOWN
57 TYPE_LOG
58 TYPE_NOTIF
59 TYPE_FLUSH
60 TYPE_CONFIG
61 TYPE_DATASET
62 ) ],
63 'ds_types' => [ qw(
64 DS_TYPE_COUNTER
65 DS_TYPE_GAUGE
66 ) ],
67 'log' => [ qw(
68 ERROR
69 WARNING
70 NOTICE
71 INFO
72 DEBUG
73 LOG_ERR
74 LOG_WARNING
75 LOG_NOTICE
76 LOG_INFO
77 LOG_DEBUG
78 ) ],
79 'filter_chain' => [ qw(
80 fc_register
81 FC_MATCH_NO_MATCH
82 FC_MATCH_MATCHES
83 FC_TARGET_CONTINUE
84 FC_TARGET_STOP
85 FC_TARGET_RETURN
86 ) ],
87 'fc_types' => [ qw(
88 FC_MATCH
89 FC_TARGET
90 ) ],
91 'notif' => [ qw(
92 NOTIF_FAILURE
93 NOTIF_WARNING
94 NOTIF_OKAY
95 ) ],
96 'globals' => [ qw(
97 $hostname_g
98 $interval_g
99 ) ],
100 );
102 {
103 my %seen;
104 push @{$EXPORT_TAGS{'all'}}, grep {! $seen{$_}++ } @{$EXPORT_TAGS{$_}}
105 foreach keys %EXPORT_TAGS;
106 }
108 # global variables
109 our $hostname_g;
110 our $interval_g;
112 Exporter::export_ok_tags ('all');
114 my @plugins : shared = ();
115 my @fc_plugins : shared = ();
116 my %cf_callbacks : shared = ();
118 my %types = (
119 TYPE_INIT, "init",
120 TYPE_READ, "read",
121 TYPE_WRITE, "write",
122 TYPE_SHUTDOWN, "shutdown",
123 TYPE_LOG, "log",
124 TYPE_NOTIF, "notify",
125 TYPE_FLUSH, "flush"
126 );
128 my %fc_types = (
129 FC_MATCH, "match",
130 FC_TARGET, "target"
131 );
133 my %fc_exec_names = (
134 FC_MATCH, "match",
135 FC_TARGET, "invoke"
136 );
138 my %fc_cb_types = (
139 FC_CB_EXEC, "exec",
140 FC_CB_CREATE, "create",
141 FC_CB_DESTROY, "destroy"
142 );
144 foreach my $type (keys %types) {
145 $plugins[$type] = &share ({});
146 }
148 foreach my $type (keys %fc_types) {
149 $fc_plugins[$type] = &share ({});
150 }
152 sub _log {
153 my $caller = shift;
154 my $lvl = shift;
155 my $msg = shift;
157 if ("Collectd" eq $caller) {
158 $msg = "perl: $msg";
159 }
160 return plugin_log ($lvl, $msg);
161 }
163 sub ERROR { _log (scalar caller, LOG_ERR, shift); }
164 sub WARNING { _log (scalar caller, LOG_WARNING, shift); }
165 sub NOTICE { _log (scalar caller, LOG_NOTICE, shift); }
166 sub INFO { _log (scalar caller, LOG_INFO, shift); }
167 sub DEBUG { _log (scalar caller, LOG_DEBUG, shift); }
169 sub plugin_call_all {
170 my $type = shift;
172 my %plugins;
174 our $cb_name = undef;
176 if (! defined $type) {
177 return;
178 }
180 if (TYPE_LOG != $type) {
181 DEBUG ("Collectd::plugin_call: type = \"$type\" ("
182 . $types{$type} . "), args=\""
183 . join(', ', map { defined($_) ? $_ : '<undef>' } @_) . "\"");
184 }
186 if (! defined $plugins[$type]) {
187 ERROR ("Collectd::plugin_call: unknown type \"$type\"");
188 return;
189 }
191 {
192 lock %{$plugins[$type]};
193 %plugins = %{$plugins[$type]};
194 }
196 foreach my $plugin (keys %plugins) {
197 my $p = $plugins{$plugin};
199 my $status = 0;
201 if ($p->{'wait_left'} > 0) {
202 $p->{'wait_left'} -= $interval_g;
203 }
205 next if ($p->{'wait_left'} > 0);
207 $cb_name = $p->{'cb_name'};
208 $status = call_by_name (@_);
210 if (! $status) {
211 my $err = undef;
213 if ($@) {
214 $err = $@;
215 }
216 else {
217 $err = "callback returned false";
218 }
220 if (TYPE_LOG != $type) {
221 ERROR ("Execution of callback \"$cb_name\" failed: $err");
222 }
224 $status = 0;
225 }
227 if ($status) {
228 $p->{'wait_left'} = 0;
229 $p->{'wait_time'} = $interval_g;
230 }
231 elsif (TYPE_READ == $type) {
232 if ($p->{'wait_time'} < $interval_g) {
233 $p->{'wait_time'} = $interval_g;
234 }
236 $p->{'wait_left'} = $p->{'wait_time'};
237 $p->{'wait_time'} *= 2;
239 if ($p->{'wait_time'} > 86400) {
240 $p->{'wait_time'} = 86400;
241 }
243 WARNING ("${plugin}->read() failed with status $status. "
244 . "Will suspend it for $p->{'wait_left'} seconds.");
245 }
246 elsif (TYPE_INIT == $type) {
247 ERROR ("${plugin}->init() failed with status $status. "
248 . "Plugin will be disabled.");
250 foreach my $type (keys %types) {
251 plugin_unregister ($type, $plugin);
252 }
253 }
254 elsif (TYPE_LOG != $type) {
255 WARNING ("${plugin}->$types{$type}() failed with status $status.");
256 }
257 }
258 return 1;
259 }
261 # Collectd::plugin_register (type, name, data).
262 #
263 # type:
264 # init, read, write, shutdown, data set
265 #
266 # name:
267 # name of the plugin
268 #
269 # data:
270 # reference to the plugin's subroutine that does the work or the data set
271 # definition
272 sub plugin_register {
273 my $type = shift;
274 my $name = shift;
275 my $data = shift;
277 DEBUG ("Collectd::plugin_register: "
278 . "type = \"$type\" (" . $types{$type}
279 . "), name = \"$name\", data = \"$data\"");
281 if (! ((defined $type) && (defined $name) && (defined $data))) {
282 ERROR ("Usage: Collectd::plugin_register (type, name, data)");
283 return;
284 }
286 if ((! defined $plugins[$type]) && (TYPE_DATASET != $type)
287 && (TYPE_CONFIG != $type)) {
288 ERROR ("Collectd::plugin_register: Invalid type \"$type\"");
289 return;
290 }
292 if ((TYPE_DATASET == $type) && ("ARRAY" eq ref $data)) {
293 return plugin_register_data_set ($name, $data);
294 }
295 elsif ((TYPE_CONFIG == $type) && (! ref $data)) {
296 my $pkg = scalar caller;
298 if ($data !~ m/^$pkg\:\:/) {
299 $data = $pkg . "::" . $data;
300 }
302 lock %cf_callbacks;
303 $cf_callbacks{$name} = $data;
304 }
305 elsif ((TYPE_DATASET != $type) && (! ref $data)) {
306 my $pkg = scalar caller;
308 my %p : shared;
310 if ($data !~ m/^$pkg\:\:/) {
311 $data = $pkg . "::" . $data;
312 }
314 %p = (
315 wait_time => $interval_g,
316 wait_left => 0,
317 cb_name => $data,
318 );
320 lock %{$plugins[$type]};
321 $plugins[$type]->{$name} = \%p;
322 }
323 else {
324 ERROR ("Collectd::plugin_register: Invalid data.");
325 return;
326 }
327 return 1;
328 }
330 sub plugin_unregister {
331 my $type = shift;
332 my $name = shift;
334 DEBUG ("Collectd::plugin_unregister: type = \"$type\" ("
335 . $types{$type} . "), name = \"$name\"");
337 if (! ((defined $type) && (defined $name))) {
338 ERROR ("Usage: Collectd::plugin_unregister (type, name)");
339 return;
340 }
342 if (TYPE_DATASET == $type) {
343 return plugin_unregister_data_set ($name);
344 }
345 elsif (TYPE_CONFIG == $type) {
346 lock %cf_callbacks;
347 delete $cf_callbacks{$name};
348 }
349 elsif (defined $plugins[$type]) {
350 lock %{$plugins[$type]};
351 delete $plugins[$type]->{$name};
352 }
353 else {
354 ERROR ("Collectd::plugin_unregister: Invalid type.");
355 return;
356 }
357 }
359 sub plugin_write {
360 my %args = @_;
362 my @plugins = ();
363 my @datasets = ();
364 my @valuelists = ();
366 if (! defined $args{'valuelists'}) {
367 ERROR ("Collectd::plugin_write: Missing 'valuelists' argument.");
368 return;
369 }
371 DEBUG ("Collectd::plugin_write:"
372 . (defined ($args{'plugins'}) ? " plugins = $args{'plugins'}" : "")
373 . (defined ($args{'datasets'}) ? " datasets = $args{'datasets'}" : "")
374 . " valueslists = $args{'valuelists'}");
376 if (defined ($args{'plugins'})) {
377 if ("ARRAY" eq ref ($args{'plugins'})) {
378 @plugins = @{$args{'plugins'}};
379 }
380 else {
381 @plugins = ($args{'plugins'});
382 }
383 }
384 else {
385 @plugins = (undef);
386 }
388 if ("ARRAY" eq ref ($args{'valuelists'})) {
389 @valuelists = @{$args{'valuelists'}};
390 }
391 else {
392 @valuelists = ($args{'valuelists'});
393 }
395 if (defined ($args{'datasets'})) {
396 if ("ARRAY" eq ref ($args{'datasets'})) {
397 @datasets = @{$args{'datasets'}};
398 }
399 else {
400 @datasets = ($args{'datasets'});
401 }
402 }
403 else {
404 @datasets = (undef) x scalar (@valuelists);
405 }
407 if ($#datasets != $#valuelists) {
408 ERROR ("Collectd::plugin_write: Invalid number of datasets.");
409 return;
410 }
412 foreach my $plugin (@plugins) {
413 for (my $i = 0; $i < scalar (@valuelists); ++$i) {
414 _plugin_write ($plugin, $datasets[$i], $valuelists[$i]);
415 }
416 }
417 }
419 sub plugin_flush {
420 my %args = @_;
422 my $timeout = -1;
423 my @plugins = ();
424 my @ids = ();
426 DEBUG ("Collectd::plugin_flush:"
427 . (defined ($args{'timeout'}) ? " timeout = $args{'timeout'}" : "")
428 . (defined ($args{'plugins'}) ? " plugins = $args{'plugins'}" : "")
429 . (defined ($args{'identifiers'})
430 ? " identifiers = $args{'identifiers'}" : ""));
432 if (defined ($args{'timeout'}) && ($args{'timeout'} > 0)) {
433 $timeout = $args{'timeout'};
434 }
436 if (defined ($args{'plugins'})) {
437 if ("ARRAY" eq ref ($args{'plugins'})) {
438 @plugins = @{$args{'plugins'}};
439 }
440 else {
441 @plugins = ($args{'plugins'});
442 }
443 }
444 else {
445 @plugins = (undef);
446 }
448 if (defined ($args{'identifiers'})) {
449 if ("ARRAY" eq ref ($args{'identifiers'})) {
450 @ids = @{$args{'identifiers'}};
451 }
452 else {
453 @ids = ($args{'identifiers'});
454 }
455 }
456 else {
457 @ids = (undef);
458 }
460 foreach my $plugin (@plugins) {
461 foreach my $id (@ids) {
462 _plugin_flush($plugin, $timeout, $id);
463 }
464 }
465 }
467 sub plugin_flush_one {
468 my $timeout = shift;
469 my $name = shift;
471 WARNING ("Collectd::plugin_flush_one is deprecated - "
472 . "use Collectd::plugin_flush instead.");
474 if (! (defined ($timeout) && defined ($name))) {
475 ERROR ("Usage: Collectd::plugin_flush_one(timeout, name)");
476 return;
477 }
479 plugin_flush (plugins => $name, timeout => $timeout);
480 }
482 sub plugin_flush_all {
483 my $timeout = shift;
485 WARNING ("Collectd::plugin_flush_all is deprecated - "
486 . "use Collectd::plugin_flush instead.");
488 if (! defined ($timeout)) {
489 ERROR ("Usage: Collectd::plugin_flush_all(timeout)");
490 return;
491 }
493 plugin_flush (timeout => $timeout);
494 }
496 sub fc_call {
497 my $type = shift;
498 my $name = shift;
499 my $cb_type = shift;
501 my %proc;
503 our $cb_name = undef;
504 my $status;
506 if (! ((defined $type) && (defined $name) && (defined $cb_type))) {
507 ERROR ("Usage: Collectd::fc_call(type, name, cb_type, ...)");
508 return;
509 }
511 if (! defined $fc_plugins[$type]) {
512 ERROR ("Collectd::fc_call: Invalid type \"$type\"");
513 return;
514 }
516 if (! defined $fc_plugins[$type]->{$name}) {
517 ERROR ("Collectd::fc_call: Unknown "
518 . ($type == FC_MATCH ? "match" : "target")
519 . " \"$name\"");
520 return;
521 }
523 DEBUG ("Collectd::fc_call: "
524 . "type = \"$type\" (" . $fc_types{$type}
525 . "), name = \"$name\", cb_type = \"$cb_type\" ("
526 . $fc_cb_types{$cb_type} . ")");
528 {
529 lock %{$fc_plugins[$type]};
530 %proc = %{$fc_plugins[$type]->{$name}};
531 }
533 if (FC_CB_EXEC == $cb_type) {
534 $cb_name = $proc{$fc_exec_names{$type}};
535 }
536 elsif (FC_CB_CREATE == $cb_type) {
537 if (defined $proc{'create'}) {
538 $cb_name = $proc{'create'};
539 }
540 else {
541 return 1;
542 }
543 }
544 elsif (FC_CB_DESTROY == $cb_type) {
545 if (defined $proc{'destroy'}) {
546 $cb_name = $proc{'destroy'};
547 }
548 else {
549 return 1;
550 }
551 }
553 $status = call_by_name (@_);
555 if ($status < 0) {
556 my $err = undef;
558 if ($@) {
559 $err = $@;
560 }
561 else {
562 $err = "callback returned false";
563 }
565 ERROR ("Execution of fc callback \"$cb_name\" failed: $err");
566 return;
567 }
568 return $status;
569 }
571 sub fc_register {
572 my $type = shift;
573 my $name = shift;
574 my $proc = shift;
576 my %fc : shared;
578 DEBUG ("Collectd::fc_register: "
579 . "type = \"$type\" (" . $fc_types{$type}
580 . "), name = \"$name\", proc = \"$proc\"");
582 if (! ((defined $type) && (defined $name) && (defined $proc))) {
583 ERROR ("Usage: Collectd::fc_register(type, name, proc)");
584 return;
585 }
587 if (! defined $fc_plugins[$type]) {
588 ERROR ("Collectd::fc_register: Invalid type \"$type\"");
589 return;
590 }
592 if (("HASH" ne ref ($proc)) || (! defined $proc->{$fc_exec_names{$type}})
593 || ("" ne ref ($proc->{$fc_exec_names{$type}}))) {
594 ERROR ("Collectd::fc_register: Invalid proc.");
595 return;
596 }
598 for my $p (qw( create destroy )) {
599 if ((defined $proc->{$p}) && ("" ne ref ($proc->{$p}))) {
600 ERROR ("Collectd::fc_register: Invalid proc.");
601 return;
602 }
603 }
605 %fc = %$proc;
607 foreach my $p (keys %fc) {
608 my $pkg = scalar caller;
610 if ($p !~ m/^(create|destroy|$fc_exec_names{$type})$/) {
611 next;
612 }
614 if ($fc{$p} !~ m/^$pkg\:\:/) {
615 $fc{$p} = $pkg . "::" . $fc{$p};
616 }
617 }
619 lock %{$fc_plugins[$type]};
620 if (defined $fc_plugins[$type]->{$name}) {
621 WARNING ("Collectd::fc_register: Overwriting previous "
622 . "definition of match \"$name\".");
623 }
625 if (! _fc_register ($type, $name)) {
626 ERROR ("Collectd::fc_register: Failed to register \"$name\".");
627 return;
628 }
630 $fc_plugins[$type]->{$name} = \%fc;
631 return 1;
632 }
634 sub _plugin_dispatch_config {
635 my $plugin = shift;
636 my $config = shift;
638 our $cb_name = undef;
640 if (! (defined ($plugin) && defined ($config))) {
641 return;
642 }
644 if (! defined $cf_callbacks{$plugin}) {
645 WARNING ("Found a configuration for the \"$plugin\" plugin, but "
646 . "the plugin isn't loaded or didn't register "
647 . "a configuration callback.");
648 return;
649 }
651 {
652 lock %cf_callbacks;
653 $cb_name = $cf_callbacks{$plugin};
654 }
655 call_by_name ($config);
656 }
658 1;
660 # vim: set sw=4 ts=4 tw=78 noexpandtab :