2 /*****************************************************************************
3 * RRDtool 1.3.1 Copyright by Tobi Oetiker, 1997-2008
4 *****************************************************************************
5 * rrd_update.c RRD Update Function
6 *****************************************************************************
7 * $Id: rrd_update.c 1447 2008-07-23 13:02:26Z oetiker $
8 *****************************************************************************/
10 #include "rrd_tool.h"
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
18 #include <locale.h>
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
26 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
27 /*
28 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
29 * replacement.
30 */
31 #include <sys/timeb.h>
33 #ifndef __MINGW32__
34 struct timeval {
35 time_t tv_sec; /* seconds */
36 long tv_usec; /* microseconds */
37 };
38 #endif
40 struct __timezone {
41 int tz_minuteswest; /* minutes W of Greenwich */
42 int tz_dsttime; /* type of dst correction */
43 };
45 static int gettimeofday(
46 struct timeval *t,
47 struct __timezone *tz)
48 {
50 struct _timeb current_time;
52 _ftime(¤t_time);
54 t->tv_sec = current_time.time;
55 t->tv_usec = current_time.millitm * 1000;
57 return 0;
58 }
60 #endif
62 /* FUNCTION PROTOTYPES */
64 int rrd_update_r(
65 const char *filename,
66 const char *tmplt,
67 int argc,
68 const char **argv);
69 int _rrd_update(
70 const char *filename,
71 const char *tmplt,
72 int argc,
73 const char **argv,
74 rrd_info_t *);
76 static int allocate_data_structures(
77 rrd_t *rrd,
78 char ***updvals,
79 rrd_value_t **pdp_temp,
80 const char *tmplt,
81 long **tmpl_idx,
82 unsigned long *tmpl_cnt,
83 unsigned long **rra_step_cnt,
84 unsigned long **skip_update,
85 rrd_value_t **pdp_new);
87 static int parse_template(
88 rrd_t *rrd,
89 const char *tmplt,
90 unsigned long *tmpl_cnt,
91 long *tmpl_idx);
93 static int process_arg(
94 char *step_start,
95 rrd_t *rrd,
96 rrd_file_t *rrd_file,
97 unsigned long rra_begin,
98 unsigned long *rra_current,
99 time_t *current_time,
100 unsigned long *current_time_usec,
101 rrd_value_t *pdp_temp,
102 rrd_value_t *pdp_new,
103 unsigned long *rra_step_cnt,
104 char **updvals,
105 long *tmpl_idx,
106 unsigned long tmpl_cnt,
107 rrd_info_t ** pcdp_summary,
108 int version,
109 unsigned long *skip_update,
110 int *schedule_smooth);
112 static int parse_ds(
113 rrd_t *rrd,
114 char **updvals,
115 long *tmpl_idx,
116 char *input,
117 unsigned long tmpl_cnt,
118 time_t *current_time,
119 unsigned long *current_time_usec,
120 int version);
122 static int get_time_from_reading(
123 rrd_t *rrd,
124 char timesyntax,
125 char **updvals,
126 time_t *current_time,
127 unsigned long *current_time_usec,
128 int version);
130 static int update_pdp_prep(
131 rrd_t *rrd,
132 char **updvals,
133 rrd_value_t *pdp_new,
134 double interval);
136 static int calculate_elapsed_steps(
137 rrd_t *rrd,
138 unsigned long current_time,
139 unsigned long current_time_usec,
140 double interval,
141 double *pre_int,
142 double *post_int,
143 unsigned long *proc_pdp_cnt);
145 static void simple_update(
146 rrd_t *rrd,
147 double interval,
148 rrd_value_t *pdp_new);
150 static int process_all_pdp_st(
151 rrd_t *rrd,
152 double interval,
153 double pre_int,
154 double post_int,
155 unsigned long elapsed_pdp_st,
156 rrd_value_t *pdp_new,
157 rrd_value_t *pdp_temp);
159 static int process_pdp_st(
160 rrd_t *rrd,
161 unsigned long ds_idx,
162 double interval,
163 double pre_int,
164 double post_int,
165 long diff_pdp_st,
166 rrd_value_t *pdp_new,
167 rrd_value_t *pdp_temp);
169 static int update_all_cdp_prep(
170 rrd_t *rrd,
171 unsigned long *rra_step_cnt,
172 unsigned long rra_begin,
173 rrd_file_t *rrd_file,
174 unsigned long elapsed_pdp_st,
175 unsigned long proc_pdp_cnt,
176 rrd_value_t **last_seasonal_coef,
177 rrd_value_t **seasonal_coef,
178 rrd_value_t *pdp_temp,
179 unsigned long *rra_current,
180 unsigned long *skip_update,
181 int *schedule_smooth);
183 static int do_schedule_smooth(
184 rrd_t *rrd,
185 unsigned long rra_idx,
186 unsigned long elapsed_pdp_st);
188 static int update_cdp_prep(
189 rrd_t *rrd,
190 unsigned long elapsed_pdp_st,
191 unsigned long start_pdp_offset,
192 unsigned long *rra_step_cnt,
193 int rra_idx,
194 rrd_value_t *pdp_temp,
195 rrd_value_t *last_seasonal_coef,
196 rrd_value_t *seasonal_coef,
197 int current_cf);
199 static void update_cdp(
200 unival *scratch,
201 int current_cf,
202 rrd_value_t pdp_temp_val,
203 unsigned long rra_step_cnt,
204 unsigned long elapsed_pdp_st,
205 unsigned long start_pdp_offset,
206 unsigned long pdp_cnt,
207 rrd_value_t xff,
208 int i,
209 int ii);
211 static void initialize_cdp_val(
212 unival *scratch,
213 int current_cf,
214 rrd_value_t pdp_temp_val,
215 unsigned long elapsed_pdp_st,
216 unsigned long start_pdp_offset,
217 unsigned long pdp_cnt);
219 static void reset_cdp(
220 rrd_t *rrd,
221 unsigned long elapsed_pdp_st,
222 rrd_value_t *pdp_temp,
223 rrd_value_t *last_seasonal_coef,
224 rrd_value_t *seasonal_coef,
225 int rra_idx,
226 int ds_idx,
227 int cdp_idx,
228 enum cf_en current_cf);
230 static rrd_value_t initialize_average_carry_over(
231 rrd_value_t pdp_temp_val,
232 unsigned long elapsed_pdp_st,
233 unsigned long start_pdp_offset,
234 unsigned long pdp_cnt);
236 static rrd_value_t calculate_cdp_val(
237 rrd_value_t cdp_val,
238 rrd_value_t pdp_temp_val,
239 unsigned long elapsed_pdp_st,
240 int current_cf,
241 int i,
242 int ii);
244 static int update_aberrant_cdps(
245 rrd_t *rrd,
246 rrd_file_t *rrd_file,
247 unsigned long rra_begin,
248 unsigned long *rra_current,
249 unsigned long elapsed_pdp_st,
250 rrd_value_t *pdp_temp,
251 rrd_value_t **seasonal_coef);
253 static int write_to_rras(
254 rrd_t *rrd,
255 rrd_file_t *rrd_file,
256 unsigned long *rra_step_cnt,
257 unsigned long rra_begin,
258 unsigned long *rra_current,
259 time_t current_time,
260 unsigned long *skip_update,
261 rrd_info_t ** pcdp_summary);
263 static int write_RRA_row(
264 rrd_file_t *rrd_file,
265 rrd_t *rrd,
266 unsigned long rra_idx,
267 unsigned long *rra_current,
268 unsigned short CDP_scratch_idx,
269 rrd_info_t ** pcdp_summary,
270 time_t rra_time);
272 static int smooth_all_rras(
273 rrd_t *rrd,
274 rrd_file_t *rrd_file,
275 unsigned long rra_begin);
277 #ifndef HAVE_MMAP
278 static int write_changes_to_disk(
279 rrd_t *rrd,
280 rrd_file_t *rrd_file,
281 int version);
282 #endif
284 /*
285 * normalize time as returned by gettimeofday. usec part must
286 * be always >= 0
287 */
288 static inline void normalize_time(
289 struct timeval *t)
290 {
291 if (t->tv_usec < 0) {
292 t->tv_sec--;
293 t->tv_usec += 1e6L;
294 }
295 }
297 /*
298 * Sets current_time and current_time_usec based on the current time.
299 * current_time_usec is set to 0 if the version number is 1 or 2.
300 */
301 static inline void initialize_time(
302 time_t *current_time,
303 unsigned long *current_time_usec,
304 int version)
305 {
306 struct timeval tmp_time; /* used for time conversion */
308 gettimeofday(&tmp_time, 0);
309 normalize_time(&tmp_time);
310 *current_time = tmp_time.tv_sec;
311 if (version >= 3) {
312 *current_time_usec = tmp_time.tv_usec;
313 } else {
314 *current_time_usec = 0;
315 }
316 }
318 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
320 rrd_info_t *rrd_update_v(
321 int argc,
322 char **argv)
323 {
324 char *tmplt = NULL;
325 rrd_info_t *result = NULL;
326 rrd_infoval_t rc;
327 struct option long_options[] = {
328 {"template", required_argument, 0, 't'},
329 {0, 0, 0, 0}
330 };
332 rc.u_int = -1;
333 optind = 0;
334 opterr = 0; /* initialize getopt */
336 while (1) {
337 int option_index = 0;
338 int opt;
340 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
342 if (opt == EOF)
343 break;
345 switch (opt) {
346 case 't':
347 tmplt = optarg;
348 break;
350 case '?':
351 rrd_set_error("unknown option '%s'", argv[optind - 1]);
352 goto end_tag;
353 }
354 }
356 /* need at least 2 arguments: filename, data. */
357 if (argc - optind < 2) {
358 rrd_set_error("Not enough arguments");
359 goto end_tag;
360 }
361 rc.u_int = 0;
362 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
363 rc.u_int = _rrd_update(argv[optind], tmplt,
364 argc - optind - 1,
365 (const char **) (argv + optind + 1), result);
366 result->value.u_int = rc.u_int;
367 end_tag:
368 return result;
369 }
371 int rrd_update(
372 int argc,
373 char **argv)
374 {
375 struct option long_options[] = {
376 {"template", required_argument, 0, 't'},
377 {0, 0, 0, 0}
378 };
379 int option_index = 0;
380 int opt;
381 char *tmplt = NULL;
382 int rc = -1;
384 optind = 0;
385 opterr = 0; /* initialize getopt */
387 while (1) {
388 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
390 if (opt == EOF)
391 break;
393 switch (opt) {
394 case 't':
395 tmplt = strdup(optarg);
396 break;
398 case '?':
399 rrd_set_error("unknown option '%s'", argv[optind - 1]);
400 goto out;
401 }
402 }
404 /* need at least 2 arguments: filename, data. */
405 if (argc - optind < 2) {
406 rrd_set_error("Not enough arguments");
407 goto out;
408 }
410 rc = rrd_update_r(argv[optind], tmplt,
411 argc - optind - 1, (const char **) (argv + optind + 1));
412 out:
413 free(tmplt);
414 return rc;
415 }
417 int rrd_update_r(
418 const char *filename,
419 const char *tmplt,
420 int argc,
421 const char **argv)
422 {
423 return _rrd_update(filename, tmplt, argc, argv, NULL);
424 }
426 int _rrd_update(
427 const char *filename,
428 const char *tmplt,
429 int argc,
430 const char **argv,
431 rrd_info_t * pcdp_summary)
432 {
434 int arg_i = 2;
436 unsigned long rra_begin; /* byte pointer to the rra
437 * area in the rrd file. this
438 * pointer never changes value */
439 unsigned long rra_current; /* byte pointer to the current write
440 * spot in the rrd file. */
441 rrd_value_t *pdp_new; /* prepare the incoming data to be added
442 * to the existing entry */
443 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
444 * to the cdp values */
446 long *tmpl_idx; /* index representing the settings
447 * transported by the tmplt index */
448 unsigned long tmpl_cnt = 2; /* time and data */
449 rrd_t rrd;
450 time_t current_time = 0;
451 unsigned long current_time_usec = 0; /* microseconds part of current time */
452 char **updvals;
453 int schedule_smooth = 0;
455 /* number of elapsed PDP steps since last update */
456 unsigned long *rra_step_cnt = NULL;
458 int version; /* rrd version */
459 rrd_file_t *rrd_file;
460 char *arg_copy; /* for processing the argv */
461 unsigned long *skip_update; /* RRAs to advance but not write */
463 /* need at least 1 arguments: data. */
464 if (argc < 1) {
465 rrd_set_error("Not enough arguments");
466 goto err_out;
467 }
469 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
470 goto err_free;
471 }
472 /* We are now at the beginning of the rra's */
473 rra_current = rra_begin = rrd_file->header_len;
475 version = atoi(rrd.stat_head->version);
477 initialize_time(¤t_time, ¤t_time_usec, version);
479 /* get exclusive lock to whole file.
480 * lock gets removed when we close the file.
481 */
482 if (rrd_lock(rrd_file) != 0) {
483 rrd_set_error("could not lock RRD");
484 goto err_close;
485 }
487 if (allocate_data_structures(&rrd, &updvals,
488 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
489 &rra_step_cnt, &skip_update,
490 &pdp_new) == -1) {
491 goto err_close;
492 }
494 /* loop through the arguments. */
495 for (arg_i = 0; arg_i < argc; arg_i++) {
496 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
497 rrd_set_error("failed duplication argv entry");
498 break;
499 }
500 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
501 ¤t_time, ¤t_time_usec, pdp_temp, pdp_new,
502 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
503 &pcdp_summary, version, skip_update,
504 &schedule_smooth) == -1) {
505 if (rrd_test_error()) { /* Should have error string always here */
506 char *save_error;
508 /* Prepend file name to error message */
509 if ((save_error = strdup(rrd_get_error())) != NULL) {
510 rrd_set_error("%s: %s", filename, save_error);
511 free(save_error);
512 }
513 }
514 free(arg_copy);
515 break;
516 }
517 free(arg_copy);
518 }
520 free(rra_step_cnt);
522 /* if we got here and if there is an error and if the file has not been
523 * written to, then close things up and return. */
524 if (rrd_test_error()) {
525 goto err_free_structures;
526 }
527 #ifndef HAVE_MMAP
528 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
529 goto err_free_structures;
530 }
531 #endif
533 /* calling the smoothing code here guarantees at most one smoothing
534 * operation per rrd_update call. Unfortunately, it is possible with bulk
535 * updates, or a long-delayed update for smoothing to occur off-schedule.
536 * This really isn't critical except during the burn-in cycles. */
537 if (schedule_smooth) {
538 smooth_all_rras(&rrd, rrd_file, rra_begin);
539 }
541 /* rrd_dontneed(rrd_file,&rrd); */
542 rrd_free(&rrd);
543 rrd_close(rrd_file);
545 free(pdp_new);
546 free(tmpl_idx);
547 free(pdp_temp);
548 free(skip_update);
549 free(updvals);
550 return 0;
552 err_free_structures:
553 free(pdp_new);
554 free(tmpl_idx);
555 free(pdp_temp);
556 free(skip_update);
557 free(updvals);
558 err_close:
559 rrd_close(rrd_file);
560 err_free:
561 rrd_free(&rrd);
562 err_out:
563 return -1;
564 }
566 /*
567 * get exclusive lock to whole file.
568 * lock gets removed when we close the file
569 *
570 * returns 0 on success
571 */
572 int rrd_lock(
573 rrd_file_t *file)
574 {
575 int rcstat;
577 {
578 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
579 struct _stat st;
581 if (_fstat(file->fd, &st) == 0) {
582 rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
583 } else {
584 rcstat = -1;
585 }
586 #else
587 struct flock lock;
589 lock.l_type = F_WRLCK; /* exclusive write lock */
590 lock.l_len = 0; /* whole file */
591 lock.l_start = 0; /* start of file */
592 lock.l_whence = SEEK_SET; /* end of file */
594 rcstat = fcntl(file->fd, F_SETLK, &lock);
595 #endif
596 }
598 return (rcstat);
599 }
601 /*
602 * Allocate some important arrays used, and initialize the template.
603 *
604 * When it returns, either all of the structures are allocated
605 * or none of them are.
606 *
607 * Returns 0 on success, -1 on error.
608 */
609 static int allocate_data_structures(
610 rrd_t *rrd,
611 char ***updvals,
612 rrd_value_t **pdp_temp,
613 const char *tmplt,
614 long **tmpl_idx,
615 unsigned long *tmpl_cnt,
616 unsigned long **rra_step_cnt,
617 unsigned long **skip_update,
618 rrd_value_t **pdp_new)
619 {
620 unsigned i, ii;
621 if ((*updvals = (char **) malloc(sizeof(char *)
622 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
623 rrd_set_error("allocating updvals pointer array.");
624 return -1;
625 }
626 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
627 * rrd->stat_head->ds_cnt)) ==
628 NULL) {
629 rrd_set_error("allocating pdp_temp.");
630 goto err_free_updvals;
631 }
632 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
633 *
634 rrd->stat_head->rra_cnt)) ==
635 NULL) {
636 rrd_set_error("allocating skip_update.");
637 goto err_free_pdp_temp;
638 }
639 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
640 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
641 rrd_set_error("allocating tmpl_idx.");
642 goto err_free_skip_update;
643 }
644 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
645 *
646 (rrd->stat_head->
647 rra_cnt))) == NULL) {
648 rrd_set_error("allocating rra_step_cnt.");
649 goto err_free_tmpl_idx;
650 }
652 /* initialize tmplt redirector */
653 /* default config example (assume DS 1 is a CDEF DS)
654 tmpl_idx[0] -> 0; (time)
655 tmpl_idx[1] -> 1; (DS 0)
656 tmpl_idx[2] -> 3; (DS 2)
657 tmpl_idx[3] -> 4; (DS 3) */
658 (*tmpl_idx)[0] = 0; /* time */
659 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
660 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
661 (*tmpl_idx)[ii++] = i;
662 }
663 *tmpl_cnt = ii;
665 if (tmplt != NULL) {
666 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
667 goto err_free_rra_step_cnt;
668 }
669 }
671 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
672 * rrd->stat_head->ds_cnt)) == NULL) {
673 rrd_set_error("allocating pdp_new.");
674 goto err_free_rra_step_cnt;
675 }
677 return 0;
679 err_free_rra_step_cnt:
680 free(*rra_step_cnt);
681 err_free_tmpl_idx:
682 free(*tmpl_idx);
683 err_free_skip_update:
684 free(*skip_update);
685 err_free_pdp_temp:
686 free(*pdp_temp);
687 err_free_updvals:
688 free(*updvals);
689 return -1;
690 }
692 /*
693 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
694 *
695 * Returns 0 on success.
696 */
697 static int parse_template(
698 rrd_t *rrd,
699 const char *tmplt,
700 unsigned long *tmpl_cnt,
701 long *tmpl_idx)
702 {
703 char *dsname, *tmplt_copy;
704 unsigned int tmpl_len, i;
705 int ret = 0;
707 *tmpl_cnt = 1; /* the first entry is the time */
709 /* we should work on a writeable copy here */
710 if ((tmplt_copy = strdup(tmplt)) == NULL) {
711 rrd_set_error("error copying tmplt '%s'", tmplt);
712 ret = -1;
713 goto out;
714 }
716 dsname = tmplt_copy;
717 tmpl_len = strlen(tmplt_copy);
718 for (i = 0; i <= tmpl_len; i++) {
719 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
720 tmplt_copy[i] = '\0';
721 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
722 rrd_set_error("tmplt contains more DS definitions than RRD");
723 ret = -1;
724 goto out_free_tmpl_copy;
725 }
726 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
727 rrd_set_error("unknown DS name '%s'", dsname);
728 ret = -1;
729 goto out_free_tmpl_copy;
730 }
731 /* go to the next entry on the tmplt_copy */
732 if (i < tmpl_len)
733 dsname = &tmplt_copy[i + 1];
734 }
735 }
736 out_free_tmpl_copy:
737 free(tmplt_copy);
738 out:
739 return ret;
740 }
742 /*
743 * Parse an update string, updates the primary data points (PDPs)
744 * and consolidated data points (CDPs), and writes changes to the RRAs.
745 *
746 * Returns 0 on success, -1 on error.
747 */
748 static int process_arg(
749 char *step_start,
750 rrd_t *rrd,
751 rrd_file_t *rrd_file,
752 unsigned long rra_begin,
753 unsigned long *rra_current,
754 time_t *current_time,
755 unsigned long *current_time_usec,
756 rrd_value_t *pdp_temp,
757 rrd_value_t *pdp_new,
758 unsigned long *rra_step_cnt,
759 char **updvals,
760 long *tmpl_idx,
761 unsigned long tmpl_cnt,
762 rrd_info_t ** pcdp_summary,
763 int version,
764 unsigned long *skip_update,
765 int *schedule_smooth)
766 {
767 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
769 /* a vector of future Holt-Winters seasonal coefs */
770 unsigned long elapsed_pdp_st;
772 double interval, pre_int, post_int; /* interval between this and
773 * the last run */
774 unsigned long proc_pdp_cnt;
775 unsigned long rra_start;
777 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
778 current_time, current_time_usec, version) == -1) {
779 return -1;
780 }
781 /* seek to the beginning of the rra's */
782 if (*rra_current != rra_begin) {
783 #ifndef HAVE_MMAP
784 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
785 rrd_set_error("seek error in rrd");
786 return -1;
787 }
788 #endif
789 *rra_current = rra_begin;
790 }
791 rra_start = rra_begin;
793 interval = (double) (*current_time - rrd->live_head->last_up)
794 + (double) ((long) *current_time_usec -
795 (long) rrd->live_head->last_up_usec) / 1e6f;
797 /* process the data sources and update the pdp_prep
798 * area accordingly */
799 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
800 return -1;
801 }
803 elapsed_pdp_st = calculate_elapsed_steps(rrd,
804 *current_time,
805 *current_time_usec, interval,
806 &pre_int, &post_int,
807 &proc_pdp_cnt);
809 /* has a pdp_st moment occurred since the last run ? */
810 if (elapsed_pdp_st == 0) {
811 /* no we have not passed a pdp_st moment. therefore update is simple */
812 simple_update(rrd, interval, pdp_new);
813 } else {
814 /* an pdp_st has occurred. */
815 if (process_all_pdp_st(rrd, interval,
816 pre_int, post_int,
817 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
818 return -1;
819 }
820 if (update_all_cdp_prep(rrd, rra_step_cnt,
821 rra_begin, rrd_file,
822 elapsed_pdp_st,
823 proc_pdp_cnt,
824 &last_seasonal_coef,
825 &seasonal_coef,
826 pdp_temp, rra_current,
827 skip_update, schedule_smooth) == -1) {
828 goto err_free_coefficients;
829 }
830 if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
831 elapsed_pdp_st, pdp_temp,
832 &seasonal_coef) == -1) {
833 goto err_free_coefficients;
834 }
835 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
836 rra_current, *current_time, skip_update,
837 pcdp_summary) == -1) {
838 goto err_free_coefficients;
839 }
840 } /* endif a pdp_st has occurred */
841 rrd->live_head->last_up = *current_time;
842 rrd->live_head->last_up_usec = *current_time_usec;
844 if (version < 3) {
845 *rrd->legacy_last_up = rrd->live_head->last_up;
846 }
847 free(seasonal_coef);
848 free(last_seasonal_coef);
849 return 0;
851 err_free_coefficients:
852 free(seasonal_coef);
853 free(last_seasonal_coef);
854 return -1;
855 }
857 /*
858 * Parse a DS string (time + colon-separated values), storing the
859 * results in current_time, current_time_usec, and updvals.
860 *
861 * Returns 0 on success, -1 on error.
862 */
863 static int parse_ds(
864 rrd_t *rrd,
865 char **updvals,
866 long *tmpl_idx,
867 char *input,
868 unsigned long tmpl_cnt,
869 time_t *current_time,
870 unsigned long *current_time_usec,
871 int version)
872 {
873 char *p;
874 unsigned long i;
875 char timesyntax;
877 updvals[0] = input;
878 /* initialize all ds input to unknown except the first one
879 which has always got to be set */
880 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
881 updvals[i] = "U";
883 /* separate all ds elements; first must be examined separately
884 due to alternate time syntax */
885 if ((p = strchr(input, '@')) != NULL) {
886 timesyntax = '@';
887 } else if ((p = strchr(input, ':')) != NULL) {
888 timesyntax = ':';
889 } else {
890 rrd_set_error("expected timestamp not found in data source from %s",
891 input);
892 return -1;
893 }
894 *p = '\0';
895 i = 1;
896 updvals[tmpl_idx[i++]] = p + 1;
897 while (*(++p)) {
898 if (*p == ':') {
899 *p = '\0';
900 if (i < tmpl_cnt) {
901 updvals[tmpl_idx[i++]] = p + 1;
902 }
903 }
904 }
906 if (i != tmpl_cnt) {
907 rrd_set_error("expected %lu data source readings (got %lu) from %s",
908 tmpl_cnt - 1, i, input);
909 return -1;
910 }
912 if (get_time_from_reading(rrd, timesyntax, updvals,
913 current_time, current_time_usec,
914 version) == -1) {
915 return -1;
916 }
917 return 0;
918 }
920 /*
921 * Parse the time in a DS string, store it in current_time and
922 * current_time_usec and verify that it's later than the last
923 * update for this DS.
924 *
925 * Returns 0 on success, -1 on error.
926 */
927 static int get_time_from_reading(
928 rrd_t *rrd,
929 char timesyntax,
930 char **updvals,
931 time_t *current_time,
932 unsigned long *current_time_usec,
933 int version)
934 {
935 double tmp;
936 char *parsetime_error = NULL;
937 char *old_locale;
938 rrd_time_value_t ds_tv;
939 struct timeval tmp_time; /* used for time conversion */
941 /* get the time from the reading ... handle N */
942 if (timesyntax == '@') { /* at-style */
943 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
944 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
945 return -1;
946 }
947 if (ds_tv.type == RELATIVE_TO_END_TIME ||
948 ds_tv.type == RELATIVE_TO_START_TIME) {
949 rrd_set_error("specifying time relative to the 'start' "
950 "or 'end' makes no sense here: %s", updvals[0]);
951 return -1;
952 }
953 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
954 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
955 } else if (strcmp(updvals[0], "N") == 0) {
956 gettimeofday(&tmp_time, 0);
957 normalize_time(&tmp_time);
958 *current_time = tmp_time.tv_sec;
959 *current_time_usec = tmp_time.tv_usec;
960 } else {
961 old_locale = setlocale(LC_NUMERIC, "C");
962 tmp = strtod(updvals[0], 0);
963 setlocale(LC_NUMERIC, old_locale);
964 *current_time = floor(tmp);
965 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
966 }
967 /* dont do any correction for old version RRDs */
968 if (version < 3)
969 *current_time_usec = 0;
971 if (*current_time < rrd->live_head->last_up ||
972 (*current_time == rrd->live_head->last_up &&
973 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
974 rrd_set_error("illegal attempt to update using time %ld when "
975 "last update time is %ld (minimum one second step)",
976 *current_time, rrd->live_head->last_up);
977 return -1;
978 }
979 return 0;
980 }
982 /*
983 * Update pdp_new by interpreting the updvals according to the DS type
984 * (COUNTER, GAUGE, etc.).
985 *
986 * Returns 0 on success, -1 on error.
987 */
988 static int update_pdp_prep(
989 rrd_t *rrd,
990 char **updvals,
991 rrd_value_t *pdp_new,
992 double interval)
993 {
994 unsigned long ds_idx;
995 int ii;
996 char *endptr; /* used in the conversion */
997 double rate;
998 char *old_locale;
999 enum dst_en dst_idx;
1001 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1002 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1004 /* make sure we do not build diffs with old last_ds values */
1005 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1006 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1007 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1008 }
1010 /* NOTE: DST_CDEF should never enter this if block, because
1011 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1012 * accidently specified a value for the DST_CDEF. To handle this case,
1013 * an extra check is required. */
1015 if ((updvals[ds_idx + 1][0] != 'U') &&
1016 (dst_idx != DST_CDEF) &&
1017 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1018 rate = DNAN;
1020 /* pdp_new contains rate * time ... eg the bytes transferred during
1021 * the interval. Doing it this way saves a lot of math operations
1022 */
1023 switch (dst_idx) {
1024 case DST_COUNTER:
1025 case DST_DERIVE:
1026 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1027 if ((updvals[ds_idx + 1][ii] < '0'
1028 || updvals[ds_idx + 1][ii] > '9')
1029 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1030 rrd_set_error("not a simple integer: '%s'",
1031 updvals[ds_idx + 1]);
1032 return -1;
1033 }
1034 }
1035 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1036 pdp_new[ds_idx] =
1037 rrd_diff(updvals[ds_idx + 1],
1038 rrd->pdp_prep[ds_idx].last_ds);
1039 if (dst_idx == DST_COUNTER) {
1040 /* simple overflow catcher. This will fail
1041 * terribly for non 32 or 64 bit counters
1042 * ... are there any others in SNMP land?
1043 */
1044 if (pdp_new[ds_idx] < (double) 0.0)
1045 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1046 if (pdp_new[ds_idx] < (double) 0.0)
1047 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1048 }
1049 rate = pdp_new[ds_idx] / interval;
1050 } else {
1051 pdp_new[ds_idx] = DNAN;
1052 }
1053 break;
1054 case DST_ABSOLUTE:
1055 old_locale = setlocale(LC_NUMERIC, "C");
1056 errno = 0;
1057 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1058 setlocale(LC_NUMERIC, old_locale);
1059 if (errno > 0) {
1060 rrd_set_error("converting '%s' to float: %s",
1061 updvals[ds_idx + 1], rrd_strerror(errno));
1062 return -1;
1063 };
1064 if (endptr[0] != '\0') {
1065 rrd_set_error
1066 ("conversion of '%s' to float not complete: tail '%s'",
1067 updvals[ds_idx + 1], endptr);
1068 return -1;
1069 }
1070 rate = pdp_new[ds_idx] / interval;
1071 break;
1072 case DST_GAUGE:
1073 errno = 0;
1074 old_locale = setlocale(LC_NUMERIC, "C");
1075 pdp_new[ds_idx] =
1076 strtod(updvals[ds_idx + 1], &endptr) * interval;
1077 setlocale(LC_NUMERIC, old_locale);
1078 if (errno) {
1079 rrd_set_error("converting '%s' to float: %s",
1080 updvals[ds_idx + 1], rrd_strerror(errno));
1081 return -1;
1082 };
1083 if (endptr[0] != '\0') {
1084 rrd_set_error
1085 ("conversion of '%s' to float not complete: tail '%s'",
1086 updvals[ds_idx + 1], endptr);
1087 return -1;
1088 }
1089 rate = pdp_new[ds_idx] / interval;
1090 break;
1091 default:
1092 rrd_set_error("rrd contains unknown DS type : '%s'",
1093 rrd->ds_def[ds_idx].dst);
1094 return -1;
1095 }
1096 /* break out of this for loop if the error string is set */
1097 if (rrd_test_error()) {
1098 return -1;
1099 }
1100 /* make sure pdp_temp is neither too large or too small
1101 * if any of these occur it becomes unknown ...
1102 * sorry folks ... */
1103 if (!isnan(rate) &&
1104 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1105 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1106 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1107 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1108 pdp_new[ds_idx] = DNAN;
1109 }
1110 } else {
1111 /* no news is news all the same */
1112 pdp_new[ds_idx] = DNAN;
1113 }
1116 /* make a copy of the command line argument for the next run */
1117 #ifdef DEBUG
1118 fprintf(stderr, "prep ds[%lu]\t"
1119 "last_arg '%s'\t"
1120 "this_arg '%s'\t"
1121 "pdp_new %10.2f\n",
1122 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1123 pdp_new[ds_idx]);
1124 #endif
1125 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1126 LAST_DS_LEN - 1);
1127 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1128 }
1129 return 0;
1130 }
1132 /*
1133 * How many PDP steps have elapsed since the last update? Returns the answer,
1134 * and stores the time between the last update and the last PDP in pre_time,
1135 * and the time between the last PDP and the current time in post_int.
1136 */
1137 static int calculate_elapsed_steps(
1138 rrd_t *rrd,
1139 unsigned long current_time,
1140 unsigned long current_time_usec,
1141 double interval,
1142 double *pre_int,
1143 double *post_int,
1144 unsigned long *proc_pdp_cnt)
1145 {
1146 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1147 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1148 * time */
1149 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1150 * when it was last updated */
1151 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1153 /* when was the current pdp started */
1154 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1155 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1157 /* when did the last pdp_st occur */
1158 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1159 occu_pdp_st = current_time - occu_pdp_age;
1161 if (occu_pdp_st > proc_pdp_st) {
1162 /* OK we passed the pdp_st moment */
1163 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1164 * occurred before the latest
1165 * pdp_st moment*/
1166 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1167 *post_int = occu_pdp_age; /* how much after it */
1168 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1169 } else {
1170 *pre_int = interval;
1171 *post_int = 0;
1172 }
1174 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1176 #ifdef DEBUG
1177 printf("proc_pdp_age %lu\t"
1178 "proc_pdp_st %lu\t"
1179 "occu_pfp_age %lu\t"
1180 "occu_pdp_st %lu\t"
1181 "int %lf\t"
1182 "pre_int %lf\t"
1183 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1184 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1185 #endif
1187 /* compute the number of elapsed pdp_st moments */
1188 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1189 }
1191 /*
1192 * Increment the PDP values by the values in pdp_new, or else initialize them.
1193 */
1194 static void simple_update(
1195 rrd_t *rrd,
1196 double interval,
1197 rrd_value_t *pdp_new)
1198 {
1199 int i;
1201 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1202 if (isnan(pdp_new[i])) {
1203 /* this is not really accurate if we use subsecond data arrival time
1204 should have thought of it when going subsecond resolution ...
1205 sorry next format change we will have it! */
1206 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1207 floor(interval);
1208 } else {
1209 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1210 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1211 } else {
1212 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1213 }
1214 }
1215 #ifdef DEBUG
1216 fprintf(stderr,
1217 "NO PDP ds[%i]\t"
1218 "value %10.2f\t"
1219 "unkn_sec %5lu\n",
1220 i,
1221 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1222 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1223 #endif
1224 }
1225 }
1227 /*
1228 * Call process_pdp_st for each DS.
1229 *
1230 * Returns 0 on success, -1 on error.
1231 */
1232 static int process_all_pdp_st(
1233 rrd_t *rrd,
1234 double interval,
1235 double pre_int,
1236 double post_int,
1237 unsigned long elapsed_pdp_st,
1238 rrd_value_t *pdp_new,
1239 rrd_value_t *pdp_temp)
1240 {
1241 unsigned long ds_idx;
1243 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1244 rate*seconds which occurred up to the last run.
1245 pdp_new[] contains rate*seconds from the latest run.
1246 pdp_temp[] will contain the rate for cdp */
1248 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1249 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1250 elapsed_pdp_st * rrd->stat_head->pdp_step,
1251 pdp_new, pdp_temp) == -1) {
1252 return -1;
1253 }
1254 #ifdef DEBUG
1255 fprintf(stderr, "PDP UPD ds[%lu]\t"
1256 "elapsed_pdp_st %lu\t"
1257 "pdp_temp %10.2f\t"
1258 "new_prep %10.2f\t"
1259 "new_unkn_sec %5lu\n",
1260 ds_idx,
1261 elapsed_pdp_st,
1262 pdp_temp[ds_idx],
1263 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1264 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1265 #endif
1266 }
1267 return 0;
1268 }
1270 /*
1271 * Process an update that occurs after one of the PDP moments.
1272 * Increments the PDP value, sets NAN if time greater than the
1273 * heartbeats have elapsed, processes CDEFs.
1274 *
1275 * Returns 0 on success, -1 on error.
1276 */
1277 static int process_pdp_st(
1278 rrd_t *rrd,
1279 unsigned long ds_idx,
1280 double interval,
1281 double pre_int,
1282 double post_int,
1283 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1284 rrd_value_t *pdp_new,
1285 rrd_value_t *pdp_temp)
1286 {
1287 int i;
1289 /* update pdp_prep to the current pdp_st. */
1290 double pre_unknown = 0.0;
1291 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1292 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1294 rpnstack_t rpnstack; /* used for COMPUTE DS */
1296 rpnstack_init(&rpnstack);
1299 if (isnan(pdp_new[ds_idx])) {
1300 /* a final bit of unknown to be added before calculation
1301 we use a temporary variable for this so that we
1302 don't have to turn integer lines before using the value */
1303 pre_unknown = pre_int;
1304 } else {
1305 if (isnan(scratch[PDP_val].u_val)) {
1306 scratch[PDP_val].u_val = 0;
1307 }
1308 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1309 }
1311 /* if too much of the pdp_prep is unknown we dump it */
1312 /* if the interval is larger thatn mrhb we get NAN */
1313 if ((interval > mrhb) ||
1314 (rrd->stat_head->pdp_step / 2.0 <
1315 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1316 pdp_temp[ds_idx] = DNAN;
1317 } else {
1318 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1319 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1320 pre_unknown);
1321 }
1323 /* process CDEF data sources; remember each CDEF DS can
1324 * only reference other DS with a lower index number */
1325 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1326 rpnp_t *rpnp;
1328 rpnp =
1329 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1330 /* substitute data values for OP_VARIABLE nodes */
1331 for (i = 0; rpnp[i].op != OP_END; i++) {
1332 if (rpnp[i].op == OP_VARIABLE) {
1333 rpnp[i].op = OP_NUMBER;
1334 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1335 }
1336 }
1337 /* run the rpn calculator */
1338 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1339 free(rpnp);
1340 rpnstack_free(&rpnstack);
1341 return -1;
1342 }
1343 }
1345 /* make pdp_prep ready for the next run */
1346 if (isnan(pdp_new[ds_idx])) {
1347 /* this is not realy accurate if we use subsecond data arival time
1348 should have thought of it when going subsecond resolution ...
1349 sorry next format change we will have it! */
1350 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1351 scratch[PDP_val].u_val = DNAN;
1352 } else {
1353 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1354 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1355 }
1356 rpnstack_free(&rpnstack);
1357 return 0;
1358 }
1360 /*
1361 * Iterate over all the RRAs for a given DS and:
1362 * 1. Decide whether to schedule a smooth later
1363 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1364 * 3. Update the CDP
1365 *
1366 * Returns 0 on success, -1 on error
1367 */
1368 static int update_all_cdp_prep(
1369 rrd_t *rrd,
1370 unsigned long *rra_step_cnt,
1371 unsigned long rra_begin,
1372 rrd_file_t *rrd_file,
1373 unsigned long elapsed_pdp_st,
1374 unsigned long proc_pdp_cnt,
1375 rrd_value_t **last_seasonal_coef,
1376 rrd_value_t **seasonal_coef,
1377 rrd_value_t *pdp_temp,
1378 unsigned long *rra_current,
1379 unsigned long *skip_update,
1380 int *schedule_smooth)
1381 {
1382 unsigned long rra_idx;
1384 /* index into the CDP scratch array */
1385 enum cf_en current_cf;
1386 unsigned long rra_start;
1388 /* number of rows to be updated in an RRA for a data value. */
1389 unsigned long start_pdp_offset;
1391 rra_start = rra_begin;
1392 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1393 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1394 start_pdp_offset =
1395 rrd->rra_def[rra_idx].pdp_cnt -
1396 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1397 skip_update[rra_idx] = 0;
1398 if (start_pdp_offset <= elapsed_pdp_st) {
1399 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1400 rrd->rra_def[rra_idx].pdp_cnt + 1;
1401 } else {
1402 rra_step_cnt[rra_idx] = 0;
1403 }
1405 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1406 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1407 * so that they will be correct for the next observed value; note that for
1408 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1409 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1410 if (rra_step_cnt[rra_idx] > 1) {
1411 skip_update[rra_idx] = 1;
1412 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1413 elapsed_pdp_st, last_seasonal_coef);
1414 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1415 elapsed_pdp_st + 1, seasonal_coef);
1416 }
1417 /* periodically run a smoother for seasonal effects */
1418 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1419 #ifdef DEBUG
1420 fprintf(stderr,
1421 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1422 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1423 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1424 u_cnt);
1425 #endif
1426 *schedule_smooth = 1;
1427 }
1428 *rra_current = rrd_tell(rrd_file);
1429 }
1430 if (rrd_test_error())
1431 return -1;
1433 if (update_cdp_prep
1434 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1435 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1436 current_cf) == -1) {
1437 return -1;
1438 }
1439 rra_start +=
1440 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1441 sizeof(rrd_value_t);
1442 }
1443 return 0;
1444 }
1446 /*
1447 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1448 */
1449 static int do_schedule_smooth(
1450 rrd_t *rrd,
1451 unsigned long rra_idx,
1452 unsigned long elapsed_pdp_st)
1453 {
1454 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1455 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1456 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1457 unsigned long seasonal_smooth_idx =
1458 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1459 unsigned long *init_seasonal =
1460 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1462 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1463 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1464 * really an RRA level, not a data source within RRA level parameter, but
1465 * the rra_def is read only for rrd_update (not flushed to disk). */
1466 if (*init_seasonal > BURNIN_CYCLES) {
1467 /* someone has no doubt invented a trick to deal with this wrap around,
1468 * but at least this code is clear. */
1469 if (seasonal_smooth_idx > cur_row) {
1470 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1471 * between PDP and CDP */
1472 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1473 }
1474 /* can't rely on negative numbers because we are working with
1475 * unsigned values */
1476 return (cur_row + elapsed_pdp_st >= row_cnt
1477 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1478 }
1479 /* mark off one of the burn-in cycles */
1480 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1481 }
1483 /*
1484 * For a given RRA, iterate over the data sources and call the appropriate
1485 * consolidation function.
1486 *
1487 * Returns 0 on success, -1 on error.
1488 */
1489 static int update_cdp_prep(
1490 rrd_t *rrd,
1491 unsigned long elapsed_pdp_st,
1492 unsigned long start_pdp_offset,
1493 unsigned long *rra_step_cnt,
1494 int rra_idx,
1495 rrd_value_t *pdp_temp,
1496 rrd_value_t *last_seasonal_coef,
1497 rrd_value_t *seasonal_coef,
1498 int current_cf)
1499 {
1500 unsigned long ds_idx, cdp_idx;
1502 /* update CDP_PREP areas */
1503 /* loop over data soures within each RRA */
1504 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1506 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1508 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1509 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1510 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1511 elapsed_pdp_st, start_pdp_offset,
1512 rrd->rra_def[rra_idx].pdp_cnt,
1513 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1514 rra_idx, ds_idx);
1515 } else {
1516 /* Nothing to consolidate if there's one PDP per CDP. However, if
1517 * we've missed some PDPs, let's update null counters etc. */
1518 if (elapsed_pdp_st > 2) {
1519 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1520 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1521 current_cf);
1522 }
1523 }
1525 if (rrd_test_error())
1526 return -1;
1527 } /* endif data sources loop */
1528 return 0;
1529 }
1531 /*
1532 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1533 * primary value, secondary value, and # of unknowns.
1534 */
1535 static void update_cdp(
1536 unival *scratch,
1537 int current_cf,
1538 rrd_value_t pdp_temp_val,
1539 unsigned long rra_step_cnt,
1540 unsigned long elapsed_pdp_st,
1541 unsigned long start_pdp_offset,
1542 unsigned long pdp_cnt,
1543 rrd_value_t xff,
1544 int i,
1545 int ii)
1546 {
1547 /* shorthand variables */
1548 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1549 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1550 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1551 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1553 if (rra_step_cnt) {
1554 /* If we are in this block, as least 1 CDP value will be written to
1555 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1556 * to be written, then the "fill in" value is the CDP_secondary_val
1557 * entry. */
1558 if (isnan(pdp_temp_val)) {
1559 *cdp_unkn_pdp_cnt += start_pdp_offset;
1560 *cdp_secondary_val = DNAN;
1561 } else {
1562 /* CDP_secondary value is the RRA "fill in" value for intermediary
1563 * CDP data entries. No matter the CF, the value is the same because
1564 * the average, max, min, and last of a list of identical values is
1565 * the same, namely, the value itself. */
1566 *cdp_secondary_val = pdp_temp_val;
1567 }
1569 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1570 *cdp_primary_val = DNAN;
1571 if (current_cf == CF_AVERAGE) {
1572 *cdp_val =
1573 initialize_average_carry_over(pdp_temp_val,
1574 elapsed_pdp_st,
1575 start_pdp_offset, pdp_cnt);
1576 } else {
1577 *cdp_val = pdp_temp_val;
1578 }
1579 } else {
1580 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1581 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1582 } /* endif meets xff value requirement for a valid value */
1583 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1584 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1585 if (isnan(pdp_temp_val))
1586 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1587 else
1588 *cdp_unkn_pdp_cnt = 0;
1589 } else { /* rra_step_cnt[i] == 0 */
1591 #ifdef DEBUG
1592 if (isnan(*cdp_val)) {
1593 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1594 i, ii);
1595 } else {
1596 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1597 i, ii, *cdp_val);
1598 }
1599 #endif
1600 if (isnan(pdp_temp_val)) {
1601 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1602 } else {
1603 *cdp_val =
1604 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1605 current_cf, i, ii);
1606 }
1607 }
1608 }
1610 /*
1611 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1612 * on the type of consolidation function.
1613 */
1614 static void initialize_cdp_val(
1615 unival *scratch,
1616 int current_cf,
1617 rrd_value_t pdp_temp_val,
1618 unsigned long elapsed_pdp_st,
1619 unsigned long start_pdp_offset,
1620 unsigned long pdp_cnt)
1621 {
1622 rrd_value_t cum_val, cur_val;
1624 switch (current_cf) {
1625 case CF_AVERAGE:
1626 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1627 cur_val = IFDNAN(pdp_temp_val, 0.0);
1628 scratch[CDP_primary_val].u_val =
1629 (cum_val + cur_val * start_pdp_offset) /
1630 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1631 scratch[CDP_val].u_val =
1632 initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1633 start_pdp_offset, pdp_cnt);
1634 break;
1635 case CF_MAXIMUM:
1636 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1637 cur_val = IFDNAN(pdp_temp_val, -DINF);
1638 #if 0
1639 #ifdef DEBUG
1640 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1641 fprintf(stderr,
1642 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1643 i, ii);
1644 exit(-1);
1645 }
1646 #endif
1647 #endif
1648 if (cur_val > cum_val)
1649 scratch[CDP_primary_val].u_val = cur_val;
1650 else
1651 scratch[CDP_primary_val].u_val = cum_val;
1652 /* initialize carry over value */
1653 scratch[CDP_val].u_val = pdp_temp_val;
1654 break;
1655 case CF_MINIMUM:
1656 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1657 cur_val = IFDNAN(pdp_temp_val, DINF);
1658 #if 0
1659 #ifdef DEBUG
1660 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1661 fprintf(stderr,
1662 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1663 ii);
1664 exit(-1);
1665 }
1666 #endif
1667 #endif
1668 if (cur_val < cum_val)
1669 scratch[CDP_primary_val].u_val = cur_val;
1670 else
1671 scratch[CDP_primary_val].u_val = cum_val;
1672 /* initialize carry over value */
1673 scratch[CDP_val].u_val = pdp_temp_val;
1674 break;
1675 case CF_LAST:
1676 default:
1677 scratch[CDP_primary_val].u_val = pdp_temp_val;
1678 /* initialize carry over value */
1679 scratch[CDP_val].u_val = pdp_temp_val;
1680 break;
1681 }
1682 }
1684 /*
1685 * Update the consolidation function for Holt-Winters functions as
1686 * well as other functions that don't actually consolidate multiple
1687 * PDPs.
1688 */
1689 static void reset_cdp(
1690 rrd_t *rrd,
1691 unsigned long elapsed_pdp_st,
1692 rrd_value_t *pdp_temp,
1693 rrd_value_t *last_seasonal_coef,
1694 rrd_value_t *seasonal_coef,
1695 int rra_idx,
1696 int ds_idx,
1697 int cdp_idx,
1698 enum cf_en current_cf)
1699 {
1700 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1702 switch (current_cf) {
1703 case CF_AVERAGE:
1704 default:
1705 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1706 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1707 break;
1708 case CF_SEASONAL:
1709 case CF_DEVSEASONAL:
1710 /* need to update cached seasonal values, so they are consistent
1711 * with the bulk update */
1712 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1713 * CDP_last_deviation are the same. */
1714 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1715 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1716 break;
1717 case CF_HWPREDICT:
1718 case CF_MHWPREDICT:
1719 /* need to update the null_count and last_null_count.
1720 * even do this for non-DNAN pdp_temp because the
1721 * algorithm is not learning from batch updates. */
1722 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1723 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1724 /* fall through */
1725 case CF_DEVPREDICT:
1726 scratch[CDP_primary_val].u_val = DNAN;
1727 scratch[CDP_secondary_val].u_val = DNAN;
1728 break;
1729 case CF_FAILURES:
1730 /* do not count missed bulk values as failures */
1731 scratch[CDP_primary_val].u_val = 0;
1732 scratch[CDP_secondary_val].u_val = 0;
1733 /* need to reset violations buffer.
1734 * could do this more carefully, but for now, just
1735 * assume a bulk update wipes away all violations. */
1736 erase_violations(rrd, cdp_idx, rra_idx);
1737 break;
1738 }
1739 }
1741 static rrd_value_t initialize_average_carry_over(
1742 rrd_value_t pdp_temp_val,
1743 unsigned long elapsed_pdp_st,
1744 unsigned long start_pdp_offset,
1745 unsigned long pdp_cnt)
1746 {
1747 /* initialize carry over value */
1748 if (isnan(pdp_temp_val)) {
1749 return DNAN;
1750 }
1751 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1752 }
1754 /*
1755 * Update or initialize a CDP value based on the consolidation
1756 * function.
1757 *
1758 * Returns the new value.
1759 */
1760 static rrd_value_t calculate_cdp_val(
1761 rrd_value_t cdp_val,
1762 rrd_value_t pdp_temp_val,
1763 unsigned long elapsed_pdp_st,
1764 int current_cf,
1765 #ifdef DEBUG
1766 int i,
1767 int ii
1768 #else
1769 int UNUSED(i),
1770 int UNUSED(ii)
1771 #endif
1772 )
1773 {
1774 if (isnan(cdp_val)) {
1775 if (current_cf == CF_AVERAGE) {
1776 pdp_temp_val *= elapsed_pdp_st;
1777 }
1778 #ifdef DEBUG
1779 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1780 i, ii, pdp_temp_val);
1781 #endif
1782 return pdp_temp_val;
1783 }
1784 if (current_cf == CF_AVERAGE)
1785 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1786 if (current_cf == CF_MINIMUM)
1787 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1788 if (current_cf == CF_MAXIMUM)
1789 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1791 return pdp_temp_val;
1792 }
1794 /*
1795 * For each RRA, update the seasonal values and then call update_aberrant_CF
1796 * for each data source.
1797 *
1798 * Return 0 on success, -1 on error.
1799 */
1800 static int update_aberrant_cdps(
1801 rrd_t *rrd,
1802 rrd_file_t *rrd_file,
1803 unsigned long rra_begin,
1804 unsigned long *rra_current,
1805 unsigned long elapsed_pdp_st,
1806 rrd_value_t *pdp_temp,
1807 rrd_value_t **seasonal_coef)
1808 {
1809 unsigned long rra_idx, ds_idx, j;
1811 /* number of PDP steps since the last update that
1812 * are assigned to the first CDP to be generated
1813 * since the last update. */
1814 unsigned short scratch_idx;
1815 unsigned long rra_start;
1816 enum cf_en current_cf;
1818 /* this loop is only entered if elapsed_pdp_st < 3 */
1819 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1820 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1821 rra_start = rra_begin;
1822 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1823 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1824 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1825 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1826 if (scratch_idx == CDP_primary_val) {
1827 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1828 elapsed_pdp_st + 1, seasonal_coef);
1829 } else {
1830 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1831 elapsed_pdp_st + 2, seasonal_coef);
1832 }
1833 *rra_current = rrd_tell(rrd_file);
1834 }
1835 if (rrd_test_error())
1836 return -1;
1837 /* loop over data soures within each RRA */
1838 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1839 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1840 rra_idx * (rrd->stat_head->ds_cnt) +
1841 ds_idx, rra_idx, ds_idx, scratch_idx,
1842 *seasonal_coef);
1843 }
1844 }
1845 rra_start += rrd->rra_def[rra_idx].row_cnt
1846 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1847 }
1848 }
1849 return 0;
1850 }
1852 /*
1853 * Move sequentially through the file, writing one RRA at a time. Note this
1854 * architecture divorces the computation of CDP with flushing updated RRA
1855 * entries to disk.
1856 *
1857 * Return 0 on success, -1 on error.
1858 */
1859 static int write_to_rras(
1860 rrd_t *rrd,
1861 rrd_file_t *rrd_file,
1862 unsigned long *rra_step_cnt,
1863 unsigned long rra_begin,
1864 unsigned long *rra_current,
1865 time_t current_time,
1866 unsigned long *skip_update,
1867 rrd_info_t ** pcdp_summary)
1868 {
1869 unsigned long rra_idx;
1870 unsigned long rra_start;
1871 unsigned long rra_pos_tmp; /* temporary byte pointer. */
1872 time_t rra_time = 0; /* time of update for a RRA */
1874 /* Ready to write to disk */
1875 rra_start = rra_begin;
1876 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1877 /* skip unless there's something to write */
1878 if (rra_step_cnt[rra_idx]) {
1879 /* write the first row */
1880 #ifdef DEBUG
1881 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1882 #endif
1883 rrd->rra_ptr[rra_idx].cur_row++;
1884 if (rrd->rra_ptr[rra_idx].cur_row >=
1885 rrd->rra_def[rra_idx].row_cnt)
1886 rrd->rra_ptr[rra_idx].cur_row = 0; /* wrap around */
1887 /* position on the first row */
1888 rra_pos_tmp = rra_start +
1889 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1890 sizeof(rrd_value_t);
1891 if (rra_pos_tmp != *rra_current) {
1892 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1893 rrd_set_error("seek error in rrd");
1894 return -1;
1895 }
1896 *rra_current = rra_pos_tmp;
1897 }
1898 #ifdef DEBUG
1899 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1900 #endif
1901 if (!skip_update[rra_idx]) {
1902 if (*pcdp_summary != NULL) {
1903 rra_time = (current_time - current_time
1904 % (rrd->rra_def[rra_idx].pdp_cnt *
1905 rrd->stat_head->pdp_step))
1906 -
1907 ((rra_step_cnt[rra_idx] -
1908 1) * rrd->rra_def[rra_idx].pdp_cnt *
1909 rrd->stat_head->pdp_step);
1910 }
1911 if (write_RRA_row
1912 (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1913 pcdp_summary, rra_time) == -1)
1914 return -1;
1915 }
1917 /* write other rows of the bulk update, if any */
1918 for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1919 if (++rrd->rra_ptr[rra_idx].cur_row ==
1920 rrd->rra_def[rra_idx].row_cnt) {
1921 #ifdef DEBUG
1922 fprintf(stderr,
1923 "Wraparound for RRA %s, %lu updates left\n",
1924 rrd->rra_def[rra_idx].cf_nam,
1925 rra_step_cnt[rra_idx] - 1);
1926 #endif
1927 /* wrap */
1928 rrd->rra_ptr[rra_idx].cur_row = 0;
1929 /* seek back to beginning of current rra */
1930 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1931 rrd_set_error("seek error in rrd");
1932 return -1;
1933 }
1934 #ifdef DEBUG
1935 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1936 rrd_file->pos);
1937 #endif
1938 *rra_current = rra_start;
1939 }
1940 if (!skip_update[rra_idx]) {
1941 if (*pcdp_summary != NULL) {
1942 rra_time = (current_time - current_time
1943 % (rrd->rra_def[rra_idx].pdp_cnt *
1944 rrd->stat_head->pdp_step))
1945 -
1946 ((rra_step_cnt[rra_idx] -
1947 2) * rrd->rra_def[rra_idx].pdp_cnt *
1948 rrd->stat_head->pdp_step);
1949 }
1950 if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
1951 CDP_secondary_val, pcdp_summary,
1952 rra_time) == -1)
1953 return -1;
1954 }
1955 }
1956 }
1957 rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1958 sizeof(rrd_value_t);
1959 } /* RRA LOOP */
1961 return 0;
1962 }
1964 /*
1965 * Write out one row of values (one value per DS) to the archive.
1966 *
1967 * Returns 0 on success, -1 on error.
1968 */
1969 static int write_RRA_row(
1970 rrd_file_t *rrd_file,
1971 rrd_t *rrd,
1972 unsigned long rra_idx,
1973 unsigned long *rra_current,
1974 unsigned short CDP_scratch_idx,
1975 rrd_info_t ** pcdp_summary,
1976 time_t rra_time)
1977 {
1978 unsigned long ds_idx, cdp_idx;
1979 rrd_infoval_t iv;
1981 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1982 /* compute the cdp index */
1983 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1984 #ifdef DEBUG
1985 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1986 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1987 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1988 #endif
1989 if (*pcdp_summary != NULL) {
1990 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1991 /* append info to the return hash */
1992 *pcdp_summary = rrd_info_push(*pcdp_summary,
1993 sprintf_alloc
1994 ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
1995 rrd->rra_def[rra_idx].cf_nam,
1996 rrd->rra_def[rra_idx].pdp_cnt,
1997 rrd->ds_def[ds_idx].ds_nam),
1998 RD_I_VAL, iv);
1999 }
2000 if (rrd_write(rrd_file,
2001 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2002 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2003 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2004 return -1;
2005 }
2006 *rra_current += sizeof(rrd_value_t);
2007 }
2008 return 0;
2009 }
2011 /*
2012 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2013 *
2014 * Returns 0 on success, -1 otherwise
2015 */
2016 static int smooth_all_rras(
2017 rrd_t *rrd,
2018 rrd_file_t *rrd_file,
2019 unsigned long rra_begin)
2020 {
2021 unsigned long rra_start = rra_begin;
2022 unsigned long rra_idx;
2024 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2025 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2026 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2027 #ifdef DEBUG
2028 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2029 #endif
2030 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2031 if (rrd_test_error())
2032 return -1;
2033 }
2034 rra_start += rrd->rra_def[rra_idx].row_cnt
2035 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2036 }
2037 return 0;
2038 }
2040 #ifndef HAVE_MMAP
2041 /*
2042 * Flush changes to disk (unless we're using mmap)
2043 *
2044 * Returns 0 on success, -1 otherwise
2045 */
2046 static int write_changes_to_disk(
2047 rrd_t *rrd,
2048 rrd_file_t *rrd_file,
2049 int version)
2050 {
2051 /* we just need to write back the live header portion now */
2052 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2053 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2054 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2055 SEEK_SET) != 0) {
2056 rrd_set_error("seek rrd for live header writeback");
2057 return -1;
2058 }
2059 if (version >= 3) {
2060 if (rrd_write(rrd_file, rrd->live_head,
2061 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2062 rrd_set_error("rrd_write live_head to rrd");
2063 return -1;
2064 }
2065 } else {
2066 if (rrd_write(rrd_file, rrd->legacy_last_up,
2067 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2068 rrd_set_error("rrd_write live_head to rrd");
2069 return -1;
2070 }
2071 }
2074 if (rrd_write(rrd_file, rrd->pdp_prep,
2075 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2076 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2077 rrd_set_error("rrd_write pdp_prep to rrd");
2078 return -1;
2079 }
2081 if (rrd_write(rrd_file, rrd->cdp_prep,
2082 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2083 rrd->stat_head->ds_cnt)
2084 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2085 rrd->stat_head->ds_cnt)) {
2087 rrd_set_error("rrd_write cdp_prep to rrd");
2088 return -1;
2089 }
2091 if (rrd_write(rrd_file, rrd->rra_ptr,
2092 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2093 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2094 rrd_set_error("rrd_write rra_ptr to rrd");
2095 return -1;
2096 }
2097 return 0;
2098 }
2099 #endif