1 # collectd - Collectd.pm
2 # Copyright (C) 2007-2009 Sebastian Harl
3 #
4 # This program is free software; you can redistribute it and/or modify it
5 # under the terms of the GNU General Public License as published by the
6 # Free Software Foundation; only version 2 of the License is applicable.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 # General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 #
17 # Author:
18 # Sebastian Harl <sh at tokkee.org>
20 package Collectd;
22 use strict;
23 use warnings;
use Config;

# Fail early with a Collectd specific message. The check has to run before
# the threads modules are pulled in: "use threads" refuses to load on a perl
# built without ithreads, so a check placed after it would never be reached.
BEGIN {
	if (! $Config{'useithreads'}) {
		die "Perl does not support ithreads!";
	}
}

use threads;
use threads::shared;
require Exporter;

our @ISA = ('Exporter');

# Symbols offered for export, grouped by tag.
our %EXPORT_TAGS = (
	# plugin registration, dispatching, flushing and logging
	plugin => [ qw(
		plugin_register plugin_unregister
		plugin_dispatch_values
		plugin_get_interval
		plugin_write
		plugin_flush plugin_flush_one plugin_flush_all
		plugin_dispatch_notification
		plugin_log
	) ],
	# callback / registration types
	types => [ qw(
		TYPE_INIT TYPE_READ TYPE_WRITE TYPE_SHUTDOWN
		TYPE_LOG TYPE_NOTIF TYPE_FLUSH
		TYPE_CONFIG TYPE_DATASET
	) ],
	# data source types
	ds_types => [ qw( DS_TYPE_COUNTER DS_TYPE_GAUGE ) ],
	# logging shortcuts and log levels
	log => [ qw(
		ERROR WARNING NOTICE INFO DEBUG
		LOG_ERR LOG_WARNING LOG_NOTICE LOG_INFO LOG_DEBUG
	) ],
	# filter chain registration and return values
	filter_chain => [ qw(
		fc_register
		FC_MATCH_NO_MATCH FC_MATCH_MATCHES
		FC_TARGET_CONTINUE FC_TARGET_STOP FC_TARGET_RETURN
	) ],
	# filter chain plugin types
	fc_types => [ qw( FC_MATCH FC_TARGET ) ],
	# notification severities
	notif => [ qw( NOTIF_FAILURE NOTIF_WARNING NOTIF_OKAY ) ],
	# global variables
	globals => [ qw( $hostname_g $interval_g ) ],
);
# Build the 'all' tag: every symbol of every tag defined so far, each one
# listed only once.
{
	my %seen;

	foreach my $tag (keys %EXPORT_TAGS) {
		foreach my $symbol (@{$EXPORT_TAGS{$tag}}) {
			next if $seen{$symbol}++;
			push @{$EXPORT_TAGS{'all'}}, $symbol;
		}
	}
}
# global variables (exported through the 'globals' tag)
our ($hostname_g, $interval_g);

Exporter::export_ok_tags ('all');

# Registration tables, declared shared (threads::shared):
#   @plugins      - one name => callback table per plugin type
#   @fc_plugins   - one name => definition table per filter chain type
#   %cf_callbacks - configuration callbacks, keyed by registered name
my @plugins      : shared = ();
my @fc_plugins   : shared = ();
my %cf_callbacks : shared = ();
# Plugin type constant => name used in log messages.
my %types = (
	TYPE_CONFIG()   => "config",
	TYPE_INIT()     => "init",
	TYPE_READ()     => "read",
	TYPE_WRITE()    => "write",
	TYPE_SHUTDOWN() => "shutdown",
	TYPE_LOG()      => "log",
	TYPE_NOTIF()    => "notify",
	TYPE_FLUSH()    => "flush",
);

# Filter chain type constant => name used in log messages.
my %fc_types = (
	FC_MATCH()  => "match",
	FC_TARGET() => "target",
);

# Filter chain type constant => key of the mandatory "exec" callback in a
# filter chain definition.
my %fc_exec_names = (
	FC_MATCH()  => "match",
	FC_TARGET() => "invoke",
);

# Filter chain callback type constant => name used in log messages.
my %fc_cb_types = (
	FC_CB_EXEC()    => "exec",
	FC_CB_CREATE()  => "create",
	FC_CB_DESTROY() => "destroy",
);

# Give every known type its own shared (initially empty) table. share() is
# called with a leading "&" so that it accepts an anonymous hash.
$plugins[$_]    = &share ({}) for keys %types;
$fc_plugins[$_] = &share ({}) for keys %fc_types;
# _log (caller, level, message).
#
# Hands the message to plugin_log() using the given level. Messages that
# originate from the Collectd package itself get a "perl: " prefix.
sub _log {
	my ($caller, $lvl, $msg) = @_;

	$msg = "perl: $msg" if ("Collectd" eq $caller);

	return plugin_log ($lvl, $msg);
}
# Logging shortcuts, one per log level. Each one takes the message as its
# only argument and passes the package of its caller on to _log().
sub ERROR {
	my $msg = shift;
	return _log (scalar (caller), LOG_ERR, $msg);
}

sub WARNING {
	my $msg = shift;
	return _log (scalar (caller), LOG_WARNING, $msg);
}

sub NOTICE {
	my $msg = shift;
	return _log (scalar (caller), LOG_NOTICE, $msg);
}

sub INFO {
	my $msg = shift;
	return _log (scalar (caller), LOG_INFO, $msg);
}

sub DEBUG {
	my $msg = shift;
	return _log (scalar (caller), LOG_DEBUG, $msg);
}
# Collectd::plugin_call_all (type, ...).
#
# Calls every callback stored in the table of the given type, passing the
# remaining arguments on to each of them.
#
# type:
#   plugin type; has to have a table in @plugins
#
# Returns 1 once all callbacks have been tried and nothing if the type is
# missing or unknown. A failing callback is logged (unless log callbacks are
# being dispatched); a failing init callback additionally gets its plugin
# unregistered for all types.
sub plugin_call_all {
	my $type = shift;

	my %plugins;

	# Package variable naming the callback to run; call_by_name() itself is
	# only handed the callback arguments.
	# NOTE(review): call_by_name is not defined in this file - presumably it
	# reads $Collectd::cb_name; confirm against the XS code.
	our $cb_name = undef;

	if (! defined $type) {
		return;
	}

	# Validate the type first: the debug message below looks it up in %types.
	if (! defined $plugins[$type]) {
		ERROR ("Collectd::plugin_call_all: unknown type \"$type\"");
		return;
	}

	# Nothing is logged while log callbacks are dispatched - presumably to
	# avoid recursing into the very callbacks being called.
	if (TYPE_LOG != $type) {
		DEBUG ("Collectd::plugin_call_all: type = \"$type\" ("
			. $types{$type} . "), args=\""
			. join(', ', map { defined($_) ? $_ : '<undef>' } @_) . "\"");
	}

	# Work on a private copy; the lock is released before any callback runs.
	{
		lock %{$plugins[$type]};
		%plugins = %{$plugins[$type]};
	}

	foreach my $plugin (keys %plugins) {
		$cb_name = $plugins{$plugin};

		if (call_by_name (@_)) {
			next;
		}

		# Any false return value is reported as status 0.
		my $status = 0;
		my $err    = $@ ? $@ : "callback returned false";

		if (TYPE_LOG != $type) {
			ERROR ("Execution of callback \"$cb_name\" failed: $err");
		}

		if (TYPE_INIT == $type) {
			ERROR ("${plugin}->init() failed with status $status. "
				. "Plugin will be disabled.");

			foreach my $t (keys %types) {
				plugin_unregister ($t, $plugin);
			}
		}
		elsif (TYPE_LOG != $type) {
			WARNING ("${plugin}->$types{$type}() failed with status $status.");
		}
	}
	return 1;
}
# Collectd::plugin_register (type, name, data).
#
# type:
#   one of TYPE_CONFIG, TYPE_INIT, TYPE_READ, TYPE_WRITE, TYPE_SHUTDOWN,
#   TYPE_LOG, TYPE_NOTIF, TYPE_FLUSH or TYPE_DATASET
#
# name:
#   name of the plugin (or of the data set)
#
# data:
#   name of the plugin's subroutine that does the work (a plain string; it
#   is prefixed with the caller's package unless it already starts with it)
#   or, for TYPE_DATASET, an array reference holding the data set definition
#
# Returns nothing on invalid arguments. Read, write, log, notification,
# flush and data set registrations return the result of the respective
# plugin_register_*() function; all other registrations return 1.
sub plugin_register {
	my $type = shift;
	my $name = shift;
	my $data = shift;

	# Check the arguments before they are interpolated into the debug
	# message.
	if (! ((defined $type) && (defined $name) && (defined $data))) {
		ERROR ("Usage: Collectd::plugin_register (type, name, data)");
		return;
	}

	# TYPE_DATASET has no entry in %types, so fall back to a fixed label.
	my $type_name = $types{$type};
	if (! defined $type_name) {
		$type_name = (TYPE_DATASET == $type) ? "data set" : "unknown";
	}

	DEBUG ("Collectd::plugin_register: "
		. "type = \"$type\" (" . $type_name
		. "), name = \"$name\", data = \"$data\"");

	if ((! defined $plugins[$type]) && (TYPE_DATASET != $type)
			&& (TYPE_CONFIG != $type)) {
		ERROR ("Collectd::plugin_register: Invalid type \"$type\"");
		return;
	}

	if ((TYPE_DATASET == $type) && ("ARRAY" eq ref $data)) {
		return plugin_register_data_set ($name, $data);
	}

	# Everything else is registered by subroutine name: a data set without
	# an array reference or a callback given as a reference is rejected.
	if ((TYPE_DATASET == $type) || (ref $data)) {
		ERROR ("Collectd::plugin_register: Invalid data.");
		return;
	}

	# Qualify the callback name with the package of the caller.
	my $pkg = scalar caller;
	if ($data !~ m/^\Q$pkg\E\:\:/) {
		$data = $pkg . "::" . $data;
	}

	if (TYPE_CONFIG == $type) {
		lock %cf_callbacks;
		$cf_callbacks{$name} = $data;
		return 1;
	}

	if (TYPE_READ == $type) {
		return plugin_register_read($name, $data);
	}
	if (TYPE_WRITE == $type) {
		return plugin_register_write($name, $data);
	}
	if (TYPE_LOG == $type) {
		return plugin_register_log($name, $data);
	}
	if (TYPE_NOTIF == $type) {
		return plugin_register_notification($name, $data);
	}
	if (TYPE_FLUSH == $type) {
		#For collectd-5.6 only
		lock %{$plugins[$type]};
		$plugins[$type]->{$name} = $data;
		return plugin_register_flush($name, $data);
	}

	# Remaining types are only tracked in the Perl-side table.
	lock %{$plugins[$type]};
	$plugins[$type]->{$name} = $data;
	return 1;
}
# Collectd::plugin_unregister (type, name).
#
# Removes the registration of "name" for the given type.
#
# type:
#   one of the TYPE_* constants accepted by plugin_register
#
# name:
#   name the callback or data set has been registered with
#
# Returns nothing on invalid arguments. Data set, read, write, log,
# notification and flush registrations return the result of the respective
# plugin_unregister_*() function.
sub plugin_unregister {
	my $type = shift;
	my $name = shift;

	# Check the arguments before they are interpolated into the debug
	# message.
	if (! ((defined $type) && (defined $name))) {
		ERROR ("Usage: Collectd::plugin_unregister (type, name)");
		return;
	}

	# TYPE_DATASET has no entry in %types, so fall back to a fixed label.
	my $type_name = $types{$type};
	if (! defined $type_name) {
		$type_name = (TYPE_DATASET == $type) ? "data set" : "unknown";
	}

	DEBUG ("Collectd::plugin_unregister: type = \"$type\" ("
		. $type_name . "), name = \"$name\"");

	if (TYPE_DATASET == $type) {
		return plugin_unregister_data_set ($name);
	}
	elsif (TYPE_CONFIG == $type) {
		lock %cf_callbacks;
		delete $cf_callbacks{$name};
	}
	elsif (TYPE_READ == $type) {
		return plugin_unregister_read ($name);
	}
	elsif (TYPE_WRITE == $type) {
		return plugin_unregister_write($name);
	}
	elsif (TYPE_LOG == $type) {
		return plugin_unregister_log ($name);
	}
	elsif (TYPE_NOTIF == $type) {
		return plugin_unregister_notification($name);
	}
	elsif (TYPE_FLUSH == $type) {
		return plugin_unregister_flush($name);
	}
	elsif (defined $plugins[$type]) {
		# Types that are only tracked in the Perl-side table.
		lock %{$plugins[$type]};
		delete $plugins[$type]->{$name};
	}
	else {
		ERROR ("Collectd::plugin_unregister: Invalid type.");
		return;
	}
}
# Collectd::plugin_write (valuelists => ..., [plugins => ...],
#                         [datasets => ...]).
#
# Each of the named arguments is either a single item or a reference to an
# array of items. "valuelists" is mandatory. If "datasets" is given it has
# to hold exactly one entry per value list; otherwise undef is used for
# every value list. Without "plugins" a single undef plugin is used.
#
# _plugin_write() is called once for each combination of plugin and value
# list.
sub plugin_write {
	my %opts = @_;

	unless (defined $opts{'valuelists'}) {
		ERROR ("Collectd::plugin_write: Missing 'valuelists' argument.");
		return;
	}

	DEBUG ("Collectd::plugin_write:"
		. (defined ($opts{'plugins'}) ? " plugins = $opts{'plugins'}" : "")
		. (defined ($opts{'datasets'}) ? " datasets = $opts{'datasets'}" : "")
		. " valueslists = $opts{'valuelists'}");

	my @targets =
		  (! defined ($opts{'plugins'}))       ? (undef)
		: ("ARRAY" eq ref ($opts{'plugins'}))  ? @{$opts{'plugins'}}
		:                                        ($opts{'plugins'});

	my @lists = ("ARRAY" eq ref ($opts{'valuelists'}))
		? @{$opts{'valuelists'}}
		: ($opts{'valuelists'});

	my @sets =
		  (! defined ($opts{'datasets'}))      ? ((undef) x scalar (@lists))
		: ("ARRAY" eq ref ($opts{'datasets'})) ? @{$opts{'datasets'}}
		:                                        ($opts{'datasets'});

	if (scalar (@sets) != scalar (@lists)) {
		ERROR ("Collectd::plugin_write: Invalid number of datasets.");
		return;
	}

	foreach my $target (@targets) {
		foreach my $idx (0 .. $#lists) {
			_plugin_write ($target, $sets[$idx], $lists[$idx]);
		}
	}
}
# Collectd::plugin_flush ([timeout => ...], [plugins => ...],
#                         [identifiers => ...]).
#
# "plugins" and "identifiers" are either a single item or a reference to an
# array of items; if one of them is missing, a single undef entry is used
# instead. A timeout that is missing or not greater than zero is passed on
# as -1.
#
# _plugin_flush() is called once for each combination of plugin and
# identifier.
sub plugin_flush {
	my %opts = @_;

	DEBUG ("Collectd::plugin_flush:"
		. (defined ($opts{'timeout'}) ? " timeout = $opts{'timeout'}" : "")
		. (defined ($opts{'plugins'}) ? " plugins = $opts{'plugins'}" : "")
		. (defined ($opts{'identifiers'})
			? " identifiers = $opts{'identifiers'}" : ""));

	my $timeout = (defined ($opts{'timeout'}) && ($opts{'timeout'} > 0))
		? $opts{'timeout'}
		: -1;

	my @targets =
		  (! defined ($opts{'plugins'}))      ? (undef)
		: ("ARRAY" eq ref ($opts{'plugins'})) ? @{$opts{'plugins'}}
		:                                       ($opts{'plugins'});

	my @idents =
		  (! defined ($opts{'identifiers'}))      ? (undef)
		: ("ARRAY" eq ref ($opts{'identifiers'})) ? @{$opts{'identifiers'}}
		:                                           ($opts{'identifiers'});

	foreach my $target (@targets) {
		foreach my $ident (@idents) {
			_plugin_flush($target, $timeout, $ident);
		}
	}
}
# Collectd::fc_call (type, name, cb_type, ...).
#
# Runs one callback of a registered filter chain match or target, passing
# the remaining arguments on to it.
#
# type:
#   FC_MATCH or FC_TARGET
#
# name:
#   name the match / target has been registered with (see fc_register)
#
# cb_type:
#   FC_CB_EXEC, FC_CB_CREATE or FC_CB_DESTROY
#
# Returns the status of the callback, 1 if an optional create / destroy
# callback has not been defined, and nothing on invalid arguments or if the
# callback failed (undefined or negative status).
sub fc_call {
	my $type    = shift;
	my $name    = shift;
	my $cb_type = shift;

	my %proc;

	# Package variable naming the callback to run; call_by_name() itself is
	# only handed the callback arguments.
	our $cb_name = undef;
	my $status;

	if (! ((defined $type) && (defined $name) && (defined $cb_type))) {
		ERROR ("Usage: Collectd::fc_call(type, name, cb_type, ...)");
		return;
	}

	if (! defined $fc_plugins[$type]) {
		ERROR ("Collectd::fc_call: Invalid type \"$type\"");
		return;
	}

	if (! defined $fc_plugins[$type]->{$name}) {
		ERROR ("Collectd::fc_call: Unknown "
			. ($type == FC_MATCH ? "match" : "target")
			. " \"$name\"");
		return;
	}

	# Reject unknown callback types here; otherwise no callback name would
	# be selected below and call_by_name() would run without one.
	if (! defined $fc_cb_types{$cb_type}) {
		ERROR ("Collectd::fc_call: Invalid cb_type \"$cb_type\"");
		return;
	}

	DEBUG ("Collectd::fc_call: "
		. "type = \"$type\" (" . $fc_types{$type}
		. "), name = \"$name\", cb_type = \"$cb_type\" ("
		. $fc_cb_types{$cb_type} . ")");

	# Work on a private copy; the lock is released before the callback runs.
	{
		lock %{$fc_plugins[$type]};
		%proc = %{$fc_plugins[$type]->{$name}};
	}

	if (FC_CB_EXEC == $cb_type) {
		$cb_name = $proc{$fc_exec_names{$type}};
	}
	elsif (FC_CB_CREATE == $cb_type) {
		# "create" is optional.
		if (! defined $proc{'create'}) {
			return 1;
		}
		$cb_name = $proc{'create'};
	}
	elsif (FC_CB_DESTROY == $cb_type) {
		# "destroy" is optional.
		if (! defined $proc{'destroy'}) {
			return 1;
		}
		$cb_name = $proc{'destroy'};
	}

	$status = call_by_name (@_);

	# An undefined status is treated as a failure as well, rather than being
	# compared numerically and passed on without any error message.
	if ((! defined $status) || ($status < 0)) {
		my $err = $@ ? $@ : "callback returned false";

		ERROR ("Execution of fc callback \"$cb_name\" failed: $err");
		return;
	}
	return $status;
}
# Collectd::fc_register (type, name, proc).
#
# Registers a filter chain match or target.
#
# type:
#   FC_MATCH or FC_TARGET
#
# name:
#   name of the match / target
#
# proc:
#   hash reference holding subroutine names (plain strings, not references):
#   the mandatory key is "match" for FC_MATCH and "invoke" for FC_TARGET,
#   "create" and "destroy" are optional. Names are prefixed with the
#   caller's package unless they already start with it.
#
# Returns 1 on success and nothing on invalid arguments or if
# _fc_register() fails.
sub fc_register {
	my $type = shift;
	my $name = shift;
	my $proc = shift;

	my %fc : shared;

	# Check the arguments before they are interpolated into the debug
	# message.
	if (! ((defined $type) && (defined $name) && (defined $proc))) {
		ERROR ("Usage: Collectd::fc_register(type, name, proc)");
		return;
	}

	if (! defined $fc_plugins[$type]) {
		ERROR ("Collectd::fc_register: Invalid type \"$type\"");
		return;
	}

	DEBUG ("Collectd::fc_register: "
		. "type = \"$type\" (" . $fc_types{$type}
		. "), name = \"$name\", proc = \"$proc\"");

	if (("HASH" ne ref ($proc)) || (! defined $proc->{$fc_exec_names{$type}})
			|| ("" ne ref ($proc->{$fc_exec_names{$type}}))) {
		ERROR ("Collectd::fc_register: Invalid proc.");
		return;
	}

	for my $p (qw( create destroy )) {
		if ((defined $proc->{$p}) && ("" ne ref ($proc->{$p}))) {
			ERROR ("Collectd::fc_register: Invalid proc.");
			return;
		}
	}

	%fc = %$proc;

	# Qualify the callback names with the package of the caller. Optional
	# callbacks that are present but undefined are left alone, so that they
	# do not turn into a bogus "Package::" callback name.
	my $pkg = scalar caller;

	foreach my $p ('create', 'destroy', $fc_exec_names{$type}) {
		if (! defined $fc{$p}) {
			next;
		}

		if ($fc{$p} !~ m/^\Q$pkg\E\:\:/) {
			$fc{$p} = $pkg . "::" . $fc{$p};
		}
	}

	lock %{$fc_plugins[$type]};
	if (defined $fc_plugins[$type]->{$name}) {
		# Name the kind that is actually being replaced (match or target).
		WARNING ("Collectd::fc_register: Overwriting previous "
			. "definition of $fc_types{$type} \"$name\".");
	}

	if (! _fc_register ($type, $name)) {
		ERROR ("Collectd::fc_register: Failed to register \"$name\".");
		return;
	}

	$fc_plugins[$type]->{$name} = \%fc;
	return 1;
}
# _plugin_dispatch_config (plugin, config).
#
# Passes "config" to the configuration callback registered under the name
# "plugin". Does nothing if one of the arguments is undefined and logs a
# warning if no such callback has been registered.
sub _plugin_dispatch_config {
	my ($plugin, $config) = @_;

	# Package variable naming the callback to run; call_by_name() itself is
	# only handed the configuration.
	our $cb_name = undef;

	return unless (defined ($plugin) && defined ($config));

	unless (defined $cf_callbacks{$plugin}) {
		WARNING ("Found a configuration for the \"$plugin\" plugin, but "
			. "the plugin isn't loaded or didn't register "
			. "a configuration callback.");
		return;
	}

	{
		lock %cf_callbacks;
		$cb_name = $cf_callbacks{$plugin};
	}

	return call_by_name ($config);
}
628 1;
630 # vim: set sw=4 ts=4 tw=78 noexpandtab :