2 /*****************************************************************************
3 * RRDtool 1.3.9 Copyright by Tobi Oetiker, 1997-2009
4 *****************************************************************************
5 * rrd_update.c RRD Update Function
6 *****************************************************************************
7 * $Id$
8 *****************************************************************************/
10 #include "rrd_tool.h"
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
18 #include <locale.h>
20 #ifdef WIN32
21 #include <stdlib.h>
22 #endif
24 #include "rrd_hw.h"
25 #include "rrd_rpncalc.h"
27 #include "rrd_is_thread_safe.h"
28 #include "unused.h"
30 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
31 /*
32 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
33 * replacement.
34 */
35 #include <sys/timeb.h>
37 #ifndef __MINGW32__
38 struct timeval {
39 time_t tv_sec; /* seconds */
40 long tv_usec; /* microseconds */
41 };
42 #endif
44 struct __timezone {
45 int tz_minuteswest; /* minutes W of Greenwich */
46 int tz_dsttime; /* type of dst correction */
47 };
49 static int gettimeofday(
50 struct timeval *t,
51 struct __timezone *tz)
52 {
54 struct _timeb current_time;
56 _ftime(¤t_time);
58 t->tv_sec = current_time.time;
59 t->tv_usec = current_time.millitm * 1000;
61 return 0;
62 }
64 #endif
66 /* FUNCTION PROTOTYPES */
68 int rrd_update_r(
69 const char *filename,
70 const char *tmplt,
71 int argc,
72 const char **argv);
73 int _rrd_update(
74 const char *filename,
75 const char *tmplt,
76 int argc,
77 const char **argv,
78 rrd_info_t *);
80 static int allocate_data_structures(
81 rrd_t *rrd,
82 char ***updvals,
83 rrd_value_t **pdp_temp,
84 const char *tmplt,
85 long **tmpl_idx,
86 unsigned long *tmpl_cnt,
87 unsigned long **rra_step_cnt,
88 unsigned long **skip_update,
89 rrd_value_t **pdp_new);
91 static int parse_template(
92 rrd_t *rrd,
93 const char *tmplt,
94 unsigned long *tmpl_cnt,
95 long *tmpl_idx);
97 static int process_arg(
98 char *step_start,
99 rrd_t *rrd,
100 rrd_file_t *rrd_file,
101 unsigned long rra_begin,
102 time_t *current_time,
103 unsigned long *current_time_usec,
104 rrd_value_t *pdp_temp,
105 rrd_value_t *pdp_new,
106 unsigned long *rra_step_cnt,
107 char **updvals,
108 long *tmpl_idx,
109 unsigned long tmpl_cnt,
110 rrd_info_t ** pcdp_summary,
111 int version,
112 unsigned long *skip_update,
113 int *schedule_smooth);
115 static int parse_ds(
116 rrd_t *rrd,
117 char **updvals,
118 long *tmpl_idx,
119 char *input,
120 unsigned long tmpl_cnt,
121 time_t *current_time,
122 unsigned long *current_time_usec,
123 int version);
125 static int get_time_from_reading(
126 rrd_t *rrd,
127 char timesyntax,
128 char **updvals,
129 time_t *current_time,
130 unsigned long *current_time_usec,
131 int version);
133 static int update_pdp_prep(
134 rrd_t *rrd,
135 char **updvals,
136 rrd_value_t *pdp_new,
137 double interval);
139 static int calculate_elapsed_steps(
140 rrd_t *rrd,
141 unsigned long current_time,
142 unsigned long current_time_usec,
143 double interval,
144 double *pre_int,
145 double *post_int,
146 unsigned long *proc_pdp_cnt);
148 static void simple_update(
149 rrd_t *rrd,
150 double interval,
151 rrd_value_t *pdp_new);
153 static int process_all_pdp_st(
154 rrd_t *rrd,
155 double interval,
156 double pre_int,
157 double post_int,
158 unsigned long elapsed_pdp_st,
159 rrd_value_t *pdp_new,
160 rrd_value_t *pdp_temp);
162 static int process_pdp_st(
163 rrd_t *rrd,
164 unsigned long ds_idx,
165 double interval,
166 double pre_int,
167 double post_int,
168 long diff_pdp_st,
169 rrd_value_t *pdp_new,
170 rrd_value_t *pdp_temp);
172 static int update_all_cdp_prep(
173 rrd_t *rrd,
174 unsigned long *rra_step_cnt,
175 unsigned long rra_begin,
176 rrd_file_t *rrd_file,
177 unsigned long elapsed_pdp_st,
178 unsigned long proc_pdp_cnt,
179 rrd_value_t **last_seasonal_coef,
180 rrd_value_t **seasonal_coef,
181 rrd_value_t *pdp_temp,
182 unsigned long *skip_update,
183 int *schedule_smooth);
185 static int do_schedule_smooth(
186 rrd_t *rrd,
187 unsigned long rra_idx,
188 unsigned long elapsed_pdp_st);
190 static int update_cdp_prep(
191 rrd_t *rrd,
192 unsigned long elapsed_pdp_st,
193 unsigned long start_pdp_offset,
194 unsigned long *rra_step_cnt,
195 int rra_idx,
196 rrd_value_t *pdp_temp,
197 rrd_value_t *last_seasonal_coef,
198 rrd_value_t *seasonal_coef,
199 int current_cf);
201 static void update_cdp(
202 unival *scratch,
203 int current_cf,
204 rrd_value_t pdp_temp_val,
205 unsigned long rra_step_cnt,
206 unsigned long elapsed_pdp_st,
207 unsigned long start_pdp_offset,
208 unsigned long pdp_cnt,
209 rrd_value_t xff,
210 int i,
211 int ii);
213 static void initialize_cdp_val(
214 unival *scratch,
215 int current_cf,
216 rrd_value_t pdp_temp_val,
217 unsigned long elapsed_pdp_st,
218 unsigned long start_pdp_offset,
219 unsigned long pdp_cnt);
221 static void reset_cdp(
222 rrd_t *rrd,
223 unsigned long elapsed_pdp_st,
224 rrd_value_t *pdp_temp,
225 rrd_value_t *last_seasonal_coef,
226 rrd_value_t *seasonal_coef,
227 int rra_idx,
228 int ds_idx,
229 int cdp_idx,
230 enum cf_en current_cf);
232 static rrd_value_t initialize_average_carry_over(
233 rrd_value_t pdp_temp_val,
234 unsigned long elapsed_pdp_st,
235 unsigned long start_pdp_offset,
236 unsigned long pdp_cnt);
238 static rrd_value_t calculate_cdp_val(
239 rrd_value_t cdp_val,
240 rrd_value_t pdp_temp_val,
241 unsigned long elapsed_pdp_st,
242 int current_cf,
243 int i,
244 int ii);
246 static int update_aberrant_cdps(
247 rrd_t *rrd,
248 rrd_file_t *rrd_file,
249 unsigned long rra_begin,
250 unsigned long elapsed_pdp_st,
251 rrd_value_t *pdp_temp,
252 rrd_value_t **seasonal_coef);
254 static int write_to_rras(
255 rrd_t *rrd,
256 rrd_file_t *rrd_file,
257 unsigned long *rra_step_cnt,
258 unsigned long rra_begin,
259 time_t current_time,
260 unsigned long *skip_update,
261 rrd_info_t ** pcdp_summary);
263 static int write_RRA_row(
264 rrd_file_t *rrd_file,
265 rrd_t *rrd,
266 unsigned long rra_idx,
267 unsigned short CDP_scratch_idx,
268 rrd_info_t ** pcdp_summary,
269 time_t rra_time);
271 static int smooth_all_rras(
272 rrd_t *rrd,
273 rrd_file_t *rrd_file,
274 unsigned long rra_begin);
276 #ifndef HAVE_MMAP
277 static int write_changes_to_disk(
278 rrd_t *rrd,
279 rrd_file_t *rrd_file,
280 int version);
281 #endif
283 /*
284 * normalize time as returned by gettimeofday. usec part must
285 * be always >= 0
286 */
287 static inline void normalize_time(
288 struct timeval *t)
289 {
290 if (t->tv_usec < 0) {
291 t->tv_sec--;
292 t->tv_usec += 1e6L;
293 }
294 }
296 /*
297 * Sets current_time and current_time_usec based on the current time.
298 * current_time_usec is set to 0 if the version number is 1 or 2.
299 */
300 static inline void initialize_time(
301 time_t *current_time,
302 unsigned long *current_time_usec,
303 int version)
304 {
305 struct timeval tmp_time; /* used for time conversion */
307 gettimeofday(&tmp_time, 0);
308 normalize_time(&tmp_time);
309 *current_time = tmp_time.tv_sec;
310 if (version >= 3) {
311 *current_time_usec = tmp_time.tv_usec;
312 } else {
313 *current_time_usec = 0;
314 }
315 }
317 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
319 rrd_info_t *rrd_update_v(
320 int argc,
321 char **argv)
322 {
323 char *tmplt = NULL;
324 rrd_info_t *result = NULL;
325 rrd_infoval_t rc;
326 struct option long_options[] = {
327 {"template", required_argument, 0, 't'},
328 {0, 0, 0, 0}
329 };
331 rc.u_int = -1;
332 optind = 0;
333 opterr = 0; /* initialize getopt */
335 while (1) {
336 int option_index = 0;
337 int opt;
339 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
341 if (opt == EOF)
342 break;
344 switch (opt) {
345 case 't':
346 tmplt = optarg;
347 break;
349 case '?':
350 rrd_set_error("unknown option '%s'", argv[optind - 1]);
351 goto end_tag;
352 }
353 }
355 /* need at least 2 arguments: filename, data. */
356 if (argc - optind < 2) {
357 rrd_set_error("Not enough arguments");
358 goto end_tag;
359 }
360 rc.u_int = 0;
361 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
362 rc.u_int = _rrd_update(argv[optind], tmplt,
363 argc - optind - 1,
364 (const char **) (argv + optind + 1), result);
365 result->value.u_int = rc.u_int;
366 end_tag:
367 return result;
368 }
370 int rrd_update(
371 int argc,
372 char **argv)
373 {
374 struct option long_options[] = {
375 {"template", required_argument, 0, 't'},
376 {0, 0, 0, 0}
377 };
378 int option_index = 0;
379 int opt;
380 char *tmplt = NULL;
381 int rc = -1;
383 optind = 0;
384 opterr = 0; /* initialize getopt */
386 while (1) {
387 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
389 if (opt == EOF)
390 break;
392 switch (opt) {
393 case 't':
394 tmplt = strdup(optarg);
395 break;
397 case '?':
398 rrd_set_error("unknown option '%s'", argv[optind - 1]);
399 goto out;
400 }
401 }
403 /* need at least 2 arguments: filename, data. */
404 if (argc - optind < 2) {
405 rrd_set_error("Not enough arguments");
406 goto out;
407 }
409 rc = rrd_update_r(argv[optind], tmplt,
410 argc - optind - 1, (const char **) (argv + optind + 1));
411 out:
412 free(tmplt);
413 return rc;
414 }
416 int rrd_update_r(
417 const char *filename,
418 const char *tmplt,
419 int argc,
420 const char **argv)
421 {
422 return _rrd_update(filename, tmplt, argc, argv, NULL);
423 }
425 int _rrd_update(
426 const char *filename,
427 const char *tmplt,
428 int argc,
429 const char **argv,
430 rrd_info_t * pcdp_summary)
431 {
433 int arg_i = 2;
435 unsigned long rra_begin; /* byte pointer to the rra
436 * area in the rrd file. this
437 * pointer never changes value */
438 rrd_value_t *pdp_new; /* prepare the incoming data to be added
439 * to the existing entry */
440 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
441 * to the cdp values */
443 long *tmpl_idx; /* index representing the settings
444 * transported by the tmplt index */
445 unsigned long tmpl_cnt = 2; /* time and data */
446 rrd_t rrd;
447 time_t current_time = 0;
448 unsigned long current_time_usec = 0; /* microseconds part of current time */
449 char **updvals;
450 int schedule_smooth = 0;
452 /* number of elapsed PDP steps since last update */
453 unsigned long *rra_step_cnt = NULL;
455 int version; /* rrd version */
456 rrd_file_t *rrd_file;
457 char *arg_copy; /* for processing the argv */
458 unsigned long *skip_update; /* RRAs to advance but not write */
460 /* need at least 1 arguments: data. */
461 if (argc < 1) {
462 rrd_set_error("Not enough arguments");
463 goto err_out;
464 }
466 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
467 goto err_free;
468 }
469 /* We are now at the beginning of the rra's */
470 rra_begin = rrd_file->header_len;
472 version = atoi(rrd.stat_head->version);
474 initialize_time(¤t_time, ¤t_time_usec, version);
476 /* get exclusive lock to whole file.
477 * lock gets removed when we close the file.
478 */
479 if (rrd_lock(rrd_file) != 0) {
480 rrd_set_error("could not lock RRD");
481 goto err_close;
482 }
484 if (allocate_data_structures(&rrd, &updvals,
485 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
486 &rra_step_cnt, &skip_update,
487 &pdp_new) == -1) {
488 goto err_close;
489 }
491 /* loop through the arguments. */
492 for (arg_i = 0; arg_i < argc; arg_i++) {
493 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
494 rrd_set_error("failed duplication argv entry");
495 break;
496 }
497 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
498 ¤t_time, ¤t_time_usec, pdp_temp, pdp_new,
499 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
500 &pcdp_summary, version, skip_update,
501 &schedule_smooth) == -1) {
502 if (rrd_test_error()) { /* Should have error string always here */
503 char *save_error;
505 /* Prepend file name to error message */
506 if ((save_error = strdup(rrd_get_error())) != NULL) {
507 rrd_set_error("%s: %s", filename, save_error);
508 free(save_error);
509 }
510 }
511 free(arg_copy);
512 break;
513 }
514 free(arg_copy);
515 }
517 free(rra_step_cnt);
519 /* if we got here and if there is an error and if the file has not been
520 * written to, then close things up and return. */
521 if (rrd_test_error()) {
522 goto err_free_structures;
523 }
524 #ifndef HAVE_MMAP
525 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
526 goto err_free_structures;
527 }
528 #endif
530 /* calling the smoothing code here guarantees at most one smoothing
531 * operation per rrd_update call. Unfortunately, it is possible with bulk
532 * updates, or a long-delayed update for smoothing to occur off-schedule.
533 * This really isn't critical except during the burn-in cycles. */
534 if (schedule_smooth) {
535 smooth_all_rras(&rrd, rrd_file, rra_begin);
536 }
538 /* rrd_dontneed(rrd_file,&rrd); */
539 rrd_free(&rrd);
540 rrd_close(rrd_file);
542 free(pdp_new);
543 free(tmpl_idx);
544 free(pdp_temp);
545 free(skip_update);
546 free(updvals);
547 return 0;
549 err_free_structures:
550 free(pdp_new);
551 free(tmpl_idx);
552 free(pdp_temp);
553 free(skip_update);
554 free(updvals);
555 err_close:
556 rrd_close(rrd_file);
557 err_free:
558 rrd_free(&rrd);
559 err_out:
560 return -1;
561 }
563 /*
564 * get exclusive lock to whole file.
565 * lock gets removed when we close the file
566 *
567 * returns 0 on success
568 */
569 int rrd_lock(
570 rrd_file_t *file)
571 {
572 int rcstat;
574 {
575 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
576 struct _stat st;
578 if (_fstat(file->fd, &st) == 0) {
579 rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
580 } else {
581 rcstat = -1;
582 }
583 #else
584 struct flock lock;
586 lock.l_type = F_WRLCK; /* exclusive write lock */
587 lock.l_len = 0; /* whole file */
588 lock.l_start = 0; /* start of file */
589 lock.l_whence = SEEK_SET; /* end of file */
591 rcstat = fcntl(file->fd, F_SETLK, &lock);
592 #endif
593 }
595 return (rcstat);
596 }
598 /*
599 * Allocate some important arrays used, and initialize the template.
600 *
601 * When it returns, either all of the structures are allocated
602 * or none of them are.
603 *
604 * Returns 0 on success, -1 on error.
605 */
606 static int allocate_data_structures(
607 rrd_t *rrd,
608 char ***updvals,
609 rrd_value_t **pdp_temp,
610 const char *tmplt,
611 long **tmpl_idx,
612 unsigned long *tmpl_cnt,
613 unsigned long **rra_step_cnt,
614 unsigned long **skip_update,
615 rrd_value_t **pdp_new)
616 {
617 unsigned i, ii;
618 if ((*updvals = (char **) malloc(sizeof(char *)
619 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
620 rrd_set_error("allocating updvals pointer array.");
621 return -1;
622 }
623 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
624 * rrd->stat_head->ds_cnt)) ==
625 NULL) {
626 rrd_set_error("allocating pdp_temp.");
627 goto err_free_updvals;
628 }
629 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
630 *
631 rrd->stat_head->rra_cnt)) ==
632 NULL) {
633 rrd_set_error("allocating skip_update.");
634 goto err_free_pdp_temp;
635 }
636 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
637 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
638 rrd_set_error("allocating tmpl_idx.");
639 goto err_free_skip_update;
640 }
641 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
642 *
643 (rrd->stat_head->
644 rra_cnt))) == NULL) {
645 rrd_set_error("allocating rra_step_cnt.");
646 goto err_free_tmpl_idx;
647 }
649 /* initialize tmplt redirector */
650 /* default config example (assume DS 1 is a CDEF DS)
651 tmpl_idx[0] -> 0; (time)
652 tmpl_idx[1] -> 1; (DS 0)
653 tmpl_idx[2] -> 3; (DS 2)
654 tmpl_idx[3] -> 4; (DS 3) */
655 (*tmpl_idx)[0] = 0; /* time */
656 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
657 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
658 (*tmpl_idx)[ii++] = i;
659 }
660 *tmpl_cnt = ii;
662 if (tmplt != NULL) {
663 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
664 goto err_free_rra_step_cnt;
665 }
666 }
668 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
669 * rrd->stat_head->ds_cnt)) == NULL) {
670 rrd_set_error("allocating pdp_new.");
671 goto err_free_rra_step_cnt;
672 }
674 return 0;
676 err_free_rra_step_cnt:
677 free(*rra_step_cnt);
678 err_free_tmpl_idx:
679 free(*tmpl_idx);
680 err_free_skip_update:
681 free(*skip_update);
682 err_free_pdp_temp:
683 free(*pdp_temp);
684 err_free_updvals:
685 free(*updvals);
686 return -1;
687 }
689 /*
690 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
691 *
692 * Returns 0 on success.
693 */
694 static int parse_template(
695 rrd_t *rrd,
696 const char *tmplt,
697 unsigned long *tmpl_cnt,
698 long *tmpl_idx)
699 {
700 char *dsname, *tmplt_copy;
701 unsigned int tmpl_len, i;
702 int ret = 0;
704 *tmpl_cnt = 1; /* the first entry is the time */
706 /* we should work on a writeable copy here */
707 if ((tmplt_copy = strdup(tmplt)) == NULL) {
708 rrd_set_error("error copying tmplt '%s'", tmplt);
709 ret = -1;
710 goto out;
711 }
713 dsname = tmplt_copy;
714 tmpl_len = strlen(tmplt_copy);
715 for (i = 0; i <= tmpl_len; i++) {
716 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
717 tmplt_copy[i] = '\0';
718 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
719 rrd_set_error("tmplt contains more DS definitions than RRD");
720 ret = -1;
721 goto out_free_tmpl_copy;
722 }
723 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
724 rrd_set_error("unknown DS name '%s'", dsname);
725 ret = -1;
726 goto out_free_tmpl_copy;
727 }
728 /* go to the next entry on the tmplt_copy */
729 if (i < tmpl_len)
730 dsname = &tmplt_copy[i + 1];
731 }
732 }
733 out_free_tmpl_copy:
734 free(tmplt_copy);
735 out:
736 return ret;
737 }
739 /*
740 * Parse an update string, updates the primary data points (PDPs)
741 * and consolidated data points (CDPs), and writes changes to the RRAs.
742 *
743 * Returns 0 on success, -1 on error.
744 */
745 static int process_arg(
746 char *step_start,
747 rrd_t *rrd,
748 rrd_file_t *rrd_file,
749 unsigned long rra_begin,
750 time_t *current_time,
751 unsigned long *current_time_usec,
752 rrd_value_t *pdp_temp,
753 rrd_value_t *pdp_new,
754 unsigned long *rra_step_cnt,
755 char **updvals,
756 long *tmpl_idx,
757 unsigned long tmpl_cnt,
758 rrd_info_t ** pcdp_summary,
759 int version,
760 unsigned long *skip_update,
761 int *schedule_smooth)
762 {
763 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
765 /* a vector of future Holt-Winters seasonal coefs */
766 unsigned long elapsed_pdp_st;
768 double interval, pre_int, post_int; /* interval between this and
769 * the last run */
770 unsigned long proc_pdp_cnt;
772 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
773 current_time, current_time_usec, version) == -1) {
774 return -1;
775 }
777 interval = (double) (*current_time - rrd->live_head->last_up)
778 + (double) ((long) *current_time_usec -
779 (long) rrd->live_head->last_up_usec) / 1e6f;
781 /* process the data sources and update the pdp_prep
782 * area accordingly */
783 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
784 return -1;
785 }
787 elapsed_pdp_st = calculate_elapsed_steps(rrd,
788 *current_time,
789 *current_time_usec, interval,
790 &pre_int, &post_int,
791 &proc_pdp_cnt);
793 /* has a pdp_st moment occurred since the last run ? */
794 if (elapsed_pdp_st == 0) {
795 /* no we have not passed a pdp_st moment. therefore update is simple */
796 simple_update(rrd, interval, pdp_new);
797 } else {
798 /* an pdp_st has occurred. */
799 if (process_all_pdp_st(rrd, interval,
800 pre_int, post_int,
801 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
802 return -1;
803 }
804 if (update_all_cdp_prep(rrd, rra_step_cnt,
805 rra_begin, rrd_file,
806 elapsed_pdp_st,
807 proc_pdp_cnt,
808 &last_seasonal_coef,
809 &seasonal_coef,
810 pdp_temp,
811 skip_update, schedule_smooth) == -1) {
812 goto err_free_coefficients;
813 }
814 if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
815 elapsed_pdp_st, pdp_temp,
816 &seasonal_coef) == -1) {
817 goto err_free_coefficients;
818 }
819 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
820 *current_time, skip_update,
821 pcdp_summary) == -1) {
822 goto err_free_coefficients;
823 }
824 } /* endif a pdp_st has occurred */
825 rrd->live_head->last_up = *current_time;
826 rrd->live_head->last_up_usec = *current_time_usec;
828 if (version < 3) {
829 *rrd->legacy_last_up = rrd->live_head->last_up;
830 }
831 free(seasonal_coef);
832 free(last_seasonal_coef);
833 return 0;
835 err_free_coefficients:
836 free(seasonal_coef);
837 free(last_seasonal_coef);
838 return -1;
839 }
841 /*
842 * Parse a DS string (time + colon-separated values), storing the
843 * results in current_time, current_time_usec, and updvals.
844 *
845 * Returns 0 on success, -1 on error.
846 */
847 static int parse_ds(
848 rrd_t *rrd,
849 char **updvals,
850 long *tmpl_idx,
851 char *input,
852 unsigned long tmpl_cnt,
853 time_t *current_time,
854 unsigned long *current_time_usec,
855 int version)
856 {
857 char *p;
858 unsigned long i;
859 char timesyntax;
861 updvals[0] = input;
862 /* initialize all ds input to unknown except the first one
863 which has always got to be set */
864 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
865 updvals[i] = "U";
867 /* separate all ds elements; first must be examined separately
868 due to alternate time syntax */
869 if ((p = strchr(input, '@')) != NULL) {
870 timesyntax = '@';
871 } else if ((p = strchr(input, ':')) != NULL) {
872 timesyntax = ':';
873 } else {
874 rrd_set_error("expected timestamp not found in data source from %s",
875 input);
876 return -1;
877 }
878 *p = '\0';
879 i = 1;
880 updvals[tmpl_idx[i++]] = p + 1;
881 while (*(++p)) {
882 if (*p == ':') {
883 *p = '\0';
884 if (i < tmpl_cnt) {
885 updvals[tmpl_idx[i++]] = p + 1;
886 }
887 else {
888 rrd_set_error("found extra data on update argument: %s",p+1);
889 return -1;
890 }
891 }
892 }
894 if (i != tmpl_cnt) {
895 rrd_set_error("expected %lu data source readings (got %lu) from %s",
896 tmpl_cnt - 1, i - 1, input);
897 return -1;
898 }
900 if (get_time_from_reading(rrd, timesyntax, updvals,
901 current_time, current_time_usec,
902 version) == -1) {
903 return -1;
904 }
905 return 0;
906 }
908 /*
909 * Parse the time in a DS string, store it in current_time and
910 * current_time_usec and verify that it's later than the last
911 * update for this DS.
912 *
913 * Returns 0 on success, -1 on error.
914 */
915 static int get_time_from_reading(
916 rrd_t *rrd,
917 char timesyntax,
918 char **updvals,
919 time_t *current_time,
920 unsigned long *current_time_usec,
921 int version)
922 {
923 double tmp;
924 char *parsetime_error = NULL;
925 char *old_locale;
926 rrd_time_value_t ds_tv;
927 struct timeval tmp_time; /* used for time conversion */
929 /* get the time from the reading ... handle N */
930 if (timesyntax == '@') { /* at-style */
931 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
932 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
933 return -1;
934 }
935 if (ds_tv.type == RELATIVE_TO_END_TIME ||
936 ds_tv.type == RELATIVE_TO_START_TIME) {
937 rrd_set_error("specifying time relative to the 'start' "
938 "or 'end' makes no sense here: %s", updvals[0]);
939 return -1;
940 }
941 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
942 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
943 } else if (strcmp(updvals[0], "N") == 0) {
944 gettimeofday(&tmp_time, 0);
945 normalize_time(&tmp_time);
946 *current_time = tmp_time.tv_sec;
947 *current_time_usec = tmp_time.tv_usec;
948 } else {
949 old_locale = setlocale(LC_NUMERIC, NULL);
950 setlocale(LC_NUMERIC, "C");
951 errno = 0;
952 tmp = strtod(updvals[0], 0);
953 if (errno > 0) {
954 rrd_set_error("converting '%s' to float: %s",
955 updvals[0], rrd_strerror(errno));
956 return -1;
957 };
958 setlocale(LC_NUMERIC, old_locale);
959 if (tmp < 0.0){
960 gettimeofday(&tmp_time, 0);
961 tmp = (double)tmp_time.tv_sec + (double)tmp_time.tv_usec * 1e-6f + tmp;
962 }
964 *current_time = floor(tmp);
965 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
966 }
967 /* dont do any correction for old version RRDs */
968 if (version < 3)
969 *current_time_usec = 0;
971 if (*current_time < rrd->live_head->last_up ||
972 (*current_time == rrd->live_head->last_up &&
973 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
974 rrd_set_error("illegal attempt to update using time %ld when "
975 "last update time is %ld (minimum one second step)",
976 *current_time, rrd->live_head->last_up);
977 return -1;
978 }
979 return 0;
980 }
982 /*
983 * Update pdp_new by interpreting the updvals according to the DS type
984 * (COUNTER, GAUGE, etc.).
985 *
986 * Returns 0 on success, -1 on error.
987 */
988 static int update_pdp_prep(
989 rrd_t *rrd,
990 char **updvals,
991 rrd_value_t *pdp_new,
992 double interval)
993 {
994 unsigned long ds_idx;
995 int ii;
996 char *endptr; /* used in the conversion */
997 double rate;
998 char *old_locale;
999 enum dst_en dst_idx;
1001 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1002 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1004 /* make sure we do not build diffs with old last_ds values */
1005 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1006 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1007 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1008 }
1010 /* NOTE: DST_CDEF should never enter this if block, because
1011 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1012 * accidently specified a value for the DST_CDEF. To handle this case,
1013 * an extra check is required. */
1015 if ((updvals[ds_idx + 1][0] != 'U') &&
1016 (dst_idx != DST_CDEF) &&
1017 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1018 rate = DNAN;
1020 /* pdp_new contains rate * time ... eg the bytes transferred during
1021 * the interval. Doing it this way saves a lot of math operations
1022 */
1023 switch (dst_idx) {
1024 case DST_COUNTER:
1025 case DST_DERIVE:
1026 /* Check if this is a valid integer. `U' is already handled in
1027 * another branch. */
1028 for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) {
1029 if ((ii == 0) && (dst_idx == DST_DERIVE)
1030 && (updvals[ds_idx + 1][ii] == '-'))
1031 continue;
1033 if ((updvals[ds_idx + 1][ii] < '0')
1034 || (updvals[ds_idx + 1][ii] > '9')) {
1035 rrd_set_error("not a simple %s integer: '%s'",
1036 (dst_idx == DST_DERIVE) ? "signed" : "unsigned",
1037 updvals[ds_idx + 1]);
1038 return -1;
1039 }
1040 } /* for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) */
1042 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1043 pdp_new[ds_idx] =
1044 rrd_diff(updvals[ds_idx + 1],
1045 rrd->pdp_prep[ds_idx].last_ds);
1046 if (dst_idx == DST_COUNTER) {
1047 /* simple overflow catcher. This will fail
1048 * terribly for non 32 or 64 bit counters
1049 * ... are there any others in SNMP land?
1050 */
1051 if (pdp_new[ds_idx] < (double) 0.0)
1052 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1053 if (pdp_new[ds_idx] < (double) 0.0)
1054 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1055 }
1056 rate = pdp_new[ds_idx] / interval;
1057 } else {
1058 pdp_new[ds_idx] = DNAN;
1059 }
1060 break;
1061 case DST_ABSOLUTE:
1062 old_locale = setlocale(LC_NUMERIC, NULL);
1063 setlocale(LC_NUMERIC, "C");
1064 errno = 0;
1065 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1066 if (errno > 0) {
1067 rrd_set_error("converting '%s' to float: %s",
1068 updvals[ds_idx + 1], rrd_strerror(errno));
1069 return -1;
1070 };
1071 setlocale(LC_NUMERIC, old_locale);
1072 if (endptr[0] != '\0') {
1073 rrd_set_error
1074 ("conversion of '%s' to float not complete: tail '%s'",
1075 updvals[ds_idx + 1], endptr);
1076 return -1;
1077 }
1078 rate = pdp_new[ds_idx] / interval;
1079 break;
1080 case DST_GAUGE:
1081 old_locale = setlocale(LC_NUMERIC, NULL);
1082 setlocale(LC_NUMERIC, "C");
1083 errno = 0;
1084 pdp_new[ds_idx] =
1085 strtod(updvals[ds_idx + 1], &endptr) * interval;
1086 if (errno) {
1087 rrd_set_error("converting '%s' to float: %s",
1088 updvals[ds_idx + 1], rrd_strerror(errno));
1089 return -1;
1090 };
1091 setlocale(LC_NUMERIC, old_locale);
1092 if (endptr[0] != '\0') {
1093 rrd_set_error
1094 ("conversion of '%s' to float not complete: tail '%s'",
1095 updvals[ds_idx + 1], endptr);
1096 return -1;
1097 }
1098 rate = pdp_new[ds_idx] / interval;
1099 break;
1100 default:
1101 rrd_set_error("rrd contains unknown DS type : '%s'",
1102 rrd->ds_def[ds_idx].dst);
1103 return -1;
1104 }
1105 /* break out of this for loop if the error string is set */
1106 if (rrd_test_error()) {
1107 return -1;
1108 }
1109 /* make sure pdp_temp is neither too large or too small
1110 * if any of these occur it becomes unknown ...
1111 * sorry folks ... */
1112 if (!isnan(rate) &&
1113 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1114 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1115 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1116 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1117 pdp_new[ds_idx] = DNAN;
1118 }
1119 } else {
1120 /* no news is news all the same */
1121 pdp_new[ds_idx] = DNAN;
1122 }
1125 /* make a copy of the command line argument for the next run */
1126 #ifdef DEBUG
1127 fprintf(stderr, "prep ds[%lu]\t"
1128 "last_arg '%s'\t"
1129 "this_arg '%s'\t"
1130 "pdp_new %10.2f\n",
1131 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1132 pdp_new[ds_idx]);
1133 #endif
1134 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1135 LAST_DS_LEN - 1);
1136 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1137 }
1138 return 0;
1139 }
1141 /*
1142 * How many PDP steps have elapsed since the last update? Returns the answer,
1143 * and stores the time between the last update and the last PDP in pre_time,
1144 * and the time between the last PDP and the current time in post_int.
1145 */
1146 static int calculate_elapsed_steps(
1147 rrd_t *rrd,
1148 unsigned long current_time,
1149 unsigned long current_time_usec,
1150 double interval,
1151 double *pre_int,
1152 double *post_int,
1153 unsigned long *proc_pdp_cnt)
1154 {
1155 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1156 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1157 * time */
1158 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1159 * when it was last updated */
1160 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1162 /* when was the current pdp started */
1163 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1164 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1166 /* when did the last pdp_st occur */
1167 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1168 occu_pdp_st = current_time - occu_pdp_age;
1170 if (occu_pdp_st > proc_pdp_st) {
1171 /* OK we passed the pdp_st moment */
1172 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1173 * occurred before the latest
1174 * pdp_st moment*/
1175 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1176 *post_int = occu_pdp_age; /* how much after it */
1177 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1178 } else {
1179 *pre_int = interval;
1180 *post_int = 0;
1181 }
1183 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1185 #ifdef DEBUG
1186 printf("proc_pdp_age %lu\t"
1187 "proc_pdp_st %lu\t"
1188 "occu_pfp_age %lu\t"
1189 "occu_pdp_st %lu\t"
1190 "int %lf\t"
1191 "pre_int %lf\t"
1192 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1193 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1194 #endif
1196 /* compute the number of elapsed pdp_st moments */
1197 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1198 }
1200 /*
1201 * Increment the PDP values by the values in pdp_new, or else initialize them.
1202 */
1203 static void simple_update(
1204 rrd_t *rrd,
1205 double interval,
1206 rrd_value_t *pdp_new)
1207 {
1208 int i;
1210 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1211 if (isnan(pdp_new[i])) {
1212 /* this is not really accurate if we use subsecond data arrival time
1213 should have thought of it when going subsecond resolution ...
1214 sorry next format change we will have it! */
1215 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1216 floor(interval);
1217 } else {
1218 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1219 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1220 } else {
1221 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1222 }
1223 }
1224 #ifdef DEBUG
1225 fprintf(stderr,
1226 "NO PDP ds[%i]\t"
1227 "value %10.2f\t"
1228 "unkn_sec %5lu\n",
1229 i,
1230 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1231 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1232 #endif
1233 }
1234 }
1236 /*
1237 * Call process_pdp_st for each DS.
1238 *
1239 * Returns 0 on success, -1 on error.
1240 */
1241 static int process_all_pdp_st(
1242 rrd_t *rrd,
1243 double interval,
1244 double pre_int,
1245 double post_int,
1246 unsigned long elapsed_pdp_st,
1247 rrd_value_t *pdp_new,
1248 rrd_value_t *pdp_temp)
1249 {
1250 unsigned long ds_idx;
1252 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1253 rate*seconds which occurred up to the last run.
1254 pdp_new[] contains rate*seconds from the latest run.
1255 pdp_temp[] will contain the rate for cdp */
1257 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1258 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1259 elapsed_pdp_st * rrd->stat_head->pdp_step,
1260 pdp_new, pdp_temp) == -1) {
1261 return -1;
1262 }
1263 #ifdef DEBUG
1264 fprintf(stderr, "PDP UPD ds[%lu]\t"
1265 "elapsed_pdp_st %lu\t"
1266 "pdp_temp %10.2f\t"
1267 "new_prep %10.2f\t"
1268 "new_unkn_sec %5lu\n",
1269 ds_idx,
1270 elapsed_pdp_st,
1271 pdp_temp[ds_idx],
1272 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1273 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1274 #endif
1275 }
1276 return 0;
1277 }
1279 /*
1280 * Process an update that occurs after one of the PDP moments.
1281 * Increments the PDP value, sets NAN if time greater than the
1282 * heartbeats have elapsed, processes CDEFs.
1283 *
1284 * Returns 0 on success, -1 on error.
1285 */
1286 static int process_pdp_st(
1287 rrd_t *rrd,
1288 unsigned long ds_idx,
1289 double interval,
1290 double pre_int,
1291 double post_int,
1292 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1293 rrd_value_t *pdp_new,
1294 rrd_value_t *pdp_temp)
1295 {
1296 int i;
1298 /* update pdp_prep to the current pdp_st. */
1299 double pre_unknown = 0.0;
1300 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1301 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1303 rpnstack_t rpnstack; /* used for COMPUTE DS */
1305 rpnstack_init(&rpnstack);
1308 if (isnan(pdp_new[ds_idx])) {
1309 /* a final bit of unknown to be added before calculation
1310 we use a temporary variable for this so that we
1311 don't have to turn integer lines before using the value */
1312 pre_unknown = pre_int;
1313 } else {
1314 if (isnan(scratch[PDP_val].u_val)) {
1315 scratch[PDP_val].u_val = 0;
1316 }
1317 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1318 }
1320 /* if too much of the pdp_prep is unknown we dump it */
1321 /* if the interval is larger thatn mrhb we get NAN */
1322 if ((interval > mrhb) ||
1323 (rrd->stat_head->pdp_step / 2.0 <
1324 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1325 pdp_temp[ds_idx] = DNAN;
1326 } else {
1327 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1328 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1329 pre_unknown);
1330 }
1332 /* process CDEF data sources; remember each CDEF DS can
1333 * only reference other DS with a lower index number */
1334 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1335 rpnp_t *rpnp;
1337 rpnp =
1338 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1339 if(rpnp == NULL) {
1340 rpnstack_free(&rpnstack);
1341 return -1;
1342 }
1343 /* substitute data values for OP_VARIABLE nodes */
1344 for (i = 0; rpnp[i].op != OP_END; i++) {
1345 if (rpnp[i].op == OP_VARIABLE) {
1346 rpnp[i].op = OP_NUMBER;
1347 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1348 }
1349 }
1350 /* run the rpn calculator */
1351 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1352 free(rpnp);
1353 rpnstack_free(&rpnstack);
1354 return -1;
1355 }
1356 free(rpnp);
1357 }
1359 /* make pdp_prep ready for the next run */
1360 if (isnan(pdp_new[ds_idx])) {
1361 /* this is not realy accurate if we use subsecond data arival time
1362 should have thought of it when going subsecond resolution ...
1363 sorry next format change we will have it! */
1364 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1365 scratch[PDP_val].u_val = DNAN;
1366 } else {
1367 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1368 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1369 }
1370 rpnstack_free(&rpnstack);
1371 return 0;
1372 }
1374 /*
1375 * Iterate over all the RRAs for a given DS and:
1376 * 1. Decide whether to schedule a smooth later
1377 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1378 * 3. Update the CDP
1379 *
1380 * Returns 0 on success, -1 on error
1381 */
1382 static int update_all_cdp_prep(
1383 rrd_t *rrd,
1384 unsigned long *rra_step_cnt,
1385 unsigned long rra_begin,
1386 rrd_file_t *rrd_file,
1387 unsigned long elapsed_pdp_st,
1388 unsigned long proc_pdp_cnt,
1389 rrd_value_t **last_seasonal_coef,
1390 rrd_value_t **seasonal_coef,
1391 rrd_value_t *pdp_temp,
1392 unsigned long *skip_update,
1393 int *schedule_smooth)
1394 {
1395 unsigned long rra_idx;
1397 /* index into the CDP scratch array */
1398 enum cf_en current_cf;
1399 unsigned long rra_start;
1401 /* number of rows to be updated in an RRA for a data value. */
1402 unsigned long start_pdp_offset;
1404 rra_start = rra_begin;
1405 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1406 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1407 start_pdp_offset =
1408 rrd->rra_def[rra_idx].pdp_cnt -
1409 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1410 skip_update[rra_idx] = 0;
1411 if (start_pdp_offset <= elapsed_pdp_st) {
1412 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1413 rrd->rra_def[rra_idx].pdp_cnt + 1;
1414 } else {
1415 rra_step_cnt[rra_idx] = 0;
1416 }
1418 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1419 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1420 * so that they will be correct for the next observed value; note that for
1421 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1422 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1423 if (rra_step_cnt[rra_idx] > 1) {
1424 skip_update[rra_idx] = 1;
1425 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1426 elapsed_pdp_st, last_seasonal_coef);
1427 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1428 elapsed_pdp_st + 1, seasonal_coef);
1429 }
1430 /* periodically run a smoother for seasonal effects */
1431 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1432 #ifdef DEBUG
1433 fprintf(stderr,
1434 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1435 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1436 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1437 u_cnt);
1438 #endif
1439 *schedule_smooth = 1;
1440 }
1441 }
1442 if (rrd_test_error())
1443 return -1;
1445 if (update_cdp_prep
1446 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1447 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1448 current_cf) == -1) {
1449 return -1;
1450 }
1451 rra_start +=
1452 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1453 sizeof(rrd_value_t);
1454 }
1455 return 0;
1456 }
1458 /*
1459 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1460 */
1461 static int do_schedule_smooth(
1462 rrd_t *rrd,
1463 unsigned long rra_idx,
1464 unsigned long elapsed_pdp_st)
1465 {
1466 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1467 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1468 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1469 unsigned long seasonal_smooth_idx =
1470 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1471 unsigned long *init_seasonal =
1472 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1474 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1475 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1476 * really an RRA level, not a data source within RRA level parameter, but
1477 * the rra_def is read only for rrd_update (not flushed to disk). */
1478 if (*init_seasonal > BURNIN_CYCLES) {
1479 /* someone has no doubt invented a trick to deal with this wrap around,
1480 * but at least this code is clear. */
1481 if (seasonal_smooth_idx > cur_row) {
1482 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1483 * between PDP and CDP */
1484 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1485 }
1486 /* can't rely on negative numbers because we are working with
1487 * unsigned values */
1488 return (cur_row + elapsed_pdp_st >= row_cnt
1489 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1490 }
1491 /* mark off one of the burn-in cycles */
1492 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1493 }
1495 /*
1496 * For a given RRA, iterate over the data sources and call the appropriate
1497 * consolidation function.
1498 *
1499 * Returns 0 on success, -1 on error.
1500 */
1501 static int update_cdp_prep(
1502 rrd_t *rrd,
1503 unsigned long elapsed_pdp_st,
1504 unsigned long start_pdp_offset,
1505 unsigned long *rra_step_cnt,
1506 int rra_idx,
1507 rrd_value_t *pdp_temp,
1508 rrd_value_t *last_seasonal_coef,
1509 rrd_value_t *seasonal_coef,
1510 int current_cf)
1511 {
1512 unsigned long ds_idx, cdp_idx;
1514 /* update CDP_PREP areas */
1515 /* loop over data soures within each RRA */
1516 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1518 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1520 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1521 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1522 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1523 elapsed_pdp_st, start_pdp_offset,
1524 rrd->rra_def[rra_idx].pdp_cnt,
1525 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1526 rra_idx, ds_idx);
1527 } else {
1528 /* Nothing to consolidate if there's one PDP per CDP. However, if
1529 * we've missed some PDPs, let's update null counters etc. */
1530 if (elapsed_pdp_st > 2) {
1531 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1532 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1533 (enum cf_en)current_cf);
1534 }
1535 }
1537 if (rrd_test_error())
1538 return -1;
1539 } /* endif data sources loop */
1540 return 0;
1541 }
1543 /*
1544 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1545 * primary value, secondary value, and # of unknowns.
1546 */
1547 static void update_cdp(
1548 unival *scratch,
1549 int current_cf,
1550 rrd_value_t pdp_temp_val,
1551 unsigned long rra_step_cnt,
1552 unsigned long elapsed_pdp_st,
1553 unsigned long start_pdp_offset,
1554 unsigned long pdp_cnt,
1555 rrd_value_t xff,
1556 int i,
1557 int ii)
1558 {
1559 /* shorthand variables */
1560 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1561 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1562 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1563 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1565 if (rra_step_cnt) {
1566 /* If we are in this block, as least 1 CDP value will be written to
1567 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1568 * to be written, then the "fill in" value is the CDP_secondary_val
1569 * entry. */
1570 if (isnan(pdp_temp_val)) {
1571 *cdp_unkn_pdp_cnt += start_pdp_offset;
1572 *cdp_secondary_val = DNAN;
1573 } else {
1574 /* CDP_secondary value is the RRA "fill in" value for intermediary
1575 * CDP data entries. No matter the CF, the value is the same because
1576 * the average, max, min, and last of a list of identical values is
1577 * the same, namely, the value itself. */
1578 *cdp_secondary_val = pdp_temp_val;
1579 }
1581 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1582 *cdp_primary_val = DNAN;
1583 if (current_cf == CF_AVERAGE) {
1584 *cdp_val =
1585 initialize_average_carry_over(pdp_temp_val,
1586 elapsed_pdp_st,
1587 start_pdp_offset, pdp_cnt);
1588 } else {
1589 *cdp_val = pdp_temp_val;
1590 }
1591 } else {
1592 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1593 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1594 } /* endif meets xff value requirement for a valid value */
1595 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1596 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1597 if (isnan(pdp_temp_val))
1598 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1599 else
1600 *cdp_unkn_pdp_cnt = 0;
1601 } else { /* rra_step_cnt[i] == 0 */
1603 #ifdef DEBUG
1604 if (isnan(*cdp_val)) {
1605 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1606 i, ii);
1607 } else {
1608 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1609 i, ii, *cdp_val);
1610 }
1611 #endif
1612 if (isnan(pdp_temp_val)) {
1613 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1614 } else {
1615 *cdp_val =
1616 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1617 current_cf, i, ii);
1618 }
1619 }
1620 }
1622 /*
1623 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1624 * on the type of consolidation function.
1625 */
1626 static void initialize_cdp_val(
1627 unival *scratch,
1628 int current_cf,
1629 rrd_value_t pdp_temp_val,
1630 unsigned long elapsed_pdp_st,
1631 unsigned long start_pdp_offset,
1632 unsigned long pdp_cnt)
1633 {
1634 rrd_value_t cum_val, cur_val;
1636 switch (current_cf) {
1637 case CF_AVERAGE:
1638 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1639 cur_val = IFDNAN(pdp_temp_val, 0.0);
1640 scratch[CDP_primary_val].u_val =
1641 (cum_val + cur_val * start_pdp_offset) /
1642 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1643 scratch[CDP_val].u_val =
1644 initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1645 start_pdp_offset, pdp_cnt);
1646 break;
1647 case CF_MAXIMUM:
1648 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1649 cur_val = IFDNAN(pdp_temp_val, -DINF);
1650 #if 0
1651 #ifdef DEBUG
1652 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1653 fprintf(stderr,
1654 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1655 i, ii);
1656 exit(-1);
1657 }
1658 #endif
1659 #endif
1660 if (cur_val > cum_val)
1661 scratch[CDP_primary_val].u_val = cur_val;
1662 else
1663 scratch[CDP_primary_val].u_val = cum_val;
1664 /* initialize carry over value */
1665 scratch[CDP_val].u_val = pdp_temp_val;
1666 break;
1667 case CF_MINIMUM:
1668 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1669 cur_val = IFDNAN(pdp_temp_val, DINF);
1670 #if 0
1671 #ifdef DEBUG
1672 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1673 fprintf(stderr,
1674 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1675 ii);
1676 exit(-1);
1677 }
1678 #endif
1679 #endif
1680 if (cur_val < cum_val)
1681 scratch[CDP_primary_val].u_val = cur_val;
1682 else
1683 scratch[CDP_primary_val].u_val = cum_val;
1684 /* initialize carry over value */
1685 scratch[CDP_val].u_val = pdp_temp_val;
1686 break;
1687 case CF_LAST:
1688 default:
1689 scratch[CDP_primary_val].u_val = pdp_temp_val;
1690 /* initialize carry over value */
1691 scratch[CDP_val].u_val = pdp_temp_val;
1692 break;
1693 }
1694 }
1696 /*
1697 * Update the consolidation function for Holt-Winters functions as
1698 * well as other functions that don't actually consolidate multiple
1699 * PDPs.
1700 */
1701 static void reset_cdp(
1702 rrd_t *rrd,
1703 unsigned long elapsed_pdp_st,
1704 rrd_value_t *pdp_temp,
1705 rrd_value_t *last_seasonal_coef,
1706 rrd_value_t *seasonal_coef,
1707 int rra_idx,
1708 int ds_idx,
1709 int cdp_idx,
1710 enum cf_en current_cf)
1711 {
1712 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1714 switch (current_cf) {
1715 case CF_AVERAGE:
1716 default:
1717 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1718 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1719 break;
1720 case CF_SEASONAL:
1721 case CF_DEVSEASONAL:
1722 /* need to update cached seasonal values, so they are consistent
1723 * with the bulk update */
1724 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1725 * CDP_last_deviation are the same. */
1726 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1727 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1728 break;
1729 case CF_HWPREDICT:
1730 case CF_MHWPREDICT:
1731 /* need to update the null_count and last_null_count.
1732 * even do this for non-DNAN pdp_temp because the
1733 * algorithm is not learning from batch updates. */
1734 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1735 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1736 /* fall through */
1737 case CF_DEVPREDICT:
1738 scratch[CDP_primary_val].u_val = DNAN;
1739 scratch[CDP_secondary_val].u_val = DNAN;
1740 break;
1741 case CF_FAILURES:
1742 /* do not count missed bulk values as failures */
1743 scratch[CDP_primary_val].u_val = 0;
1744 scratch[CDP_secondary_val].u_val = 0;
1745 /* need to reset violations buffer.
1746 * could do this more carefully, but for now, just
1747 * assume a bulk update wipes away all violations. */
1748 erase_violations(rrd, cdp_idx, rra_idx);
1749 break;
1750 }
1751 }
1753 static rrd_value_t initialize_average_carry_over(
1754 rrd_value_t pdp_temp_val,
1755 unsigned long elapsed_pdp_st,
1756 unsigned long start_pdp_offset,
1757 unsigned long pdp_cnt)
1758 {
1759 /* initialize carry over value */
1760 if (isnan(pdp_temp_val)) {
1761 return DNAN;
1762 }
1763 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1764 }
1766 /*
1767 * Update or initialize a CDP value based on the consolidation
1768 * function.
1769 *
1770 * Returns the new value.
1771 */
1772 static rrd_value_t calculate_cdp_val(
1773 rrd_value_t cdp_val,
1774 rrd_value_t pdp_temp_val,
1775 unsigned long elapsed_pdp_st,
1776 int current_cf,
1777 #ifdef DEBUG
1778 int i,
1779 int ii
1780 #else
1781 int UNUSED(i),
1782 int UNUSED(ii)
1783 #endif
1784 )
1785 {
1786 if (isnan(cdp_val)) {
1787 if (current_cf == CF_AVERAGE) {
1788 pdp_temp_val *= elapsed_pdp_st;
1789 }
1790 #ifdef DEBUG
1791 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1792 i, ii, pdp_temp_val);
1793 #endif
1794 return pdp_temp_val;
1795 }
1796 if (current_cf == CF_AVERAGE)
1797 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1798 if (current_cf == CF_MINIMUM)
1799 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1800 if (current_cf == CF_MAXIMUM)
1801 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1803 return pdp_temp_val;
1804 }
1806 /*
1807 * For each RRA, update the seasonal values and then call update_aberrant_CF
1808 * for each data source.
1809 *
1810 * Return 0 on success, -1 on error.
1811 */
1812 static int update_aberrant_cdps(
1813 rrd_t *rrd,
1814 rrd_file_t *rrd_file,
1815 unsigned long rra_begin,
1816 unsigned long elapsed_pdp_st,
1817 rrd_value_t *pdp_temp,
1818 rrd_value_t **seasonal_coef)
1819 {
1820 unsigned long rra_idx, ds_idx, j;
1822 /* number of PDP steps since the last update that
1823 * are assigned to the first CDP to be generated
1824 * since the last update. */
1825 unsigned short scratch_idx;
1826 unsigned long rra_start;
1827 enum cf_en current_cf;
1829 /* this loop is only entered if elapsed_pdp_st < 3 */
1830 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1831 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1832 rra_start = rra_begin;
1833 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1834 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1835 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1836 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1837 if (scratch_idx == CDP_primary_val) {
1838 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1839 elapsed_pdp_st + 1, seasonal_coef);
1840 } else {
1841 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1842 elapsed_pdp_st + 2, seasonal_coef);
1843 }
1844 }
1845 if (rrd_test_error())
1846 return -1;
1847 /* loop over data soures within each RRA */
1848 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1849 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1850 rra_idx * (rrd->stat_head->ds_cnt) +
1851 ds_idx, rra_idx, ds_idx, scratch_idx,
1852 *seasonal_coef);
1853 }
1854 }
1855 rra_start += rrd->rra_def[rra_idx].row_cnt
1856 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1857 }
1858 }
1859 return 0;
1860 }
1862 /*
1863 * Move sequentially through the file, writing one RRA at a time. Note this
1864 * architecture divorces the computation of CDP with flushing updated RRA
1865 * entries to disk.
1866 *
1867 * Return 0 on success, -1 on error.
1868 */
1869 static int write_to_rras(
1870 rrd_t *rrd,
1871 rrd_file_t *rrd_file,
1872 unsigned long *rra_step_cnt,
1873 unsigned long rra_begin,
1874 time_t current_time,
1875 unsigned long *skip_update,
1876 rrd_info_t ** pcdp_summary)
1877 {
1878 unsigned long rra_idx;
1879 unsigned long rra_start;
1880 time_t rra_time = 0; /* time of update for a RRA */
1882 unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1884 /* Ready to write to disk */
1885 rra_start = rra_begin;
1887 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1888 rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1889 rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1891 /* for cdp_prep */
1892 unsigned short scratch_idx;
1893 unsigned long step_subtract;
1895 for (scratch_idx = CDP_primary_val,
1896 step_subtract = 1;
1897 rra_step_cnt[rra_idx] > 0;
1898 rra_step_cnt[rra_idx]--,
1899 scratch_idx = CDP_secondary_val,
1900 step_subtract = 2) {
1902 size_t rra_pos_new;
1903 #ifdef DEBUG
1904 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1905 #endif
1906 /* increment, with wrap-around */
1907 if (++rra_ptr->cur_row >= rra_def->row_cnt)
1908 rra_ptr->cur_row = 0;
1910 /* we know what our position should be */
1911 rra_pos_new = rra_start
1912 + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1914 /* re-seek if the position is wrong or we wrapped around */
1915 if (rra_pos_new != rrd_file->pos) {
1916 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1917 rrd_set_error("seek error in rrd");
1918 return -1;
1919 }
1920 }
1921 #ifdef DEBUG
1922 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1923 #endif
1925 if (skip_update[rra_idx])
1926 continue;
1928 if (*pcdp_summary != NULL) {
1929 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1931 rra_time = (current_time - current_time % step_time)
1932 - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1933 }
1935 if (write_RRA_row
1936 (rrd_file, rrd, rra_idx, scratch_idx,
1937 pcdp_summary, rra_time) == -1)
1938 return -1;
1939 }
1941 rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1942 } /* RRA LOOP */
1944 return 0;
1945 }
1947 /*
1948 * Write out one row of values (one value per DS) to the archive.
1949 *
1950 * Returns 0 on success, -1 on error.
1951 */
1952 static int write_RRA_row(
1953 rrd_file_t *rrd_file,
1954 rrd_t *rrd,
1955 unsigned long rra_idx,
1956 unsigned short CDP_scratch_idx,
1957 rrd_info_t ** pcdp_summary,
1958 time_t rra_time)
1959 {
1960 unsigned long ds_idx, cdp_idx;
1961 rrd_infoval_t iv;
1963 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1964 /* compute the cdp index */
1965 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1966 #ifdef DEBUG
1967 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1968 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1969 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1970 #endif
1971 if (*pcdp_summary != NULL) {
1972 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1973 /* append info to the return hash */
1974 *pcdp_summary = rrd_info_push(*pcdp_summary,
1975 sprintf_alloc
1976 ("[%lli]RRA[%s][%lu]DS[%s]", (long long)rra_time,
1977 rrd->rra_def[rra_idx].cf_nam,
1978 rrd->rra_def[rra_idx].pdp_cnt,
1979 rrd->ds_def[ds_idx].ds_nam),
1980 RD_I_VAL, iv);
1981 }
1982 errno = 0;
1983 if (rrd_write(rrd_file,
1984 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
1985 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1986 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1987 return -1;
1988 }
1989 }
1990 return 0;
1991 }
1993 /*
1994 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
1995 *
1996 * Returns 0 on success, -1 otherwise
1997 */
1998 static int smooth_all_rras(
1999 rrd_t *rrd,
2000 rrd_file_t *rrd_file,
2001 unsigned long rra_begin)
2002 {
2003 unsigned long rra_start = rra_begin;
2004 unsigned long rra_idx;
2006 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2007 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2008 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2009 #ifdef DEBUG
2010 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2011 #endif
2012 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2013 if (rrd_test_error())
2014 return -1;
2015 }
2016 rra_start += rrd->rra_def[rra_idx].row_cnt
2017 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2018 }
2019 return 0;
2020 }
2022 #ifndef HAVE_MMAP
2023 /*
2024 * Flush changes to disk (unless we're using mmap)
2025 *
2026 * Returns 0 on success, -1 otherwise
2027 */
2028 static int write_changes_to_disk(
2029 rrd_t *rrd,
2030 rrd_file_t *rrd_file,
2031 int version)
2032 {
2033 /* we just need to write back the live header portion now */
2034 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2035 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2036 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2037 SEEK_SET) != 0) {
2038 rrd_set_error("seek rrd for live header writeback");
2039 return -1;
2040 }
2041 if (version >= 3) {
2042 if (rrd_write(rrd_file, rrd->live_head,
2043 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2044 rrd_set_error("rrd_write live_head to rrd");
2045 return -1;
2046 }
2047 } else {
2048 if (rrd_write(rrd_file, rrd->legacy_last_up,
2049 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2050 rrd_set_error("rrd_write live_head to rrd");
2051 return -1;
2052 }
2053 }
2056 if (rrd_write(rrd_file, rrd->pdp_prep,
2057 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2058 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2059 rrd_set_error("rrd_write pdp_prep to rrd");
2060 return -1;
2061 }
2063 if (rrd_write(rrd_file, rrd->cdp_prep,
2064 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2065 rrd->stat_head->ds_cnt)
2066 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2067 rrd->stat_head->ds_cnt)) {
2069 rrd_set_error("rrd_write cdp_prep to rrd");
2070 return -1;
2071 }
2073 if (rrd_write(rrd_file, rrd->rra_ptr,
2074 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2075 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2076 rrd_set_error("rrd_write rra_ptr to rrd");
2077 return -1;
2078 }
2079 return 0;
2080 }
2081 #endif