1 #!/usr/bin/perl
3 # collectd - contrib/rrd_filter.px
4 # Copyright (C) 2007-2008 Florian octo Forster
5 #
6 # This program is free software; you can redistribute it and/or modify it
7 # under the terms of the GNU General Public License as published by the
8 # Free Software Foundation; only version 2 of the License is applicable.
9 #
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 #
19 # Authors:
20 # Florian octo Forster <octo at verplant.org>
22 use strict;
23 use warnings;
=head1 NAME

rrd_filter.px - Perform some advanced non-standard operations on an RRD file.

=head1 SYNOPSIS

rrd_filter.px -i input.rrd -o output.rrd [options]

=head1 DEPENDENCIES

rrd_filter.px requires the RRDTool binary, Perl and the L<Getopt::Long>
module, which is included with Perl.

=cut
40 use Getopt::Long ('GetOptions');
# Input and output file names, set with --infile / --outfile.
our ($InFile, $OutFile);

# Parallel lists filled by --map: $InDS->[$i] is written to the output
# under the name $OutDS->[$i].
our ($InDS, $OutDS) = ([], []);

# RRAs to add, filled by --rra (hash refs with keys cf, xff, steps, rows).
our $NewRRAs = [];

# New step in seconds; 0 means "keep the step of the input".
our $Step = 0;

# Linear transformation applied with --scale / --shift.
our ($Scale, $Shift) = (1.0, 0.0);

# When true, diagnostic messages are printed to STDOUT.
our $Debug = 0;
56 =head1 OPTIONS
58 The following options can be passed on the command line:
60 =over 4
62 =item B<--infile> I<file>
64 =item B<-i> I<file>
66 Reads from I<file>. If I<file> ends in C<.rrd>, then C<rrdtool dump> is invoked
67 to create an XML dump of the RRD file. Otherwise the XML dump is expected
68 directly. The special filename C<-> can be used to read from STDIN.
70 =item B<--outfile> I<file>
72 =item B<-o> I<file>
74 Writes output to I<file>. If I<file> ends in C<.rrd>, then C<rrdtool restore>
75 is invoked to create a binary RRD file. Otherwise an XML output is written. The
76 special filename C<-> can be used to write to STDOUT.
78 =item B<--map> I<in_ds>:I<out_ds>
80 =item B<-m> I<in_ds>:I<out_ds>
82 Writes the datasource I<in_ds> to the output and renames it to I<out_ds>. This
83 is useful to extract one DS from an RRD file.
=item B<--step> I<seconds>

=item B<-s> I<seconds>

Changes the step of the output RRD file to be I<seconds>. The new step size
must be a multiple of the old step size or the other way around. When
increasing the step size, the number of PDPs in each RRA must be divisible by
the factor by which the step size is increased. The length of CDPs and the
absolute length of RRAs (and thus the data itself) are not altered.

Examples:

 step =  10, rra_steps = 12 => step = 60, rra_steps = 2
 step = 300, rra_steps =  1 => step = 10, rra_steps = 30
=item B<--rra> B<RRA>:I<CF>:I<XFF>:I<steps>:I<rows>

=item B<-a> B<RRA>:I<CF>:I<XFF>:I<steps>:I<rows>

Inserts a new RRA in the generated RRD file. The new RRA is filled with
unknown (NaN) values. This is done B<after> the step has been adjusted; take
that into account when specifying I<steps> and I<rows>. For an explanation of
the format please see L<rrdcreate(1)>.
108 =item B<--scale> I<factor>
110 Scales the values by the factor I<factor>, i.E<nbsp>e. all values are
111 multiplied by I<factor>.
113 =item B<--shift> I<offset>
115 Shifts all values by I<offset>, i.E<nbsp>e. I<offset> is added to all values.
117 =back
119 =cut
# Parse the command line. Malformed arguments end the script with exit
# status 1 (Getopt::Long prints its own diagnostics for unknown options).
GetOptions ("infile|i=s" => \$InFile,
	"outfile|o=s" => \$OutFile,
	'map|m=s' => sub
	{
		# --map in_ds:out_ds
		my ($in_ds, $out_ds) = split (':', $_[1]);
		if (!defined ($in_ds) || !defined ($out_ds))
		{
			print STDERR "Argument for `map' incorrect! The format is `--map in_ds:out_ds'\n";
			exit (1);
		}
		push (@$InDS, $in_ds);
		push (@$OutDS, $out_ds);
	},
	'step|s=i' => \$Step,
	'rra|a=s' => sub
	{
		# --rra RRA:cf:xff:steps:rows
		my ($rra, $cf, $xff, $steps, $rows) = split (':', $_[1]);
		# Test $rows first: when it is defined, all other fields are too,
		# so the string comparison never sees an undefined value.
		if (!defined ($rows) || ($rra ne 'RRA'))
		{
			print STDERR "Please use the standard RRDTool syntax when adding RRAs. I. e. RRA:<cf>:<xff>:<steps>:<rows>.\n";
			exit (1);
		}
		push (@$NewRRAs, {cf => $cf, xff => $xff, steps => $steps, rows => $rows});
	},
	'scale=f' => \$Scale,
	'shift=f' => \$Shift
) or exit (1);

# Both files are mandatory. Compare against '' rather than testing truth
# so that a file literally named "0" is accepted.
if (!defined ($InFile) || ($InFile eq '')
	|| !defined ($OutFile) || ($OutFile eq ''))
{
	print STDERR "Usage: $0 -i <infile> -o <outfile> [-m <in_ds>:<out_ds>] [-s <step>]\n";
	exit (1);
}
if (@$InDS != @$OutDS)
{
	print STDERR "You need the same amount of in- and out-DSes\n";
	exit (1);
}
# A step of 0 means "keep the input's step"; negative steps are invalid.
if ($Step < 0)
{
	print STDERR "The new step must not be negative ($Step seconds given).\n";
	exit (1);
}

main ($InFile, $OutFile);
exit (0);
#
# The _map_ handler
#
# Selects and renames the data sources given with --map. Everything that
# belongs to one DS (its definition in the header, its entry in each
# <cdp_prep>) is buffered until the whole section has been read; the
# buffers are then emitted in the order of the --map arguments. Inside
# <database>, every row is rebuilt from the mapped values only.
#
{
# $ds_index->[new_index] = old_index, i.e. the position of the mapped DS in
# the input; -1 means "not found (yet)".
my $ds_index;
# Position of the DS currently being buffered; -1 before the first <ds> of
# a section.
my $current_index;
# state 0 == searching for DS index
# state 1 == parse RRA header
# state 2 == parse values
my $state;
# $out_cache->[old_index] = buffered lines of that DS in the current section.
my $out_cache;

# Emits the buffered DS sections in output (--map) order through the next
# handler, then empties the buffers of the current section. $index is the
# position of the dsmap handler in the chain.
sub _dsmap_flush_cache
{
	my $index = shift;

	for (my $new_index = 0; $new_index < @$InDS; $new_index++)
	{
		my $old_index = $ds_index->[$new_index];
		while ($out_cache->[$old_index] =~ m/^(.*)$/gm)
		{
			post_line ("$1\n", $index + 1);
		}
	}

	for (my $i = 0; $i <= $current_index; $i++)
	{
		$out_cache->[$i] = '';
	}
}

sub handle_line_dsmap
{
	my $line = shift;
	my $index = shift;
	my $ret = '';

	if ((@$InDS == 0) || (@$OutDS == 0))
	{
		post_line ($line, $index + 1);
		return;
	}

	if (!defined ($state))
	{
		$current_index = -1;
		$state = 0;
		$out_cache = [];

		$ds_index = [];
		for (my $i = 0; $i < @$InDS; $i++)
		{
			print STDOUT "DS map $i: $InDS->[$i] -> $OutDS->[$i]\n" if ($Debug);
			$ds_index->[$i] = -1;
		}
	}

	if ($state == 0)
	{
		if ($line =~ m/<ds>/)
		{
			$current_index++;
			$out_cache->[$current_index] = $line;
		}
		elsif ($line =~ m#<name>\s*([^<\s]+)\s*</name>#)
		{
			my $name = $1;
			# old_index == $current_index
			# new_index == $i
			for (my $i = 0; $i < @$InDS; $i++)
			{
				# Each mapping is satisfied by the first matching DS only.
				next if ($ds_index->[$i] >= 0);

				if ($name eq $InDS->[$i])
				{
					$line =~ s#<name>\s*([^<\s]+)\s*</name>#<name> $OutDS->[$i] </name>#;
					$ds_index->[$i] = $current_index;
					last;
				}
			}

			$out_cache->[$current_index] .= $line;
		}
		elsif ($line =~ m#<last_ds>\s*([^\s>]+)\s*</last_ds>#i)
		{
			$out_cache->[$current_index] .= "\t\t<last_ds> NaN </last_ds>\n";
		}
		elsif ($line =~ m#<value>\s*([^\s>]+)\s*</value>#i)
		{
			$out_cache->[$current_index] .= "\t\t<value> NaN </value>\n";
		}
		elsif ($line =~ m#</ds>#)
		{
			$out_cache->[$current_index] .= $line;
		}
		elsif ($line =~ m#<rra>#)
		{
			# The header is complete. A mapping still at -1 would make
			# $out_cache->[-1] (and later $values[-1]) silently pick the
			# *last* DS of the input, so refuse to continue instead.
			for (my $i = 0; $i < @$InDS; $i++)
			{
				next if ($ds_index->[$i] >= 0);
				print STDERR "Data source `$InDS->[$i]' not found in the input "
					. "(or mapped more than once).\n";
				exit (1);
			}

			# Print out all the DS definitions we need and clear the
			# cache - it's used in state 1, too.
			_dsmap_flush_cache ($index);

			$ret .= $line;
			$current_index = -1;
			$state = 1;
		}
		elsif ($current_index == -1)
		{
			# Print all the lines before the first DS definition
			$ret .= $line;
		}
		else
		{
			# Something belonging to a DS-definition
			$out_cache->[$current_index] .= $line;
		}
	}
	elsif ($state == 1)
	{
		if ($line =~ m#<ds>#)
		{
			$current_index++;
			$out_cache->[$current_index] .= $line;
		}
		elsif ($line =~ m#<value>\s*([^\s>]+)\s*</value>#i)
		{
			$out_cache->[$current_index] .= "\t\t\t<value> NaN </value>\n";
		}
		elsif ($line =~ m#</cdp_prep>#)
		{
			# Print out all the cdp_prep entries we need and clear the cache
			_dsmap_flush_cache ($index);

			$ret .= $line;
			$current_index = -1;
		}
		elsif ($line =~ m#<database>#)
		{
			$ret .= $line;
			$state = 2;
		}
		elsif ($current_index == -1)
		{
			# Print all the lines before the first DS definition
			# and after cdp_prep
			$ret .= $line;
		}
		else
		{
			# Something belonging to a DS-definition
			$out_cache->[$current_index] .= $line;
		}
	}
	elsif ($state == 2)
	{
		if ($line =~ m#</database>#)
		{
			$ret .= $line;
			$current_index = -1;
			$state = 1;
		}
		else
		{
			# A data row: keep the optional leading comment (timestamp)
			# and only the values of the mapped data sources.
			my @values = ();
			my $i;

			$ret .= "\t\t";

			if ($line =~ m#(<!-- .*? -->)#)
			{
				$ret .= "$1 ";
			}
			$ret .= "<row> ";

			$i = 0;
			while ($line =~ m#<v>\s*([^<\s]+)\s*</v>#g)
			{
				$values[$i] = $1;
				$i++;
			}

			for (my $new_index = 0; $new_index < @$InDS; $new_index++)
			{
				my $old_index = $ds_index->[$new_index];
				$ret .= '<v> ' . $values[$old_index] . ' </v> ';
			}
			$ret .= "</row>\n";
		}
	}
	else
	{
		die;
	}

	if ($ret)
	{
		post_line ($ret, $index + 1);
	}
} # handle_line_dsmap
}
#
# The _step_ handler
#
# Replaces the <step> of the input with --step and rescales every
# <pdp_per_row> so that the time covered by one CDP stays the same.
#
{
# At most one of these becomes non-zero once the <step> line has been seen:
# $step_factor_up   = new step / old step (the step grows),
# $step_factor_down = old step / new step (the step shrinks).
my $step_factor_up;
my $step_factor_down;
sub handle_line_step
{
	my ($line, $index) = @_;

	# No (or no more) step change requested: pass lines through.
	unless ($Step)
	{
		post_line ($line, $index + 1);
		return;
	}

	# Announce the new step once, on the very first call.
	print STDOUT "New step: $Step\n"
		if ($Debug && !defined ($step_factor_up));

	$step_factor_up = 0 unless ($step_factor_up);
	$step_factor_down = 0 unless ($step_factor_down);

	if (!$step_factor_up && !$step_factor_down)
	{
		# Still waiting for the <step> line of the header.
		if ($line =~ m#<step>\s*(\d+)\s*</step>#i)
		{
			my $old_step = 0 + $1;

			if ($Step == $old_step)
			{
				# Same step as before: disable this handler.
				$Step = 0;
			}
			elsif ($Step > $old_step)
			{
				$step_factor_up = int ($Step / $old_step);
				if ($step_factor_up * $old_step != $Step)
				{
					print STDERR "The new step ($Step seconds) "
						. "is not a multiple of the old step "
						. "($old_step seconds).\n";
					exit (1);
				}
				$line = "<step> $Step </step>\n";
			}
			else
			{
				$step_factor_down = int ($old_step / $Step);
				if ($step_factor_down * $Step != $old_step)
				{
					print STDERR "The old step ($old_step seconds) "
						. "is not a multiple of the new step "
						. "($Step seconds).\n";
					exit (1);
				}
				$line = "<step> $Step </step>\n";
			}
		}
	}
	elsif ($line =~ m#<pdp_per_row>\s*(\d+)\s*</pdp_per_row>#i)
	{
		my $old_val = 0 + $1;
		my $new_val = $step_factor_up
			? int ($old_val / $step_factor_up)
			: $step_factor_down * $old_val;

		# When growing the step, the PDP count must divide evenly.
		if ($step_factor_up && ($new_val * $step_factor_up != $old_val))
		{
			print STDERR "Can't divide number of PDPs per row ($old_val) by step-factor ($step_factor_up).\n";
			exit (1);
		}
		$line = "<pdp_per_row> $new_val </pdp_per_row>\n";
	}

	post_line ($line, $index + 1);
} # handle_line_step
}
#
# The _add RRA_ handler
#
# Emits the RRAs requested with --rra right before the first <rra> of the
# input. Each new RRA gets one NaN-initialised cdp_prep entry per data
# source counted in the header and $rra->{'rows'} rows of NaN values.
#
{
my $add_rra_done;  # true once the new RRAs have been written
my $num_ds;        # number of <ds> lines seen before the first <rra>
sub handle_line_add_rra
{
	my ($line, $index) = @_;

	# Forwards each argument as one line to the next handler.
	my $post = sub
	{
		foreach my $out (@_)
		{
			post_line ($out, $index + 1);
		}
	};

	$num_ds = 0 unless ($num_ds);

	# Nothing (left) to insert: behave as a pass-through filter.
	if ($add_rra_done || !@$NewRRAs)
	{
		$post->($line);
		return;
	}

	if ($line =~ m#<ds>#i)
	{
		$num_ds++;
	}
	elsif ($line =~ m#<rra>#i)
	{
		# One row of NaN values; identical for all new RRAs.
		my $nan_row = "\t\t\t<row>" . ("<v> NaN </v>" x $num_ds) . "</row>\n";

		foreach my $rra (@$NewRRAs)
		{
			print STDOUT "Adding RRA: CF = $rra->{'cf'}, xff = $rra->{'xff'}, steps = $rra->{'steps'}, rows = $rra->{'rows'}, num_ds = $num_ds\n"
				if ($Debug);

			$post->("\t<rra>\n",
				"\t\t<cf> $rra->{'cf'} </cf>\n",
				"\t\t<pdp_per_row> $rra->{'steps'} </pdp_per_row>\n",
				"\t\t<params>\n",
				"\t\t\t<xff> $rra->{'xff'} </xff>\n",
				"\t\t</params>\n",
				"\t\t<cdp_prep>\n");

			# One cdp_prep entry per data source.
			$post->(("\t\t\t<ds>\n",
				"\t\t\t\t<primary_value> NaN </primary_value>\n",
				"\t\t\t\t<secondary_value> NaN </secondary_value>\n",
				"\t\t\t\t<value> NaN </value>\n",
				"\t\t\t\t<unknown_datapoints> 0 </unknown_datapoints>\n",
				"\t\t\t</ds>\n") x $num_ds);

			$post->("\t\t</cdp_prep>\n", "\t\t<database>\n");
			for (my $row = 0; $row < $rra->{'rows'}; $row++)
			{
				$post->($nan_row);
			}
			$post->("\t\t</database>\n", "\t</rra>\n");
		}

		$add_rra_done = 1;
	}

	$post->($line);
} # handle_line_add_rra
}
#
# The _scale/shift_ handler
#
# Returns "<$tag> X </$tag>" where X is $value * $scale + $shift printed in
# exponential notation. A NaN value (any capitalization) is kept as "NaN".
#
sub calculate_scale_shift
{
	my ($value, $tag, $scale, $shift) = @_;

	return ("<$tag> NaN </$tag>") if (lc ("$value") eq 'nan');

	my $result = $shift + ($scale * (0.0 + $value));
	return (sprintf ("<%s> %1.10e </%s>", $tag, $result, $tag));
}
# Applies --scale / --shift to every numeric min, max, last_ds, value,
# primary_value, secondary_value and v element in $line, then hands the
# line to the next handler.
sub handle_line_scale_shift
{
	my ($line, $index) = @_;

	my $is_identity = ($Scale == 1.0) && ($Shift == 0.0);
	unless ($is_identity)
	{
		$line =~ s#<(min|max|last_ds|value|primary_value|secondary_value|v)>\s*([^\s<]+)\s*</[^>]+>#calculate_scale_shift ($2, $1, $Scale, $Shift)#eg;
	}

	post_line ($line, $index + 1);
}
#
# The _output_ handler
#
# handle_line_output is meant to be the last handler of the chain: it
# writes every line it receives to the file handle registered with
# set_output.
#
{
my $fh;

# Registers the file handle that handle_line_output writes to.
sub set_output
{
	$fh = shift;
}

#
# The _peak detection_ handler
#
# This filter is unfinished! handle_fh does not add it to the chain. It
# only reports suspicious jumps between consecutive rows on STDERR and
# passes every line on unchanged.
#
{
my $previous_values;       # last defined value of each DS in the current RRA
my $previous_differences;  # moving average of |value - previous value| per DS
my $pdp_per_row;           # PDPs per row of the RRA being read
sub handle_line_peak_detect
{
	my $line = shift;
	my $index = shift;

	if (!$previous_values)
	{
		$previous_values = [];
		$previous_differences = [];
	}

	if ($line =~ m#</database>#i)
	{
		# End of an RRA: forget the statistics gathered so far.
		$previous_values = [];
		$previous_differences = [];
		print STDERR "==============================================================================\n";
	}
	elsif ($line =~ m#<pdp_per_row>\s*([1-9][0-9]*)\s*</pdp_per_row>#)
	{
		$pdp_per_row = int ($1);
		print STDERR "pdp_per_row = $pdp_per_row;\n";
	}
	elsif ($line =~ m#<row>#)
	{
		my @values = ();
		while ($line =~ m#<v>\s*([^\s>]+)\s*</v>#ig)
		{
			# NaN is matched case-insensitively, as in calculate_scale_shift.
			if (lc ($1) eq 'nan')
			{
				push (@values, undef);
			}
			else
			{
				push (@values, 0.0 + $1);
			}
		}

		for (my $i = 0; $i < @values; $i++)
		{
			if (!defined ($values[$i]))
			{
				$previous_values->[$i] = undef;
			}
			elsif (!defined ($previous_values->[$i]))
			{
				$previous_values->[$i] = $values[$i];
			}
			elsif (!defined ($previous_differences->[$i]))
			{
				# Second value: seed the average difference and remember
				# this value, so the next difference is measured against
				# this row rather than the one before it.
				$previous_differences->[$i] = abs ($previous_values->[$i] - $values[$i]);
				$previous_values->[$i] = $values[$i];
			}
			else
			{
				my $divisor = ($previous_differences->[$i] < 1.0) ? 1.0 : $previous_differences->[$i];
				my $difference = abs ($previous_values->[$i] - $values[$i]);
				my $change = $pdp_per_row * $difference / $divisor;
				if (($divisor > 10.0) && ($change > 10e5))
				{
					print STDERR "i = $i; average difference = " . $previous_differences->[$i]. "; current difference = " . $difference. "; change = $change;\n";
				}
				$previous_values->[$i] = $values[$i];
				# Exponential moving average with weight 0.05 for the new sample.
				$previous_differences->[$i] = (0.95 * $previous_differences->[$i]) + (0.05 * $difference);
			}
		}
	}

	post_line ($line, $index + 1);
} # handle_line_peak_detect
}

# Writes $line to the handle registered with set_output. If no handle has
# been registered, the line is passed on to the next handler instead.
sub handle_line_output
{
	my $line = shift;
	my $index = shift;

	if (!defined ($fh))
	{
		post_line ($line, $index + 1);
		return;
	}

	print {$fh} $line;
} # handle_line_output
}
#
# Dispatching logic
#
# The handlers form a chain: post_line ($line, $i) hands $line to the
# $i-th registered handler, which passes its (possibly modified) lines on
# by calling post_line ($line, $i + 1).
#
{
my @handlers;

# Appends a handler (a CODE reference) to the end of the chain; dies if
# given anything else.
sub add_handler
{
	my ($handler) = @_;

	ref ($handler) eq 'CODE' or die;
	push (@handlers, $handler);
} # add_handler

# Calls the handler at position $index with ($line, $index). Lines posted
# past the last handler are silently dropped.
sub post_line
{
	my ($line, $index) = @_;

	# Debugging aid; switch the condition on to trace every line.
	if (0)
	{
		(my $trace = $line) =~ s/\n\z//;
		print "DEBUG: post_line ($trace, $index);\n";
	}

	return if ($index > $#handlers);

	my $handler = $handlers[$index];
	$handler->($line, $index);
} # post_line
}
# Builds the handler chain from the command line options and feeds every
# line read from $in_fh into it. The final output handler writes to $out_fh.
sub handle_fh
{
	my ($in_fh, $out_fh) = @_;

	set_output ($out_fh);

	# Order matters: every handler sees the output of the previous one.
	add_handler (\&handle_line_dsmap) if (@$InDS);
	add_handler (\&handle_line_step) if ($Step);
	add_handler (\&handle_line_scale_shift) if (($Scale != 1.0) || ($Shift != 0.0));
	#add_handler (\&handle_line_peak_detect);
	add_handler (\&handle_line_add_rra) if (@$NewRRAs);
	add_handler (\&handle_line_output);

	while (defined (my $line = <$in_fh>))
	{
		post_line ($line, 0);
	}
} # handle_fh
# Opens the input and the output, runs all lines through the handler chain
# and closes both handles again.
#
# A file name ending in ".rrd" (case-insensitive) is read through
# "rrdtool dump" or written through "rrdtool restore"; "-" stands for
# STDIN / STDOUT; any other name is read or written as an XML dump.
# Dies if a handle cannot be opened, or if closing it reports an error.
# For the rrdtool pipes, that includes rrdtool exiting with a non-zero
# status.
sub main
{
	my $in_file = shift;
	my $out_file = shift;

	my $in_fh;
	my $out_fh;

	# STDIN and STDOUT are left open.
	my $in_needs_close = 1;
	my $out_needs_close = 1;

	if ($in_file =~ m/\.rrd$/i)
	{
		open ($in_fh, '-|', 'rrdtool', 'dump', $in_file) or die ("open (rrdtool): $!");
	}
	elsif ($in_file eq '-')
	{
		$in_fh = \*STDIN;
		$in_needs_close = 0;
	}
	else
	{
		open ($in_fh, '<', $in_file) or die ("open ($in_file): $!");
	}

	if ($out_file =~ m/\.rrd$/i)
	{
		open ($out_fh, '|-', 'rrdtool', 'restore', '-', $out_file) or die ("open (rrdtool): $!");
	}
	elsif ($out_file eq '-')
	{
		$out_fh = \*STDOUT;
		$out_needs_close = 0;
	}
	else
	{
		open ($out_fh, '>', $out_file) or die ("open ($out_file): $!");
	}

	handle_fh ($in_fh, $out_fh);

	# Buffered write errors and the exit status of piped children only
	# surface here, so the results of close() must be checked.
	if ($in_needs_close)
	{
		close ($in_fh) or die ("close ($in_file): " . _close_error ());
	}
	if ($out_needs_close)
	{
		close ($out_fh) or die ("close ($out_file): " . _close_error ());
	}
} # main

# Describes why the most recent close() failed. For a piped open, close()
# returns false with $! set to 0 when the only problem was the child's exit
# status, which is then available in $?.
sub _close_error
{
	return ("$!") if ($!);
	return ('child killed by signal ' . ($? & 127)) if ($? & 127);
	return ('child exited with status ' . ($? >> 8));
}
=head1 LICENSE

This script is licensed under the GNU General Public License, versionE<nbsp>2
(GPLv2).
774 =head1 AUTHOR
776 Florian octo Forster E<lt>octo at verplant.orgE<gt>