1 # collectd - Collectd.pm
2 # Copyright (C) 2007-2009 Sebastian Harl
3 #
4 # This program is free software; you can redistribute it and/or modify it
5 # under the terms of the GNU General Public License as published by the
6 # Free Software Foundation; only version 2 of the License is applicable.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 # General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 #
17 # Author:
18 # Sebastian Harl <sh at tokkee.org>
20 package Collectd;
22 use strict;
23 use warnings;
25 use Config;
27 use threads;
28 use threads::shared;
# Abort compilation when %Config reports that this perl was built without
# ithreads support.
BEGIN {
	die "Perl does not support ithreads!"
		unless $Config{'useithreads'};
}
require Exporter;

our @ISA = ('Exporter');

# Export tag groups.  Every symbol is listed under the tag that describes
# what it is for.  The order inside each group is kept stable on purpose.
our %EXPORT_TAGS = (
	plugin => [qw(
		plugin_register plugin_unregister
		plugin_dispatch_values plugin_get_interval
		plugin_write
		plugin_flush plugin_flush_one plugin_flush_all
		plugin_dispatch_notification
		plugin_log
	)],
	types => [qw(
		TYPE_INIT TYPE_READ TYPE_WRITE TYPE_SHUTDOWN
		TYPE_LOG TYPE_NOTIF TYPE_FLUSH TYPE_CONFIG TYPE_DATASET
	)],
	ds_types => [qw( DS_TYPE_COUNTER DS_TYPE_GAUGE )],
	log => [qw(
		ERROR WARNING NOTICE INFO DEBUG
		LOG_ERR LOG_WARNING LOG_NOTICE LOG_INFO LOG_DEBUG
	)],
	filter_chain => [qw(
		fc_register
		FC_MATCH_NO_MATCH FC_MATCH_MATCHES
		FC_TARGET_CONTINUE FC_TARGET_STOP FC_TARGET_RETURN
	)],
	fc_types => [qw( FC_MATCH FC_TARGET )],
	notif    => [qw( NOTIF_FAILURE NOTIF_WARNING NOTIF_OKAY )],
	globals  => [qw( $hostname_g $interval_g )],
);
# Build the catch-all 'all' tag: the symbols of every existing tag, each one
# listed only once even if it appears under several tags.
{
	my %seen;

	foreach my $tag (keys %EXPORT_TAGS) {
		my @fresh = grep { ! $seen{$_}++ } @{$EXPORT_TAGS{$tag}};

		push @{$EXPORT_TAGS{'all'}}, @fresh;
	}
}
# Package globals listed under the 'globals' export tag.
our ($hostname_g, $interval_g);

# Allow every symbol collected under the 'all' tag to be imported on request.
Exporter::export_ok_tags ('all');

# Registries shared between threads:
#   @plugins      - one slot per plugin type; each slot maps a plugin name to
#                   its registration record
#   @fc_plugins   - one slot per filter-chain type; each slot maps a name to
#                   its callback table
#   %cf_callbacks - plugin name => name of its configuration callback
my @plugins      : shared;
my @fc_plugins   : shared;
my %cf_callbacks : shared;
# Printable names of the plugin types that get a registry slot.  TYPE_DATASET
# has no entry here.  The trailing "()" keeps "=>" from quoting the constant
# names as strings.
my %types = (
	TYPE_CONFIG()   => "config",
	TYPE_INIT()     => "init",
	TYPE_READ()     => "read",
	TYPE_WRITE()    => "write",
	TYPE_SHUTDOWN() => "shutdown",
	TYPE_LOG()      => "log",
	TYPE_NOTIF()    => "notify",
	TYPE_FLUSH()    => "flush",
);

# Printable names of the filter-chain plugin types.
my %fc_types = (
	FC_MATCH()  => "match",
	FC_TARGET() => "target",
);

# Key under which each filter-chain type stores its "exec" callback.
my %fc_exec_names = (
	FC_MATCH()  => "match",
	FC_TARGET() => "invoke",
);

# Printable names of the filter-chain callback kinds.
my %fc_cb_types = (
	FC_CB_EXEC()    => "exec",
	FC_CB_CREATE()  => "create",
	FC_CB_DESTROY() => "destroy",
);
# Give every known type its own shared, initially empty hash.  share() is
# called with a leading "&" so that it accepts an anonymous hash reference.
$plugins[$_]    = &share ({}) foreach keys %types;
$fc_plugins[$_] = &share ({}) foreach keys %fc_types;
# Pass a message on to plugin_log() at the given level.  Messages whose
# caller is the Collectd package itself get a "perl: " prefix.
#
# Arguments: calling package name, log level, message text.
# Returns whatever plugin_log() returns.
sub _log {
	my ($caller, $lvl, $msg) = @_;

	$msg = "perl: $msg" if ("Collectd" eq $caller);

	return plugin_log ($lvl, $msg);
}
# Convenience loggers, one per severity.  Each hands _log() the name of the
# package it was called from together with the matching LOG_* level and the
# first argument as the message.
sub ERROR {
	my ($msg) = @_;
	return _log (scalar caller, LOG_ERR, $msg);
}

sub WARNING {
	my ($msg) = @_;
	return _log (scalar caller, LOG_WARNING, $msg);
}

sub NOTICE {
	my ($msg) = @_;
	return _log (scalar caller, LOG_NOTICE, $msg);
}

sub INFO {
	my ($msg) = @_;
	return _log (scalar caller, LOG_INFO, $msg);
}

sub DEBUG {
	my ($msg) = @_;
	return _log (scalar caller, LOG_DEBUG, $msg);
}
# Collectd::plugin_call_all (type, ...).
#
# Run every callback registered for the given type, handing the remaining
# arguments to each of them.
#
# type:
#   a plugin type that has a slot in @plugins
#
# Returns 1 after all callbacks have been tried, or nothing when the type is
# undefined or unknown.  A failing callback is logged and handled per type,
# but never stops the loop:
#   - read:  the plugin is suspended; the delay doubles with every further
#            failure and is capped at 86400 seconds
#   - init:  the plugin is unregistered from every type
#   - log:   nothing is reported (presumably to avoid recursing into the
#            log callbacks themselves -- TODO confirm)
#   - other: a warning is logged
sub plugin_call_all {
	my $type = shift;

	my %plugins;
	my $interval;

	# call_by_name() is not given the callback name as an argument;
	# presumably it reads it from this package variable.
	our $cb_name = undef;

	if (! defined $type) {
		return;
	}

	# Validate the type before it is used as a key into %types: an unknown
	# type then produces one clear error instead of an additional
	# "uninitialized value" warning from the debug message below.
	if (! defined $plugins[$type]) {
		ERROR ("Collectd::plugin_call_all: unknown type \"$type\"");
		return;
	}

	if (TYPE_LOG != $type) {
		DEBUG ("Collectd::plugin_call_all: type = \"$type\" ("
			. $types{$type} . "), args=\""
			. join(', ', map { defined($_) ? $_ : '<undef>' } @_) . "\"");
	}

	# Work on a snapshot of the registry so the lock is not held while the
	# callbacks run.  The copy is shallow: the per-plugin records are still
	# the shared ones, so the wait_* updates below are persistent.
	{
		lock %{$plugins[$type]};
		%plugins = %{$plugins[$type]};
	}

	$interval = plugin_get_interval ();

	foreach my $plugin (keys %plugins) {
		my $p = $plugins{$plugin};

		my $status = 0;

		# Count down a pending suspension and skip the plugin while it
		# has not expired yet.
		if ($p->{'wait_left'} > 0) {
			$p->{'wait_left'} -= $interval;
		}

		next if ($p->{'wait_left'} > 0);

		$cb_name = $p->{'cb_name'};
		$status = call_by_name (@_);

		if (! $status) {
			my $err = undef;

			if ($@) {
				$err = $@;
			}
			else {
				$err = "callback returned false";
			}

			if (TYPE_LOG != $type) {
				ERROR ("Execution of callback \"$cb_name\" failed: $err");
			}

			# Normalise undef and other false values for the messages
			# below.
			$status = 0;
		}

		if ($status) {
			# Success: clear any suspension and reset the back-off.
			$p->{'wait_left'} = 0;
			$p->{'wait_time'} = $interval;
		}
		elsif (TYPE_READ == $type) {
			if ($p->{'wait_time'} < $interval) {
				$p->{'wait_time'} = $interval;
			}

			# Suspend for the current delay, then double the delay for
			# the next failure (at most 86400 seconds).
			$p->{'wait_left'} = $p->{'wait_time'};
			$p->{'wait_time'} *= 2;

			if ($p->{'wait_time'} > 86400) {
				$p->{'wait_time'} = 86400;
			}

			WARNING ("${plugin}->read() failed with status $status. "
				. "Will suspend it for $p->{'wait_left'} seconds.");
		}
		elsif (TYPE_INIT == $type) {
			ERROR ("${plugin}->init() failed with status $status. "
				. "Plugin will be disabled.");

			foreach my $type (keys %types) {
				plugin_unregister ($type, $plugin);
			}
		}
		elsif (TYPE_LOG != $type) {
			WARNING ("${plugin}->$types{$type}() failed with status $status.");
		}
	}
	return 1;
}
# Collectd::plugin_register (type, name, data).
#
# type:
#   one of the TYPE_* constants: config, init, read, write, shutdown, log,
#   notify, flush or data set
#
# name:
#   name of the plugin (or of the data set)
#
# data:
#   for data sets, an array reference holding the data set definition;
#   for every other type, the name of the callback subroutine as a plain
#   string (references are rejected).  A name that does not already start
#   with the calling package is prefixed with it.
#
# Returns the result of plugin_register_data_set() for data sets, 1 on
# success for the other types, and nothing after logging an error.
sub plugin_register {
	my $type = shift;
	my $name = shift;
	my $data = shift;

	# Check the arguments first: the debug message interpolates all three
	# and would emit "uninitialized value" warnings for missing ones.
	if (! ((defined $type) && (defined $name) && (defined $data))) {
		ERROR ("Usage: Collectd::plugin_register (type, name, data)");
		return;
	}

	# %types has no entry for TYPE_DATASET (or for invalid types), so
	# substitute a fixed label instead of concatenating undef.
	my $type_name = $types{$type};
	if (! defined $type_name) {
		$type_name = (TYPE_DATASET == $type) ? "dataset" : "unknown";
	}

	DEBUG ("Collectd::plugin_register: "
		. "type = \"$type\" (" . $type_name
		. "), name = \"$name\", data = \"$data\"");

	if ((! defined $plugins[$type]) && (TYPE_DATASET != $type)
			&& (TYPE_CONFIG != $type)) {
		ERROR ("Collectd::plugin_register: Invalid type \"$type\"");
		return;
	}

	if ((TYPE_DATASET == $type) && ("ARRAY" eq ref $data)) {
		return plugin_register_data_set ($name, $data);
	}
	elsif ((TYPE_CONFIG == $type) && (! ref $data)) {
		my $pkg = scalar caller;

		# \Q..\E makes the package name match literally.
		if ($data !~ m/^\Q$pkg\E::/) {
			$data = $pkg . "::" . $data;
		}

		lock %cf_callbacks;
		$cf_callbacks{$name} = $data;
	}
	elsif ((TYPE_DATASET != $type) && (! ref $data)) {
		my $pkg = scalar caller;

		my %p : shared;

		if ($data !~ m/^\Q$pkg\E::/) {
			$data = $pkg . "::" . $data;
		}

		# Registration record: callback name plus the suspension state
		# that plugin_call_all() maintains.
		%p = (
			wait_time => plugin_get_interval (),
			wait_left => 0,
			cb_name   => $data,
		);

		lock %{$plugins[$type]};
		$plugins[$type]->{$name} = \%p;
	}
	else {
		ERROR ("Collectd::plugin_register: Invalid data.");
		return;
	}
	return 1;
}
# Collectd::plugin_unregister (type, name).
#
# Drop the registration stored for "name" under the given type.
#
# type:
#   one of the TYPE_* constants accepted by plugin_register()
#
# name:
#   name of the plugin (or of the data set)
#
# Data sets are handed to plugin_unregister_data_set() and its result is
# returned.  Missing arguments or an unknown type are logged as errors and
# nothing is returned.
sub plugin_unregister {
	my $type = shift;
	my $name = shift;

	# Check the arguments first: the debug message interpolates both and
	# would emit "uninitialized value" warnings for missing ones.
	if (! ((defined $type) && (defined $name))) {
		ERROR ("Usage: Collectd::plugin_unregister (type, name)");
		return;
	}

	# %types has no entry for TYPE_DATASET (or for invalid types), so
	# substitute a fixed label instead of concatenating undef.
	my $type_name = $types{$type};
	if (! defined $type_name) {
		$type_name = (TYPE_DATASET == $type) ? "dataset" : "unknown";
	}

	DEBUG ("Collectd::plugin_unregister: type = \"$type\" ("
		. $type_name . "), name = \"$name\"");

	if (TYPE_DATASET == $type) {
		return plugin_unregister_data_set ($name);
	}
	elsif (TYPE_CONFIG == $type) {
		lock %cf_callbacks;
		delete $cf_callbacks{$name};
	}
	elsif (defined $plugins[$type]) {
		lock %{$plugins[$type]};
		delete $plugins[$type]->{$name};
	}
	else {
		ERROR ("Collectd::plugin_unregister: Invalid type.");
		return;
	}
}
# Collectd::plugin_write (plugins => ..., datasets => ..., valuelists => ...).
#
# Call _plugin_write() for every combination of plugin and value list.
#
# plugins:
#   optional; one item or an array reference of items.  When omitted,
#   _plugin_write() is called with an undefined plugin.
#
# datasets:
#   optional; one item or an array reference of items.  There must be
#   exactly one per value list.  When omitted, undef is passed for each
#   value list.
#
# valuelists:
#   mandatory; one item or an array reference of items
#
# Argument errors are logged and end the call early.  There is no
# meaningful return value.
sub plugin_write {
	my %args = @_;

	# Turn "one item or an array reference of items" into a plain list.
	my $as_list = sub {
		my $arg = shift;

		return ("ARRAY" eq ref ($arg)) ? @{$arg} : ($arg);
	};

	if (! defined $args{'valuelists'}) {
		ERROR ("Collectd::plugin_write: Missing 'valuelists' argument.");
		return;
	}

	DEBUG ("Collectd::plugin_write:"
		. (defined ($args{'plugins'}) ? " plugins = $args{'plugins'}" : "")
		. (defined ($args{'datasets'}) ? " datasets = $args{'datasets'}" : "")
		. " valuelists = $args{'valuelists'}");

	my @valuelists = $as_list->($args{'valuelists'});

	my @plugins = (undef);
	if (defined ($args{'plugins'})) {
		@plugins = $as_list->($args{'plugins'});
	}

	my @datasets;
	if (defined ($args{'datasets'})) {
		@datasets = $as_list->($args{'datasets'});
	}
	else {
		@datasets = (undef) x scalar (@valuelists);
	}

	if ($#datasets != $#valuelists) {
		ERROR ("Collectd::plugin_write: Invalid number of datasets.");
		return;
	}

	foreach my $plugin (@plugins) {
		foreach my $i (0 .. $#valuelists) {
			_plugin_write ($plugin, $datasets[$i], $valuelists[$i]);
		}
	}
}
# Collectd::plugin_flush (timeout => ..., plugins => ..., identifiers => ...).
#
# Call _plugin_flush() once for every combination of plugin and identifier.
#
# timeout:
#   optional; only a value greater than zero is used, otherwise -1 is passed
#
# plugins, identifiers:
#   optional; each is either a single value or an array reference of values.
#   A missing argument is passed on as a single undef.
sub plugin_flush {
	my %args = @_;

	DEBUG ("Collectd::plugin_flush:"
		. (defined ($args{'timeout'}) ? " timeout = $args{'timeout'}" : "")
		. (defined ($args{'plugins'}) ? " plugins = $args{'plugins'}" : "")
		. (defined ($args{'identifiers'})
			? " identifiers = $args{'identifiers'}" : ""));

	my $timeout = -1;
	$timeout = $args{'timeout'}
		if (defined ($args{'timeout'}) && ($args{'timeout'} > 0));

	my @plugins
		= (! defined ($args{'plugins'}))      ? (undef)
		: ("ARRAY" eq ref ($args{'plugins'})) ? @{$args{'plugins'}}
		:                                       ($args{'plugins'});

	my @ids
		= (! defined ($args{'identifiers'}))      ? (undef)
		: ("ARRAY" eq ref ($args{'identifiers'})) ? @{$args{'identifiers'}}
		:                                           ($args{'identifiers'});

	foreach my $plugin (@plugins) {
		_plugin_flush($plugin, $timeout, $_) foreach @ids;
	}
}
# Collectd::fc_call (type, name, cb_type, ...).
#
# Run one callback of a registered filter-chain match or target, handing the
# remaining arguments to it.
#
# type:
#   FC_MATCH or FC_TARGET
#
# name:
#   name under which the match/target was registered with fc_register()
#
# cb_type:
#   FC_CB_EXEC, FC_CB_CREATE or FC_CB_DESTROY
#
# Returns the status of the callback.  A missing optional create/destroy
# callback counts as success and yields 1.  Nothing is returned after an
# error has been logged: bad arguments, an unknown match/target, or a
# callback that yields no status or a negative one.
sub fc_call {
	my $type    = shift;
	my $name    = shift;
	my $cb_type = shift;

	my %proc;

	# call_by_name() is not given the callback name as an argument;
	# presumably it reads it from this package variable.
	our $cb_name = undef;
	my $status;

	if (! ((defined $type) && (defined $name) && (defined $cb_type))) {
		ERROR ("Usage: Collectd::fc_call(type, name, cb_type, ...)");
		return;
	}

	if (! defined $fc_plugins[$type]) {
		ERROR ("Collectd::fc_call: Invalid type \"$type\"");
		return;
	}

	if (! defined $fc_plugins[$type]->{$name}) {
		ERROR ("Collectd::fc_call: Unknown "
			. ($type == FC_MATCH ? "match" : "target")
			. " \"$name\"");
		return;
	}

	# Reject unknown callback kinds here.  Otherwise no branch below would
	# set $cb_name and call_by_name() would run without a callback name.
	if (! defined $fc_cb_types{$cb_type}) {
		ERROR ("Collectd::fc_call: Invalid callback type \"$cb_type\"");
		return;
	}

	DEBUG ("Collectd::fc_call: "
		. "type = \"$type\" (" . $fc_types{$type}
		. "), name = \"$name\", cb_type = \"$cb_type\" ("
		. $fc_cb_types{$cb_type} . ")");

	# Copy the callback table so the lock is not held during the call.
	{
		lock %{$fc_plugins[$type]};
		%proc = %{$fc_plugins[$type]->{$name}};
	}

	if (FC_CB_EXEC == $cb_type) {
		$cb_name = $proc{$fc_exec_names{$type}};
	}
	elsif (FC_CB_CREATE == $cb_type) {
		if (defined $proc{'create'}) {
			$cb_name = $proc{'create'};
		}
		else {
			return 1;
		}
	}
	elsif (FC_CB_DESTROY == $cb_type) {
		if (defined $proc{'destroy'}) {
			$cb_name = $proc{'destroy'};
		}
		else {
			return 1;
		}
	}

	$status = call_by_name (@_);

	# A callback that yields no status at all is reported like a negative
	# one.  Comparing undef with "<" would only produce a warning and the
	# failure would go unlogged.
	if ((! defined $status) || ($status < 0)) {
		my $err = undef;

		if ($@) {
			$err = $@;
		}
		else {
			$err = "callback returned false";
		}

		ERROR ("Execution of fc callback \"$cb_name\" failed: $err");
		return;
	}
	return $status;
}
# Collectd::fc_register (type, name, proc).
#
# Register a filter-chain match or target.
#
# type:
#   FC_MATCH or FC_TARGET
#
# name:
#   name of the match/target
#
# proc:
#   hash reference of callback names (plain strings, not references).  The
#   entry "match" (for FC_MATCH) or "invoke" (for FC_TARGET) is mandatory;
#   "create" and "destroy" are optional.  A name that does not already
#   start with the calling package is prefixed with it.
#
# Returns 1 on success and nothing after logging an error.  An existing
# registration of the same name is overwritten with a warning.
sub fc_register {
	my $type = shift;
	my $name = shift;
	my $proc = shift;

	my %fc : shared;

	# Package of the code that registers the callbacks.
	my $pkg = scalar caller;

	# Check the arguments first: the debug message interpolates all three
	# and indexes %fc_types with the type.
	if (! ((defined $type) && (defined $name) && (defined $proc))) {
		ERROR ("Usage: Collectd::fc_register(type, name, proc)");
		return;
	}

	if (! defined $fc_plugins[$type]) {
		ERROR ("Collectd::fc_register: Invalid type \"$type\"");
		return;
	}

	DEBUG ("Collectd::fc_register: "
		. "type = \"$type\" (" . $fc_types{$type}
		. "), name = \"$name\", proc = \"$proc\"");

	if (("HASH" ne ref ($proc)) || (! defined $proc->{$fc_exec_names{$type}})
			|| ("" ne ref ($proc->{$fc_exec_names{$type}}))) {
		ERROR ("Collectd::fc_register: Invalid proc.");
		return;
	}

	for my $p (qw( create destroy )) {
		if ((defined $proc->{$p}) && ("" ne ref ($proc->{$p}))) {
			ERROR ("Collectd::fc_register: Invalid proc.");
			return;
		}
	}

	%fc = %$proc;

	foreach my $p (keys %fc) {
		if ($p !~ m/^(create|destroy|$fc_exec_names{$type})$/) {
			next;
		}

		# An optional callback given as undef stays undef.  Prefixing it
		# would create the bogus callback name "Package::".
		next if (! defined $fc{$p});

		# \Q..\E makes the package name match literally.
		if ($fc{$p} !~ m/^\Q$pkg\E::/) {
			$fc{$p} = $pkg . "::" . $fc{$p};
		}
	}

	lock %{$fc_plugins[$type]};
	if (defined $fc_plugins[$type]->{$name}) {
		# Name the actual kind (match or target) being replaced.
		WARNING ("Collectd::fc_register: Overwriting previous "
			. "definition of " . $fc_types{$type} . " \"$name\".");
	}

	if (! _fc_register ($type, $name)) {
		ERROR ("Collectd::fc_register: Failed to register \"$name\".");
		return;
	}

	$fc_plugins[$type]->{$name} = \%fc;
	return 1;
}
# Hand a configuration to the configuration callback registered for a plugin.
#
# plugin:
#   name under which the callback was stored in %cf_callbacks
#
# config:
#   passed on unchanged as the only argument of the callback
#
# Returns whatever call_by_name() returns.  Returns nothing when an argument
# is missing or when no callback is registered; the latter logs a warning.
sub _plugin_dispatch_config {
	my $plugin = shift;
	my $config = shift;

	# call_by_name() is not given the callback name as an argument;
	# presumably it reads it from this package variable.
	our $cb_name = undef;

	if (! (defined ($plugin) && defined ($config))) {
		return;
	}

	# Look the callback up once, under the lock, so that the existence
	# check and the name actually used cannot disagree.
	{
		lock %cf_callbacks;
		$cb_name = $cf_callbacks{$plugin};
	}

	if (! defined $cb_name) {
		WARNING ("Found a configuration for the \"$plugin\" plugin, but "
			. "the plugin isn't loaded or didn't register "
			. "a configuration callback.");
		return;
	}
	return call_by_name ($config);
}
634 1;
636 # vim: set sw=4 ts=4 tw=78 noexpandtab :