2 /*****************************************************************************
3 * RRDtool 1.3.2 Copyright by Tobi Oetiker, 1997-2008
4 * Copyright by Florian Forster, 2008
5 *****************************************************************************
6 * rrd_update.c RRD Update Function
7 *****************************************************************************
8 * $Id$
9 *****************************************************************************/
11 #include "rrd_tool.h"
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
15 #include <sys/stat.h>
16 #include <io.h>
17 #endif
19 #include <locale.h>
21 #include "rrd_hw.h"
22 #include "rrd_rpncalc.h"
24 #include "rrd_is_thread_safe.h"
25 #include "unused.h"
27 #include "rrd_client.h"
29 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
30 /*
31 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
32 * replacement.
33 */
34 #include <sys/timeb.h>
36 #ifndef __MINGW32__
37 struct timeval {
38 time_t tv_sec; /* seconds */
39 long tv_usec; /* microseconds */
40 };
41 #endif
43 struct __timezone {
44 int tz_minuteswest; /* minutes W of Greenwich */
45 int tz_dsttime; /* type of dst correction */
46 };
48 static int gettimeofday(
49 struct timeval *t,
50 struct __timezone *tz)
51 {
53 struct _timeb current_time;
55 _ftime(¤t_time);
57 t->tv_sec = current_time.time;
58 t->tv_usec = current_time.millitm * 1000;
60 return 0;
61 }
63 #endif
65 /* FUNCTION PROTOTYPES */
67 int rrd_update_r(
68 const char *filename,
69 const char *tmplt,
70 int argc,
71 const char **argv);
72 int _rrd_update(
73 const char *filename,
74 const char *tmplt,
75 int argc,
76 const char **argv,
77 rrd_info_t *);
79 static int allocate_data_structures(
80 rrd_t *rrd,
81 char ***updvals,
82 rrd_value_t **pdp_temp,
83 const char *tmplt,
84 long **tmpl_idx,
85 unsigned long *tmpl_cnt,
86 unsigned long **rra_step_cnt,
87 unsigned long **skip_update,
88 rrd_value_t **pdp_new);
90 static int parse_template(
91 rrd_t *rrd,
92 const char *tmplt,
93 unsigned long *tmpl_cnt,
94 long *tmpl_idx);
96 static int process_arg(
97 char *step_start,
98 rrd_t *rrd,
99 rrd_file_t *rrd_file,
100 unsigned long rra_begin,
101 time_t *current_time,
102 unsigned long *current_time_usec,
103 rrd_value_t *pdp_temp,
104 rrd_value_t *pdp_new,
105 unsigned long *rra_step_cnt,
106 char **updvals,
107 long *tmpl_idx,
108 unsigned long tmpl_cnt,
109 rrd_info_t ** pcdp_summary,
110 int version,
111 unsigned long *skip_update,
112 int *schedule_smooth);
114 static int parse_ds(
115 rrd_t *rrd,
116 char **updvals,
117 long *tmpl_idx,
118 char *input,
119 unsigned long tmpl_cnt,
120 time_t *current_time,
121 unsigned long *current_time_usec,
122 int version);
124 static int get_time_from_reading(
125 rrd_t *rrd,
126 char timesyntax,
127 char **updvals,
128 time_t *current_time,
129 unsigned long *current_time_usec,
130 int version);
132 static int update_pdp_prep(
133 rrd_t *rrd,
134 char **updvals,
135 rrd_value_t *pdp_new,
136 double interval);
138 static int calculate_elapsed_steps(
139 rrd_t *rrd,
140 unsigned long current_time,
141 unsigned long current_time_usec,
142 double interval,
143 double *pre_int,
144 double *post_int,
145 unsigned long *proc_pdp_cnt);
147 static void simple_update(
148 rrd_t *rrd,
149 double interval,
150 rrd_value_t *pdp_new);
152 static int process_all_pdp_st(
153 rrd_t *rrd,
154 double interval,
155 double pre_int,
156 double post_int,
157 unsigned long elapsed_pdp_st,
158 rrd_value_t *pdp_new,
159 rrd_value_t *pdp_temp);
161 static int process_pdp_st(
162 rrd_t *rrd,
163 unsigned long ds_idx,
164 double interval,
165 double pre_int,
166 double post_int,
167 long diff_pdp_st,
168 rrd_value_t *pdp_new,
169 rrd_value_t *pdp_temp);
171 static int update_all_cdp_prep(
172 rrd_t *rrd,
173 unsigned long *rra_step_cnt,
174 unsigned long rra_begin,
175 rrd_file_t *rrd_file,
176 unsigned long elapsed_pdp_st,
177 unsigned long proc_pdp_cnt,
178 rrd_value_t **last_seasonal_coef,
179 rrd_value_t **seasonal_coef,
180 rrd_value_t *pdp_temp,
181 unsigned long *skip_update,
182 int *schedule_smooth);
184 static int do_schedule_smooth(
185 rrd_t *rrd,
186 unsigned long rra_idx,
187 unsigned long elapsed_pdp_st);
189 static int update_cdp_prep(
190 rrd_t *rrd,
191 unsigned long elapsed_pdp_st,
192 unsigned long start_pdp_offset,
193 unsigned long *rra_step_cnt,
194 int rra_idx,
195 rrd_value_t *pdp_temp,
196 rrd_value_t *last_seasonal_coef,
197 rrd_value_t *seasonal_coef,
198 int current_cf);
200 static void update_cdp(
201 unival *scratch,
202 int current_cf,
203 rrd_value_t pdp_temp_val,
204 unsigned long rra_step_cnt,
205 unsigned long elapsed_pdp_st,
206 unsigned long start_pdp_offset,
207 unsigned long pdp_cnt,
208 rrd_value_t xff,
209 int i,
210 int ii);
212 static void initialize_cdp_val(
213 unival *scratch,
214 int current_cf,
215 rrd_value_t pdp_temp_val,
216 unsigned long elapsed_pdp_st,
217 unsigned long start_pdp_offset,
218 unsigned long pdp_cnt);
220 static void reset_cdp(
221 rrd_t *rrd,
222 unsigned long elapsed_pdp_st,
223 rrd_value_t *pdp_temp,
224 rrd_value_t *last_seasonal_coef,
225 rrd_value_t *seasonal_coef,
226 int rra_idx,
227 int ds_idx,
228 int cdp_idx,
229 enum cf_en current_cf);
231 static rrd_value_t initialize_average_carry_over(
232 rrd_value_t pdp_temp_val,
233 unsigned long elapsed_pdp_st,
234 unsigned long start_pdp_offset,
235 unsigned long pdp_cnt);
237 static rrd_value_t calculate_cdp_val(
238 rrd_value_t cdp_val,
239 rrd_value_t pdp_temp_val,
240 unsigned long elapsed_pdp_st,
241 int current_cf,
242 int i,
243 int ii);
245 static int update_aberrant_cdps(
246 rrd_t *rrd,
247 rrd_file_t *rrd_file,
248 unsigned long rra_begin,
249 unsigned long elapsed_pdp_st,
250 rrd_value_t *pdp_temp,
251 rrd_value_t **seasonal_coef);
253 static int write_to_rras(
254 rrd_t *rrd,
255 rrd_file_t *rrd_file,
256 unsigned long *rra_step_cnt,
257 unsigned long rra_begin,
258 time_t current_time,
259 unsigned long *skip_update,
260 rrd_info_t ** pcdp_summary);
262 static int write_RRA_row(
263 rrd_file_t *rrd_file,
264 rrd_t *rrd,
265 unsigned long rra_idx,
266 unsigned short CDP_scratch_idx,
267 rrd_info_t ** pcdp_summary,
268 time_t rra_time);
270 static int smooth_all_rras(
271 rrd_t *rrd,
272 rrd_file_t *rrd_file,
273 unsigned long rra_begin);
275 #ifndef HAVE_MMAP
276 static int write_changes_to_disk(
277 rrd_t *rrd,
278 rrd_file_t *rrd_file,
279 int version);
280 #endif
282 /*
283 * normalize time as returned by gettimeofday. usec part must
284 * be always >= 0
285 */
286 static inline void normalize_time(
287 struct timeval *t)
288 {
289 if (t->tv_usec < 0) {
290 t->tv_sec--;
291 t->tv_usec += 1e6L;
292 }
293 }
295 /*
296 * Sets current_time and current_time_usec based on the current time.
297 * current_time_usec is set to 0 if the version number is 1 or 2.
298 */
299 static inline void initialize_time(
300 time_t *current_time,
301 unsigned long *current_time_usec,
302 int version)
303 {
304 struct timeval tmp_time; /* used for time conversion */
306 gettimeofday(&tmp_time, 0);
307 normalize_time(&tmp_time);
308 *current_time = tmp_time.tv_sec;
309 if (version >= 3) {
310 *current_time_usec = tmp_time.tv_usec;
311 } else {
312 *current_time_usec = 0;
313 }
314 }
316 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
318 rrd_info_t *rrd_update_v(
319 int argc,
320 char **argv)
321 {
322 char *tmplt = NULL;
323 rrd_info_t *result = NULL;
324 rrd_infoval_t rc;
325 char *opt_daemon = NULL;
326 struct option long_options[] = {
327 {"template", required_argument, 0, 't'},
328 {0, 0, 0, 0}
329 };
331 rc.u_int = -1;
332 optind = 0;
333 opterr = 0; /* initialize getopt */
335 while (1) {
336 int option_index = 0;
337 int opt;
339 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
341 if (opt == EOF)
342 break;
344 switch (opt) {
345 case 't':
346 tmplt = optarg;
347 break;
349 case '?':
350 rrd_set_error("unknown option '%s'", argv[optind - 1]);
351 goto end_tag;
352 }
353 }
355 opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
356 if (opt_daemon != NULL) {
357 rrd_set_error ("The \"%s\" environment variable is defined, "
358 "but \"%s\" cannot work with rrdcached. Either unset "
359 "the environment variable or use \"update\" instead.",
360 ENV_RRDCACHED_ADDRESS, argv[0]);
361 goto end_tag;
362 }
364 /* need at least 2 arguments: filename, data. */
365 if (argc - optind < 2) {
366 rrd_set_error("Not enough arguments");
367 goto end_tag;
368 }
369 rc.u_int = 0;
370 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
371 rc.u_int = _rrd_update(argv[optind], tmplt,
372 argc - optind - 1,
373 (const char **) (argv + optind + 1), result);
374 result->value.u_int = rc.u_int;
375 end_tag:
376 return result;
377 }
379 int rrd_update(
380 int argc,
381 char **argv)
382 {
383 struct option long_options[] = {
384 {"template", required_argument, 0, 't'},
385 {"daemon", required_argument, 0, 'd'},
386 {0, 0, 0, 0}
387 };
388 int option_index = 0;
389 int opt;
390 char *tmplt = NULL;
391 int rc = -1;
392 char *opt_daemon = NULL;
394 optind = 0;
395 opterr = 0; /* initialize getopt */
397 while (1) {
398 opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
400 if (opt == EOF)
401 break;
403 switch (opt) {
404 case 't':
405 tmplt = strdup(optarg);
406 break;
408 case 'd':
409 if (opt_daemon != NULL)
410 free (opt_daemon);
411 opt_daemon = strdup (optarg);
412 if (opt_daemon == NULL)
413 {
414 rrd_set_error("strdup failed.");
415 goto out;
416 }
417 break;
419 case '?':
420 rrd_set_error("unknown option '%s'", argv[optind - 1]);
421 goto out;
422 }
423 }
425 /* need at least 2 arguments: filename, data. */
426 if (argc - optind < 2) {
427 rrd_set_error("Not enough arguments");
428 goto out;
429 }
431 { /* try to connect to rrdcached */
432 int status = rrdc_connect(opt_daemon);
433 if (status != 0) return status;
434 }
436 if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
437 {
438 rrd_set_error("The caching daemon cannot be used together with "
439 "templates yet.");
440 goto out;
441 }
443 if (! rrdc_is_connected(opt_daemon))
444 {
445 rc = rrd_update_r(argv[optind], tmplt,
446 argc - optind - 1, (const char **) (argv + optind + 1));
447 }
448 else /* we are connected */
449 {
450 rc = rrdc_update (argv[optind], /* file */
451 argc - optind - 1, /* values_num */
452 (void *) (argv + optind + 1)); /* values */
453 if (rc > 0)
454 rrd_set_error("Failed sending the values to rrdcached: %s",
455 rrd_strerror (rc));
456 }
458 out:
459 if (tmplt != NULL)
460 {
461 free(tmplt);
462 tmplt = NULL;
463 }
464 if (opt_daemon != NULL)
465 {
466 free (opt_daemon);
467 opt_daemon = NULL;
468 }
469 return rc;
470 }
472 int rrd_update_r(
473 const char *filename,
474 const char *tmplt,
475 int argc,
476 const char **argv)
477 {
478 return _rrd_update(filename, tmplt, argc, argv, NULL);
479 }
481 int _rrd_update(
482 const char *filename,
483 const char *tmplt,
484 int argc,
485 const char **argv,
486 rrd_info_t * pcdp_summary)
487 {
489 int arg_i = 2;
491 unsigned long rra_begin; /* byte pointer to the rra
492 * area in the rrd file. this
493 * pointer never changes value */
494 rrd_value_t *pdp_new; /* prepare the incoming data to be added
495 * to the existing entry */
496 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
497 * to the cdp values */
499 long *tmpl_idx; /* index representing the settings
500 * transported by the tmplt index */
501 unsigned long tmpl_cnt = 2; /* time and data */
502 rrd_t rrd;
503 time_t current_time = 0;
504 unsigned long current_time_usec = 0; /* microseconds part of current time */
505 char **updvals;
506 int schedule_smooth = 0;
508 /* number of elapsed PDP steps since last update */
509 unsigned long *rra_step_cnt = NULL;
511 int version; /* rrd version */
512 rrd_file_t *rrd_file;
513 char *arg_copy; /* for processing the argv */
514 unsigned long *skip_update; /* RRAs to advance but not write */
516 /* need at least 1 arguments: data. */
517 if (argc < 1) {
518 rrd_set_error("Not enough arguments");
519 goto err_out;
520 }
522 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
523 goto err_free;
524 }
525 /* We are now at the beginning of the rra's */
526 rra_begin = rrd_file->header_len;
528 version = atoi(rrd.stat_head->version);
530 initialize_time(¤t_time, ¤t_time_usec, version);
532 /* get exclusive lock to whole file.
533 * lock gets removed when we close the file.
534 */
535 if (rrd_lock(rrd_file) != 0) {
536 rrd_set_error("could not lock RRD");
537 goto err_close;
538 }
540 if (allocate_data_structures(&rrd, &updvals,
541 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
542 &rra_step_cnt, &skip_update,
543 &pdp_new) == -1) {
544 goto err_close;
545 }
547 /* loop through the arguments. */
548 for (arg_i = 0; arg_i < argc; arg_i++) {
549 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
550 rrd_set_error("failed duplication argv entry");
551 break;
552 }
553 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
554 ¤t_time, ¤t_time_usec, pdp_temp, pdp_new,
555 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
556 &pcdp_summary, version, skip_update,
557 &schedule_smooth) == -1) {
558 if (rrd_test_error()) { /* Should have error string always here */
559 char *save_error;
561 /* Prepend file name to error message */
562 if ((save_error = strdup(rrd_get_error())) != NULL) {
563 rrd_set_error("%s: %s", filename, save_error);
564 free(save_error);
565 }
566 }
567 free(arg_copy);
568 break;
569 }
570 free(arg_copy);
571 }
573 free(rra_step_cnt);
575 /* if we got here and if there is an error and if the file has not been
576 * written to, then close things up and return. */
577 if (rrd_test_error()) {
578 goto err_free_structures;
579 }
580 #ifndef HAVE_MMAP
581 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
582 goto err_free_structures;
583 }
584 #endif
586 /* calling the smoothing code here guarantees at most one smoothing
587 * operation per rrd_update call. Unfortunately, it is possible with bulk
588 * updates, or a long-delayed update for smoothing to occur off-schedule.
589 * This really isn't critical except during the burn-in cycles. */
590 if (schedule_smooth) {
591 smooth_all_rras(&rrd, rrd_file, rra_begin);
592 }
594 /* rrd_dontneed(rrd_file,&rrd); */
595 rrd_free(&rrd);
596 rrd_close(rrd_file);
598 free(pdp_new);
599 free(tmpl_idx);
600 free(pdp_temp);
601 free(skip_update);
602 free(updvals);
603 return 0;
605 err_free_structures:
606 free(pdp_new);
607 free(tmpl_idx);
608 free(pdp_temp);
609 free(skip_update);
610 free(updvals);
611 err_close:
612 rrd_close(rrd_file);
613 err_free:
614 rrd_free(&rrd);
615 err_out:
616 return -1;
617 }
619 /*
620 * get exclusive lock to whole file.
621 * lock gets removed when we close the file
622 *
623 * returns 0 on success
624 */
625 int rrd_lock(
626 rrd_file_t *file)
627 {
628 int rcstat;
630 {
631 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
632 struct _stat st;
634 if (_fstat(file->fd, &st) == 0) {
635 rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
636 } else {
637 rcstat = -1;
638 }
639 #else
640 struct flock lock;
642 lock.l_type = F_WRLCK; /* exclusive write lock */
643 lock.l_len = 0; /* whole file */
644 lock.l_start = 0; /* start of file */
645 lock.l_whence = SEEK_SET; /* end of file */
647 rcstat = fcntl(file->fd, F_SETLK, &lock);
648 #endif
649 }
651 return (rcstat);
652 }
654 /*
655 * Allocate some important arrays used, and initialize the template.
656 *
657 * When it returns, either all of the structures are allocated
658 * or none of them are.
659 *
660 * Returns 0 on success, -1 on error.
661 */
662 static int allocate_data_structures(
663 rrd_t *rrd,
664 char ***updvals,
665 rrd_value_t **pdp_temp,
666 const char *tmplt,
667 long **tmpl_idx,
668 unsigned long *tmpl_cnt,
669 unsigned long **rra_step_cnt,
670 unsigned long **skip_update,
671 rrd_value_t **pdp_new)
672 {
673 unsigned i, ii;
674 if ((*updvals = (char **) malloc(sizeof(char *)
675 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
676 rrd_set_error("allocating updvals pointer array.");
677 return -1;
678 }
679 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
680 * rrd->stat_head->ds_cnt)) ==
681 NULL) {
682 rrd_set_error("allocating pdp_temp.");
683 goto err_free_updvals;
684 }
685 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
686 *
687 rrd->stat_head->rra_cnt)) ==
688 NULL) {
689 rrd_set_error("allocating skip_update.");
690 goto err_free_pdp_temp;
691 }
692 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
693 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
694 rrd_set_error("allocating tmpl_idx.");
695 goto err_free_skip_update;
696 }
697 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
698 *
699 (rrd->stat_head->
700 rra_cnt))) == NULL) {
701 rrd_set_error("allocating rra_step_cnt.");
702 goto err_free_tmpl_idx;
703 }
705 /* initialize tmplt redirector */
706 /* default config example (assume DS 1 is a CDEF DS)
707 tmpl_idx[0] -> 0; (time)
708 tmpl_idx[1] -> 1; (DS 0)
709 tmpl_idx[2] -> 3; (DS 2)
710 tmpl_idx[3] -> 4; (DS 3) */
711 (*tmpl_idx)[0] = 0; /* time */
712 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
713 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
714 (*tmpl_idx)[ii++] = i;
715 }
716 *tmpl_cnt = ii;
718 if (tmplt != NULL) {
719 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
720 goto err_free_rra_step_cnt;
721 }
722 }
724 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
725 * rrd->stat_head->ds_cnt)) == NULL) {
726 rrd_set_error("allocating pdp_new.");
727 goto err_free_rra_step_cnt;
728 }
730 return 0;
732 err_free_rra_step_cnt:
733 free(*rra_step_cnt);
734 err_free_tmpl_idx:
735 free(*tmpl_idx);
736 err_free_skip_update:
737 free(*skip_update);
738 err_free_pdp_temp:
739 free(*pdp_temp);
740 err_free_updvals:
741 free(*updvals);
742 return -1;
743 }
745 /*
746 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
747 *
748 * Returns 0 on success.
749 */
750 static int parse_template(
751 rrd_t *rrd,
752 const char *tmplt,
753 unsigned long *tmpl_cnt,
754 long *tmpl_idx)
755 {
756 char *dsname, *tmplt_copy;
757 unsigned int tmpl_len, i;
758 int ret = 0;
760 *tmpl_cnt = 1; /* the first entry is the time */
762 /* we should work on a writeable copy here */
763 if ((tmplt_copy = strdup(tmplt)) == NULL) {
764 rrd_set_error("error copying tmplt '%s'", tmplt);
765 ret = -1;
766 goto out;
767 }
769 dsname = tmplt_copy;
770 tmpl_len = strlen(tmplt_copy);
771 for (i = 0; i <= tmpl_len; i++) {
772 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
773 tmplt_copy[i] = '\0';
774 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
775 rrd_set_error("tmplt contains more DS definitions than RRD");
776 ret = -1;
777 goto out_free_tmpl_copy;
778 }
779 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
780 rrd_set_error("unknown DS name '%s'", dsname);
781 ret = -1;
782 goto out_free_tmpl_copy;
783 }
784 /* go to the next entry on the tmplt_copy */
785 if (i < tmpl_len)
786 dsname = &tmplt_copy[i + 1];
787 }
788 }
789 out_free_tmpl_copy:
790 free(tmplt_copy);
791 out:
792 return ret;
793 }
795 /*
796 * Parse an update string, updates the primary data points (PDPs)
797 * and consolidated data points (CDPs), and writes changes to the RRAs.
798 *
799 * Returns 0 on success, -1 on error.
800 */
801 static int process_arg(
802 char *step_start,
803 rrd_t *rrd,
804 rrd_file_t *rrd_file,
805 unsigned long rra_begin,
806 time_t *current_time,
807 unsigned long *current_time_usec,
808 rrd_value_t *pdp_temp,
809 rrd_value_t *pdp_new,
810 unsigned long *rra_step_cnt,
811 char **updvals,
812 long *tmpl_idx,
813 unsigned long tmpl_cnt,
814 rrd_info_t ** pcdp_summary,
815 int version,
816 unsigned long *skip_update,
817 int *schedule_smooth)
818 {
819 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
821 /* a vector of future Holt-Winters seasonal coefs */
822 unsigned long elapsed_pdp_st;
824 double interval, pre_int, post_int; /* interval between this and
825 * the last run */
826 unsigned long proc_pdp_cnt;
828 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
829 current_time, current_time_usec, version) == -1) {
830 return -1;
831 }
833 interval = (double) (*current_time - rrd->live_head->last_up)
834 + (double) ((long) *current_time_usec -
835 (long) rrd->live_head->last_up_usec) / 1e6f;
837 /* process the data sources and update the pdp_prep
838 * area accordingly */
839 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
840 return -1;
841 }
843 elapsed_pdp_st = calculate_elapsed_steps(rrd,
844 *current_time,
845 *current_time_usec, interval,
846 &pre_int, &post_int,
847 &proc_pdp_cnt);
849 /* has a pdp_st moment occurred since the last run ? */
850 if (elapsed_pdp_st == 0) {
851 /* no we have not passed a pdp_st moment. therefore update is simple */
852 simple_update(rrd, interval, pdp_new);
853 } else {
854 /* an pdp_st has occurred. */
855 if (process_all_pdp_st(rrd, interval,
856 pre_int, post_int,
857 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
858 return -1;
859 }
860 if (update_all_cdp_prep(rrd, rra_step_cnt,
861 rra_begin, rrd_file,
862 elapsed_pdp_st,
863 proc_pdp_cnt,
864 &last_seasonal_coef,
865 &seasonal_coef,
866 pdp_temp,
867 skip_update, schedule_smooth) == -1) {
868 goto err_free_coefficients;
869 }
870 if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
871 elapsed_pdp_st, pdp_temp,
872 &seasonal_coef) == -1) {
873 goto err_free_coefficients;
874 }
875 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
876 *current_time, skip_update,
877 pcdp_summary) == -1) {
878 goto err_free_coefficients;
879 }
880 } /* endif a pdp_st has occurred */
881 rrd->live_head->last_up = *current_time;
882 rrd->live_head->last_up_usec = *current_time_usec;
884 if (version < 3) {
885 *rrd->legacy_last_up = rrd->live_head->last_up;
886 }
887 free(seasonal_coef);
888 free(last_seasonal_coef);
889 return 0;
891 err_free_coefficients:
892 free(seasonal_coef);
893 free(last_seasonal_coef);
894 return -1;
895 }
897 /*
898 * Parse a DS string (time + colon-separated values), storing the
899 * results in current_time, current_time_usec, and updvals.
900 *
901 * Returns 0 on success, -1 on error.
902 */
903 static int parse_ds(
904 rrd_t *rrd,
905 char **updvals,
906 long *tmpl_idx,
907 char *input,
908 unsigned long tmpl_cnt,
909 time_t *current_time,
910 unsigned long *current_time_usec,
911 int version)
912 {
913 char *p;
914 unsigned long i;
915 char timesyntax;
917 updvals[0] = input;
918 /* initialize all ds input to unknown except the first one
919 which has always got to be set */
920 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
921 updvals[i] = "U";
923 /* separate all ds elements; first must be examined separately
924 due to alternate time syntax */
925 if ((p = strchr(input, '@')) != NULL) {
926 timesyntax = '@';
927 } else if ((p = strchr(input, ':')) != NULL) {
928 timesyntax = ':';
929 } else {
930 rrd_set_error("expected timestamp not found in data source from %s",
931 input);
932 return -1;
933 }
934 *p = '\0';
935 i = 1;
936 updvals[tmpl_idx[i++]] = p + 1;
937 while (*(++p)) {
938 if (*p == ':') {
939 *p = '\0';
940 if (i < tmpl_cnt) {
941 updvals[tmpl_idx[i++]] = p + 1;
942 }
943 }
944 }
946 if (i != tmpl_cnt) {
947 rrd_set_error("expected %lu data source readings (got %lu) from %s",
948 tmpl_cnt - 1, i, input);
949 return -1;
950 }
952 if (get_time_from_reading(rrd, timesyntax, updvals,
953 current_time, current_time_usec,
954 version) == -1) {
955 return -1;
956 }
957 return 0;
958 }
960 /*
961 * Parse the time in a DS string, store it in current_time and
962 * current_time_usec and verify that it's later than the last
963 * update for this DS.
964 *
965 * Returns 0 on success, -1 on error.
966 */
967 static int get_time_from_reading(
968 rrd_t *rrd,
969 char timesyntax,
970 char **updvals,
971 time_t *current_time,
972 unsigned long *current_time_usec,
973 int version)
974 {
975 double tmp;
976 char *parsetime_error = NULL;
977 char *old_locale;
978 rrd_time_value_t ds_tv;
979 struct timeval tmp_time; /* used for time conversion */
981 /* get the time from the reading ... handle N */
982 if (timesyntax == '@') { /* at-style */
983 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
984 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
985 return -1;
986 }
987 if (ds_tv.type == RELATIVE_TO_END_TIME ||
988 ds_tv.type == RELATIVE_TO_START_TIME) {
989 rrd_set_error("specifying time relative to the 'start' "
990 "or 'end' makes no sense here: %s", updvals[0]);
991 return -1;
992 }
993 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
994 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
995 } else if (strcmp(updvals[0], "N") == 0) {
996 gettimeofday(&tmp_time, 0);
997 normalize_time(&tmp_time);
998 *current_time = tmp_time.tv_sec;
999 *current_time_usec = tmp_time.tv_usec;
1000 } else {
1001 old_locale = setlocale(LC_NUMERIC, "C");
1002 tmp = strtod(updvals[0], 0);
1003 setlocale(LC_NUMERIC, old_locale);
1004 *current_time = floor(tmp);
1005 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1006 }
1007 /* dont do any correction for old version RRDs */
1008 if (version < 3)
1009 *current_time_usec = 0;
1011 if (*current_time < rrd->live_head->last_up ||
1012 (*current_time == rrd->live_head->last_up &&
1013 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1014 rrd_set_error("illegal attempt to update using time %ld when "
1015 "last update time is %ld (minimum one second step)",
1016 *current_time, rrd->live_head->last_up);
1017 return -1;
1018 }
1019 return 0;
1020 }
1022 /*
1023 * Update pdp_new by interpreting the updvals according to the DS type
1024 * (COUNTER, GAUGE, etc.).
1025 *
1026 * Returns 0 on success, -1 on error.
1027 */
1028 static int update_pdp_prep(
1029 rrd_t *rrd,
1030 char **updvals,
1031 rrd_value_t *pdp_new,
1032 double interval)
1033 {
1034 unsigned long ds_idx;
1035 int ii;
1036 char *endptr; /* used in the conversion */
1037 double rate;
1038 char *old_locale;
1039 enum dst_en dst_idx;
1041 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1042 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1044 /* make sure we do not build diffs with old last_ds values */
1045 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1046 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1047 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1048 }
1050 /* NOTE: DST_CDEF should never enter this if block, because
1051 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1052 * accidently specified a value for the DST_CDEF. To handle this case,
1053 * an extra check is required. */
1055 if ((updvals[ds_idx + 1][0] != 'U') &&
1056 (dst_idx != DST_CDEF) &&
1057 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1058 rate = DNAN;
1060 /* pdp_new contains rate * time ... eg the bytes transferred during
1061 * the interval. Doing it this way saves a lot of math operations
1062 */
1063 switch (dst_idx) {
1064 case DST_COUNTER:
1065 case DST_DERIVE:
1066 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1067 if ((updvals[ds_idx + 1][ii] < '0'
1068 || updvals[ds_idx + 1][ii] > '9')
1069 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1070 rrd_set_error("not a simple integer: '%s'",
1071 updvals[ds_idx + 1]);
1072 return -1;
1073 }
1074 }
1075 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1076 pdp_new[ds_idx] =
1077 rrd_diff(updvals[ds_idx + 1],
1078 rrd->pdp_prep[ds_idx].last_ds);
1079 if (dst_idx == DST_COUNTER) {
1080 /* simple overflow catcher. This will fail
1081 * terribly for non 32 or 64 bit counters
1082 * ... are there any others in SNMP land?
1083 */
1084 if (pdp_new[ds_idx] < (double) 0.0)
1085 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1086 if (pdp_new[ds_idx] < (double) 0.0)
1087 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1088 }
1089 rate = pdp_new[ds_idx] / interval;
1090 } else {
1091 pdp_new[ds_idx] = DNAN;
1092 }
1093 break;
1094 case DST_ABSOLUTE:
1095 old_locale = setlocale(LC_NUMERIC, "C");
1096 errno = 0;
1097 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1098 setlocale(LC_NUMERIC, old_locale);
1099 if (errno > 0) {
1100 rrd_set_error("converting '%s' to float: %s",
1101 updvals[ds_idx + 1], rrd_strerror(errno));
1102 return -1;
1103 };
1104 if (endptr[0] != '\0') {
1105 rrd_set_error
1106 ("conversion of '%s' to float not complete: tail '%s'",
1107 updvals[ds_idx + 1], endptr);
1108 return -1;
1109 }
1110 rate = pdp_new[ds_idx] / interval;
1111 break;
1112 case DST_GAUGE:
1113 errno = 0;
1114 old_locale = setlocale(LC_NUMERIC, "C");
1115 pdp_new[ds_idx] =
1116 strtod(updvals[ds_idx + 1], &endptr) * interval;
1117 setlocale(LC_NUMERIC, old_locale);
1118 if (errno) {
1119 rrd_set_error("converting '%s' to float: %s",
1120 updvals[ds_idx + 1], rrd_strerror(errno));
1121 return -1;
1122 };
1123 if (endptr[0] != '\0') {
1124 rrd_set_error
1125 ("conversion of '%s' to float not complete: tail '%s'",
1126 updvals[ds_idx + 1], endptr);
1127 return -1;
1128 }
1129 rate = pdp_new[ds_idx] / interval;
1130 break;
1131 default:
1132 rrd_set_error("rrd contains unknown DS type : '%s'",
1133 rrd->ds_def[ds_idx].dst);
1134 return -1;
1135 }
1136 /* break out of this for loop if the error string is set */
1137 if (rrd_test_error()) {
1138 return -1;
1139 }
1140 /* make sure pdp_temp is neither too large or too small
1141 * if any of these occur it becomes unknown ...
1142 * sorry folks ... */
1143 if (!isnan(rate) &&
1144 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1145 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1146 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1147 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1148 pdp_new[ds_idx] = DNAN;
1149 }
1150 } else {
1151 /* no news is news all the same */
1152 pdp_new[ds_idx] = DNAN;
1153 }
1156 /* make a copy of the command line argument for the next run */
1157 #ifdef DEBUG
1158 fprintf(stderr, "prep ds[%lu]\t"
1159 "last_arg '%s'\t"
1160 "this_arg '%s'\t"
1161 "pdp_new %10.2f\n",
1162 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1163 pdp_new[ds_idx]);
1164 #endif
1165 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1166 LAST_DS_LEN - 1);
1167 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1168 }
1169 return 0;
1170 }
1172 /*
1173 * How many PDP steps have elapsed since the last update? Returns the answer,
1174 * and stores the time between the last update and the last PDP in pre_time,
1175 * and the time between the last PDP and the current time in post_int.
1176 */
1177 static int calculate_elapsed_steps(
1178 rrd_t *rrd,
1179 unsigned long current_time,
1180 unsigned long current_time_usec,
1181 double interval,
1182 double *pre_int,
1183 double *post_int,
1184 unsigned long *proc_pdp_cnt)
1185 {
1186 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1187 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1188 * time */
1189 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1190 * when it was last updated */
1191 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1193 /* when was the current pdp started */
1194 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1195 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1197 /* when did the last pdp_st occur */
1198 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1199 occu_pdp_st = current_time - occu_pdp_age;
1201 if (occu_pdp_st > proc_pdp_st) {
1202 /* OK we passed the pdp_st moment */
1203 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1204 * occurred before the latest
1205 * pdp_st moment*/
1206 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1207 *post_int = occu_pdp_age; /* how much after it */
1208 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1209 } else {
1210 *pre_int = interval;
1211 *post_int = 0;
1212 }
1214 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1216 #ifdef DEBUG
1217 printf("proc_pdp_age %lu\t"
1218 "proc_pdp_st %lu\t"
1219 "occu_pfp_age %lu\t"
1220 "occu_pdp_st %lu\t"
1221 "int %lf\t"
1222 "pre_int %lf\t"
1223 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1224 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1225 #endif
1227 /* compute the number of elapsed pdp_st moments */
1228 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1229 }
1231 /*
1232 * Increment the PDP values by the values in pdp_new, or else initialize them.
1233 */
1234 static void simple_update(
1235 rrd_t *rrd,
1236 double interval,
1237 rrd_value_t *pdp_new)
1238 {
1239 int i;
1241 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1242 if (isnan(pdp_new[i])) {
1243 /* this is not really accurate if we use subsecond data arrival time
1244 should have thought of it when going subsecond resolution ...
1245 sorry next format change we will have it! */
1246 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1247 floor(interval);
1248 } else {
1249 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1250 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1251 } else {
1252 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1253 }
1254 }
1255 #ifdef DEBUG
1256 fprintf(stderr,
1257 "NO PDP ds[%i]\t"
1258 "value %10.2f\t"
1259 "unkn_sec %5lu\n",
1260 i,
1261 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1262 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1263 #endif
1264 }
1265 }
1267 /*
1268 * Call process_pdp_st for each DS.
1269 *
1270 * Returns 0 on success, -1 on error.
1271 */
1272 static int process_all_pdp_st(
1273 rrd_t *rrd,
1274 double interval,
1275 double pre_int,
1276 double post_int,
1277 unsigned long elapsed_pdp_st,
1278 rrd_value_t *pdp_new,
1279 rrd_value_t *pdp_temp)
1280 {
1281 unsigned long ds_idx;
1283 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1284 rate*seconds which occurred up to the last run.
1285 pdp_new[] contains rate*seconds from the latest run.
1286 pdp_temp[] will contain the rate for cdp */
1288 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1289 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1290 elapsed_pdp_st * rrd->stat_head->pdp_step,
1291 pdp_new, pdp_temp) == -1) {
1292 return -1;
1293 }
1294 #ifdef DEBUG
1295 fprintf(stderr, "PDP UPD ds[%lu]\t"
1296 "elapsed_pdp_st %lu\t"
1297 "pdp_temp %10.2f\t"
1298 "new_prep %10.2f\t"
1299 "new_unkn_sec %5lu\n",
1300 ds_idx,
1301 elapsed_pdp_st,
1302 pdp_temp[ds_idx],
1303 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1304 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1305 #endif
1306 }
1307 return 0;
1308 }
1310 /*
1311 * Process an update that occurs after one of the PDP moments.
1312 * Increments the PDP value, sets NAN if time greater than the
1313 * heartbeats have elapsed, processes CDEFs.
1314 *
1315 * Returns 0 on success, -1 on error.
1316 */
1317 static int process_pdp_st(
1318 rrd_t *rrd,
1319 unsigned long ds_idx,
1320 double interval,
1321 double pre_int,
1322 double post_int,
1323 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1324 rrd_value_t *pdp_new,
1325 rrd_value_t *pdp_temp)
1326 {
1327 int i;
1329 /* update pdp_prep to the current pdp_st. */
1330 double pre_unknown = 0.0;
1331 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1332 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1334 rpnstack_t rpnstack; /* used for COMPUTE DS */
1336 rpnstack_init(&rpnstack);
1339 if (isnan(pdp_new[ds_idx])) {
1340 /* a final bit of unknown to be added before calculation
1341 we use a temporary variable for this so that we
1342 don't have to turn integer lines before using the value */
1343 pre_unknown = pre_int;
1344 } else {
1345 if (isnan(scratch[PDP_val].u_val)) {
1346 scratch[PDP_val].u_val = 0;
1347 }
1348 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1349 }
1351 /* if too much of the pdp_prep is unknown we dump it */
1352 /* if the interval is larger thatn mrhb we get NAN */
1353 if ((interval > mrhb) ||
1354 (rrd->stat_head->pdp_step / 2.0 <
1355 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1356 pdp_temp[ds_idx] = DNAN;
1357 } else {
1358 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1359 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1360 pre_unknown);
1361 }
1363 /* process CDEF data sources; remember each CDEF DS can
1364 * only reference other DS with a lower index number */
1365 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1366 rpnp_t *rpnp;
1368 rpnp =
1369 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1370 /* substitute data values for OP_VARIABLE nodes */
1371 for (i = 0; rpnp[i].op != OP_END; i++) {
1372 if (rpnp[i].op == OP_VARIABLE) {
1373 rpnp[i].op = OP_NUMBER;
1374 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1375 }
1376 }
1377 /* run the rpn calculator */
1378 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1379 free(rpnp);
1380 rpnstack_free(&rpnstack);
1381 return -1;
1382 }
1383 }
1385 /* make pdp_prep ready for the next run */
1386 if (isnan(pdp_new[ds_idx])) {
1387 /* this is not realy accurate if we use subsecond data arival time
1388 should have thought of it when going subsecond resolution ...
1389 sorry next format change we will have it! */
1390 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1391 scratch[PDP_val].u_val = DNAN;
1392 } else {
1393 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1394 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1395 }
1396 rpnstack_free(&rpnstack);
1397 return 0;
1398 }
1400 /*
1401 * Iterate over all the RRAs for a given DS and:
1402 * 1. Decide whether to schedule a smooth later
1403 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1404 * 3. Update the CDP
1405 *
1406 * Returns 0 on success, -1 on error
1407 */
1408 static int update_all_cdp_prep(
1409 rrd_t *rrd,
1410 unsigned long *rra_step_cnt,
1411 unsigned long rra_begin,
1412 rrd_file_t *rrd_file,
1413 unsigned long elapsed_pdp_st,
1414 unsigned long proc_pdp_cnt,
1415 rrd_value_t **last_seasonal_coef,
1416 rrd_value_t **seasonal_coef,
1417 rrd_value_t *pdp_temp,
1418 unsigned long *skip_update,
1419 int *schedule_smooth)
1420 {
1421 unsigned long rra_idx;
1423 /* index into the CDP scratch array */
1424 enum cf_en current_cf;
1425 unsigned long rra_start;
1427 /* number of rows to be updated in an RRA for a data value. */
1428 unsigned long start_pdp_offset;
1430 rra_start = rra_begin;
1431 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1432 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1433 start_pdp_offset =
1434 rrd->rra_def[rra_idx].pdp_cnt -
1435 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1436 skip_update[rra_idx] = 0;
1437 if (start_pdp_offset <= elapsed_pdp_st) {
1438 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1439 rrd->rra_def[rra_idx].pdp_cnt + 1;
1440 } else {
1441 rra_step_cnt[rra_idx] = 0;
1442 }
1444 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1445 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1446 * so that they will be correct for the next observed value; note that for
1447 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1448 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1449 if (rra_step_cnt[rra_idx] > 1) {
1450 skip_update[rra_idx] = 1;
1451 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1452 elapsed_pdp_st, last_seasonal_coef);
1453 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1454 elapsed_pdp_st + 1, seasonal_coef);
1455 }
1456 /* periodically run a smoother for seasonal effects */
1457 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1458 #ifdef DEBUG
1459 fprintf(stderr,
1460 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1461 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1462 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1463 u_cnt);
1464 #endif
1465 *schedule_smooth = 1;
1466 }
1467 }
1468 if (rrd_test_error())
1469 return -1;
1471 if (update_cdp_prep
1472 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1473 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1474 current_cf) == -1) {
1475 return -1;
1476 }
1477 rra_start +=
1478 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1479 sizeof(rrd_value_t);
1480 }
1481 return 0;
1482 }
1484 /*
1485 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1486 */
1487 static int do_schedule_smooth(
1488 rrd_t *rrd,
1489 unsigned long rra_idx,
1490 unsigned long elapsed_pdp_st)
1491 {
1492 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1493 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1494 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1495 unsigned long seasonal_smooth_idx =
1496 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1497 unsigned long *init_seasonal =
1498 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1500 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1501 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1502 * really an RRA level, not a data source within RRA level parameter, but
1503 * the rra_def is read only for rrd_update (not flushed to disk). */
1504 if (*init_seasonal > BURNIN_CYCLES) {
1505 /* someone has no doubt invented a trick to deal with this wrap around,
1506 * but at least this code is clear. */
1507 if (seasonal_smooth_idx > cur_row) {
1508 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1509 * between PDP and CDP */
1510 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1511 }
1512 /* can't rely on negative numbers because we are working with
1513 * unsigned values */
1514 return (cur_row + elapsed_pdp_st >= row_cnt
1515 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1516 }
1517 /* mark off one of the burn-in cycles */
1518 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1519 }
1521 /*
1522 * For a given RRA, iterate over the data sources and call the appropriate
1523 * consolidation function.
1524 *
1525 * Returns 0 on success, -1 on error.
1526 */
1527 static int update_cdp_prep(
1528 rrd_t *rrd,
1529 unsigned long elapsed_pdp_st,
1530 unsigned long start_pdp_offset,
1531 unsigned long *rra_step_cnt,
1532 int rra_idx,
1533 rrd_value_t *pdp_temp,
1534 rrd_value_t *last_seasonal_coef,
1535 rrd_value_t *seasonal_coef,
1536 int current_cf)
1537 {
1538 unsigned long ds_idx, cdp_idx;
1540 /* update CDP_PREP areas */
1541 /* loop over data soures within each RRA */
1542 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1544 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1546 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1547 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1548 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1549 elapsed_pdp_st, start_pdp_offset,
1550 rrd->rra_def[rra_idx].pdp_cnt,
1551 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1552 rra_idx, ds_idx);
1553 } else {
1554 /* Nothing to consolidate if there's one PDP per CDP. However, if
1555 * we've missed some PDPs, let's update null counters etc. */
1556 if (elapsed_pdp_st > 2) {
1557 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1558 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1559 current_cf);
1560 }
1561 }
1563 if (rrd_test_error())
1564 return -1;
1565 } /* endif data sources loop */
1566 return 0;
1567 }
1569 /*
1570 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1571 * primary value, secondary value, and # of unknowns.
1572 */
1573 static void update_cdp(
1574 unival *scratch,
1575 int current_cf,
1576 rrd_value_t pdp_temp_val,
1577 unsigned long rra_step_cnt,
1578 unsigned long elapsed_pdp_st,
1579 unsigned long start_pdp_offset,
1580 unsigned long pdp_cnt,
1581 rrd_value_t xff,
1582 int i,
1583 int ii)
1584 {
1585 /* shorthand variables */
1586 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1587 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1588 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1589 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1591 if (rra_step_cnt) {
1592 /* If we are in this block, as least 1 CDP value will be written to
1593 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1594 * to be written, then the "fill in" value is the CDP_secondary_val
1595 * entry. */
1596 if (isnan(pdp_temp_val)) {
1597 *cdp_unkn_pdp_cnt += start_pdp_offset;
1598 *cdp_secondary_val = DNAN;
1599 } else {
1600 /* CDP_secondary value is the RRA "fill in" value for intermediary
1601 * CDP data entries. No matter the CF, the value is the same because
1602 * the average, max, min, and last of a list of identical values is
1603 * the same, namely, the value itself. */
1604 *cdp_secondary_val = pdp_temp_val;
1605 }
1607 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1608 *cdp_primary_val = DNAN;
1609 if (current_cf == CF_AVERAGE) {
1610 *cdp_val =
1611 initialize_average_carry_over(pdp_temp_val,
1612 elapsed_pdp_st,
1613 start_pdp_offset, pdp_cnt);
1614 } else {
1615 *cdp_val = pdp_temp_val;
1616 }
1617 } else {
1618 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1619 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1620 } /* endif meets xff value requirement for a valid value */
1621 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1622 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1623 if (isnan(pdp_temp_val))
1624 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1625 else
1626 *cdp_unkn_pdp_cnt = 0;
1627 } else { /* rra_step_cnt[i] == 0 */
1629 #ifdef DEBUG
1630 if (isnan(*cdp_val)) {
1631 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1632 i, ii);
1633 } else {
1634 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1635 i, ii, *cdp_val);
1636 }
1637 #endif
1638 if (isnan(pdp_temp_val)) {
1639 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1640 } else {
1641 *cdp_val =
1642 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1643 current_cf, i, ii);
1644 }
1645 }
1646 }
1648 /*
1649 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1650 * on the type of consolidation function.
1651 */
1652 static void initialize_cdp_val(
1653 unival *scratch,
1654 int current_cf,
1655 rrd_value_t pdp_temp_val,
1656 unsigned long elapsed_pdp_st,
1657 unsigned long start_pdp_offset,
1658 unsigned long pdp_cnt)
1659 {
1660 rrd_value_t cum_val, cur_val;
1662 switch (current_cf) {
1663 case CF_AVERAGE:
1664 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1665 cur_val = IFDNAN(pdp_temp_val, 0.0);
1666 scratch[CDP_primary_val].u_val =
1667 (cum_val + cur_val * start_pdp_offset) /
1668 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1669 scratch[CDP_val].u_val =
1670 initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1671 start_pdp_offset, pdp_cnt);
1672 break;
1673 case CF_MAXIMUM:
1674 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1675 cur_val = IFDNAN(pdp_temp_val, -DINF);
1676 #if 0
1677 #ifdef DEBUG
1678 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1679 fprintf(stderr,
1680 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1681 i, ii);
1682 exit(-1);
1683 }
1684 #endif
1685 #endif
1686 if (cur_val > cum_val)
1687 scratch[CDP_primary_val].u_val = cur_val;
1688 else
1689 scratch[CDP_primary_val].u_val = cum_val;
1690 /* initialize carry over value */
1691 scratch[CDP_val].u_val = pdp_temp_val;
1692 break;
1693 case CF_MINIMUM:
1694 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1695 cur_val = IFDNAN(pdp_temp_val, DINF);
1696 #if 0
1697 #ifdef DEBUG
1698 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1699 fprintf(stderr,
1700 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1701 ii);
1702 exit(-1);
1703 }
1704 #endif
1705 #endif
1706 if (cur_val < cum_val)
1707 scratch[CDP_primary_val].u_val = cur_val;
1708 else
1709 scratch[CDP_primary_val].u_val = cum_val;
1710 /* initialize carry over value */
1711 scratch[CDP_val].u_val = pdp_temp_val;
1712 break;
1713 case CF_LAST:
1714 default:
1715 scratch[CDP_primary_val].u_val = pdp_temp_val;
1716 /* initialize carry over value */
1717 scratch[CDP_val].u_val = pdp_temp_val;
1718 break;
1719 }
1720 }
1722 /*
1723 * Update the consolidation function for Holt-Winters functions as
1724 * well as other functions that don't actually consolidate multiple
1725 * PDPs.
1726 */
1727 static void reset_cdp(
1728 rrd_t *rrd,
1729 unsigned long elapsed_pdp_st,
1730 rrd_value_t *pdp_temp,
1731 rrd_value_t *last_seasonal_coef,
1732 rrd_value_t *seasonal_coef,
1733 int rra_idx,
1734 int ds_idx,
1735 int cdp_idx,
1736 enum cf_en current_cf)
1737 {
1738 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1740 switch (current_cf) {
1741 case CF_AVERAGE:
1742 default:
1743 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1744 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1745 break;
1746 case CF_SEASONAL:
1747 case CF_DEVSEASONAL:
1748 /* need to update cached seasonal values, so they are consistent
1749 * with the bulk update */
1750 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1751 * CDP_last_deviation are the same. */
1752 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1753 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1754 break;
1755 case CF_HWPREDICT:
1756 case CF_MHWPREDICT:
1757 /* need to update the null_count and last_null_count.
1758 * even do this for non-DNAN pdp_temp because the
1759 * algorithm is not learning from batch updates. */
1760 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1761 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1762 /* fall through */
1763 case CF_DEVPREDICT:
1764 scratch[CDP_primary_val].u_val = DNAN;
1765 scratch[CDP_secondary_val].u_val = DNAN;
1766 break;
1767 case CF_FAILURES:
1768 /* do not count missed bulk values as failures */
1769 scratch[CDP_primary_val].u_val = 0;
1770 scratch[CDP_secondary_val].u_val = 0;
1771 /* need to reset violations buffer.
1772 * could do this more carefully, but for now, just
1773 * assume a bulk update wipes away all violations. */
1774 erase_violations(rrd, cdp_idx, rra_idx);
1775 break;
1776 }
1777 }
1779 static rrd_value_t initialize_average_carry_over(
1780 rrd_value_t pdp_temp_val,
1781 unsigned long elapsed_pdp_st,
1782 unsigned long start_pdp_offset,
1783 unsigned long pdp_cnt)
1784 {
1785 /* initialize carry over value */
1786 if (isnan(pdp_temp_val)) {
1787 return DNAN;
1788 }
1789 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1790 }
1792 /*
1793 * Update or initialize a CDP value based on the consolidation
1794 * function.
1795 *
1796 * Returns the new value.
1797 */
1798 static rrd_value_t calculate_cdp_val(
1799 rrd_value_t cdp_val,
1800 rrd_value_t pdp_temp_val,
1801 unsigned long elapsed_pdp_st,
1802 int current_cf,
1803 #ifdef DEBUG
1804 int i,
1805 int ii
1806 #else
1807 int UNUSED(i),
1808 int UNUSED(ii)
1809 #endif
1810 )
1811 {
1812 if (isnan(cdp_val)) {
1813 if (current_cf == CF_AVERAGE) {
1814 pdp_temp_val *= elapsed_pdp_st;
1815 }
1816 #ifdef DEBUG
1817 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1818 i, ii, pdp_temp_val);
1819 #endif
1820 return pdp_temp_val;
1821 }
1822 if (current_cf == CF_AVERAGE)
1823 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1824 if (current_cf == CF_MINIMUM)
1825 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1826 if (current_cf == CF_MAXIMUM)
1827 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1829 return pdp_temp_val;
1830 }
1832 /*
1833 * For each RRA, update the seasonal values and then call update_aberrant_CF
1834 * for each data source.
1835 *
1836 * Return 0 on success, -1 on error.
1837 */
1838 static int update_aberrant_cdps(
1839 rrd_t *rrd,
1840 rrd_file_t *rrd_file,
1841 unsigned long rra_begin,
1842 unsigned long elapsed_pdp_st,
1843 rrd_value_t *pdp_temp,
1844 rrd_value_t **seasonal_coef)
1845 {
1846 unsigned long rra_idx, ds_idx, j;
1848 /* number of PDP steps since the last update that
1849 * are assigned to the first CDP to be generated
1850 * since the last update. */
1851 unsigned short scratch_idx;
1852 unsigned long rra_start;
1853 enum cf_en current_cf;
1855 /* this loop is only entered if elapsed_pdp_st < 3 */
1856 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1857 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1858 rra_start = rra_begin;
1859 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1860 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1861 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1862 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1863 if (scratch_idx == CDP_primary_val) {
1864 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1865 elapsed_pdp_st + 1, seasonal_coef);
1866 } else {
1867 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1868 elapsed_pdp_st + 2, seasonal_coef);
1869 }
1870 }
1871 if (rrd_test_error())
1872 return -1;
1873 /* loop over data soures within each RRA */
1874 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1875 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1876 rra_idx * (rrd->stat_head->ds_cnt) +
1877 ds_idx, rra_idx, ds_idx, scratch_idx,
1878 *seasonal_coef);
1879 }
1880 }
1881 rra_start += rrd->rra_def[rra_idx].row_cnt
1882 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1883 }
1884 }
1885 return 0;
1886 }
1888 /*
1889 * Move sequentially through the file, writing one RRA at a time. Note this
1890 * architecture divorces the computation of CDP with flushing updated RRA
1891 * entries to disk.
1892 *
1893 * Return 0 on success, -1 on error.
1894 */
1895 static int write_to_rras(
1896 rrd_t *rrd,
1897 rrd_file_t *rrd_file,
1898 unsigned long *rra_step_cnt,
1899 unsigned long rra_begin,
1900 time_t current_time,
1901 unsigned long *skip_update,
1902 rrd_info_t ** pcdp_summary)
1903 {
1904 unsigned long rra_idx;
1905 unsigned long rra_start;
1906 time_t rra_time = 0; /* time of update for a RRA */
1908 unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1910 /* Ready to write to disk */
1911 rra_start = rra_begin;
1913 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1914 rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1915 rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1917 /* for cdp_prep */
1918 unsigned short scratch_idx;
1919 unsigned long step_subtract;
1921 for (scratch_idx = CDP_primary_val,
1922 step_subtract = 1;
1923 rra_step_cnt[rra_idx] > 0;
1924 rra_step_cnt[rra_idx]--,
1925 scratch_idx = CDP_secondary_val,
1926 step_subtract = 2) {
1928 off_t rra_pos_new;
1929 #ifdef DEBUG
1930 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1931 #endif
1932 /* increment, with wrap-around */
1933 if (++rra_ptr->cur_row >= rra_def->row_cnt)
1934 rra_ptr->cur_row = 0;
1936 /* we know what our position should be */
1937 rra_pos_new = rra_start
1938 + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1940 /* re-seek if the position is wrong or we wrapped around */
1941 if (rra_pos_new != rrd_file->pos) {
1942 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1943 rrd_set_error("seek error in rrd");
1944 return -1;
1945 }
1946 }
1947 #ifdef DEBUG
1948 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1949 #endif
1951 if (skip_update[rra_idx])
1952 continue;
1954 if (*pcdp_summary != NULL) {
1955 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1957 rra_time = (current_time - current_time % step_time)
1958 - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1959 }
1961 if (write_RRA_row
1962 (rrd_file, rrd, rra_idx, scratch_idx,
1963 pcdp_summary, rra_time) == -1)
1964 return -1;
1966 rrd_notify_row(rrd_file, rra_idx, rra_pos_new, rra_time);
1967 }
1969 rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1970 } /* RRA LOOP */
1972 return 0;
1973 }
1975 /*
1976 * Write out one row of values (one value per DS) to the archive.
1977 *
1978 * Returns 0 on success, -1 on error.
1979 */
1980 static int write_RRA_row(
1981 rrd_file_t *rrd_file,
1982 rrd_t *rrd,
1983 unsigned long rra_idx,
1984 unsigned short CDP_scratch_idx,
1985 rrd_info_t ** pcdp_summary,
1986 time_t rra_time)
1987 {
1988 unsigned long ds_idx, cdp_idx;
1989 rrd_infoval_t iv;
1991 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1992 /* compute the cdp index */
1993 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1994 #ifdef DEBUG
1995 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1996 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1997 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1998 #endif
1999 if (*pcdp_summary != NULL) {
2000 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2001 /* append info to the return hash */
2002 *pcdp_summary = rrd_info_push(*pcdp_summary,
2003 sprintf_alloc
2004 ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
2005 rrd->rra_def[rra_idx].cf_nam,
2006 rrd->rra_def[rra_idx].pdp_cnt,
2007 rrd->ds_def[ds_idx].ds_nam),
2008 RD_I_VAL, iv);
2009 }
2010 if (rrd_write(rrd_file,
2011 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2012 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2013 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2014 return -1;
2015 }
2016 }
2017 return 0;
2018 }
2020 /*
2021 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2022 *
2023 * Returns 0 on success, -1 otherwise
2024 */
2025 static int smooth_all_rras(
2026 rrd_t *rrd,
2027 rrd_file_t *rrd_file,
2028 unsigned long rra_begin)
2029 {
2030 unsigned long rra_start = rra_begin;
2031 unsigned long rra_idx;
2033 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2034 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2035 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2036 #ifdef DEBUG
2037 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2038 #endif
2039 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2040 if (rrd_test_error())
2041 return -1;
2042 }
2043 rra_start += rrd->rra_def[rra_idx].row_cnt
2044 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2045 }
2046 return 0;
2047 }
2049 #ifndef HAVE_MMAP
2050 /*
2051 * Flush changes to disk (unless we're using mmap)
2052 *
2053 * Returns 0 on success, -1 otherwise
2054 */
2055 static int write_changes_to_disk(
2056 rrd_t *rrd,
2057 rrd_file_t *rrd_file,
2058 int version)
2059 {
2060 /* we just need to write back the live header portion now */
2061 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2062 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2063 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2064 SEEK_SET) != 0) {
2065 rrd_set_error("seek rrd for live header writeback");
2066 return -1;
2067 }
2068 if (version >= 3) {
2069 if (rrd_write(rrd_file, rrd->live_head,
2070 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2071 rrd_set_error("rrd_write live_head to rrd");
2072 return -1;
2073 }
2074 } else {
2075 if (rrd_write(rrd_file, rrd->legacy_last_up,
2076 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2077 rrd_set_error("rrd_write live_head to rrd");
2078 return -1;
2079 }
2080 }
2083 if (rrd_write(rrd_file, rrd->pdp_prep,
2084 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2085 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2086 rrd_set_error("rrd_write pdp_prep to rrd");
2087 return -1;
2088 }
2090 if (rrd_write(rrd_file, rrd->cdp_prep,
2091 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2092 rrd->stat_head->ds_cnt)
2093 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2094 rrd->stat_head->ds_cnt)) {
2096 rrd_set_error("rrd_write cdp_prep to rrd");
2097 return -1;
2098 }
2100 if (rrd_write(rrd_file, rrd->rra_ptr,
2101 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2102 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2103 rrd_set_error("rrd_write rra_ptr to rrd");
2104 return -1;
2105 }
2106 return 0;
2107 }
2108 #endif