2 /*****************************************************************************
3 * RRDtool 1.3.0 Copyright by Tobi Oetiker, 1997-2008
4 *****************************************************************************
5 * rrd_update.c RRD Update Function
6 *****************************************************************************
7 * $Id: rrd_update.c 1432 2008-06-10 23:12:55Z oetiker $
8 *****************************************************************************/
10 #include "rrd_tool.h"
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
18 #include <locale.h>
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
26 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
27 /*
28 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
29 * replacement.
30 */
31 #include <sys/timeb.h>
33 #ifndef __MINGW32__
34 struct timeval {
35 time_t tv_sec; /* seconds */
36 long tv_usec; /* microseconds */
37 };
38 #endif
40 struct __timezone {
41 int tz_minuteswest; /* minutes W of Greenwich */
42 int tz_dsttime; /* type of dst correction */
43 };
45 static int gettimeofday(
46 struct timeval *t,
47 struct __timezone *tz)
48 {
50 struct _timeb current_time;
52 _ftime(&current_time);
54 t->tv_sec = current_time.time;
55 t->tv_usec = current_time.millitm * 1000;
57 return 0;
58 }
60 #endif
62 /* FUNCTION PROTOTYPES */
64 int rrd_update_r(
65 const char *filename,
66 const char *tmplt,
67 int argc,
68 const char **argv);
69 int _rrd_update(
70 const char *filename,
71 const char *tmplt,
72 int argc,
73 const char **argv,
74 rrd_info_t *);
76 static int allocate_data_structures(
77 rrd_t *rrd,
78 char ***updvals,
79 rrd_value_t **pdp_temp,
80 const char *tmplt,
81 long **tmpl_idx,
82 unsigned long *tmpl_cnt,
83 unsigned long **rra_step_cnt,
84 unsigned long **skip_update,
85 rrd_value_t **pdp_new);
87 static int parse_template(
88 rrd_t *rrd,
89 const char *tmplt,
90 unsigned long *tmpl_cnt,
91 long *tmpl_idx);
93 static int process_arg(
94 char *step_start,
95 rrd_t *rrd,
96 rrd_file_t *rrd_file,
97 unsigned long rra_begin,
98 unsigned long *rra_current,
99 time_t *current_time,
100 unsigned long *current_time_usec,
101 rrd_value_t *pdp_temp,
102 rrd_value_t *pdp_new,
103 unsigned long *rra_step_cnt,
104 char **updvals,
105 long *tmpl_idx,
106 unsigned long tmpl_cnt,
107 rrd_info_t ** pcdp_summary,
108 int version,
109 unsigned long *skip_update,
110 int *schedule_smooth);
112 static int parse_ds(
113 rrd_t *rrd,
114 char **updvals,
115 long *tmpl_idx,
116 char *input,
117 unsigned long tmpl_cnt,
118 time_t *current_time,
119 unsigned long *current_time_usec,
120 int version);
122 static int get_time_from_reading(
123 rrd_t *rrd,
124 char timesyntax,
125 char **updvals,
126 time_t *current_time,
127 unsigned long *current_time_usec,
128 int version);
130 static int update_pdp_prep(
131 rrd_t *rrd,
132 char **updvals,
133 rrd_value_t *pdp_new,
134 double interval);
136 static int calculate_elapsed_steps(
137 rrd_t *rrd,
138 unsigned long current_time,
139 unsigned long current_time_usec,
140 double interval,
141 double *pre_int,
142 double *post_int,
143 unsigned long *proc_pdp_cnt);
145 static void simple_update(
146 rrd_t *rrd,
147 double interval,
148 rrd_value_t *pdp_new);
150 static int process_all_pdp_st(
151 rrd_t *rrd,
152 double interval,
153 double pre_int,
154 double post_int,
155 unsigned long elapsed_pdp_st,
156 rrd_value_t *pdp_new,
157 rrd_value_t *pdp_temp);
159 static int process_pdp_st(
160 rrd_t *rrd,
161 unsigned long ds_idx,
162 double interval,
163 double pre_int,
164 double post_int,
165 long diff_pdp_st,
166 rrd_value_t *pdp_new,
167 rrd_value_t *pdp_temp);
169 static int update_all_cdp_prep(
170 rrd_t *rrd,
171 unsigned long *rra_step_cnt,
172 unsigned long rra_begin,
173 rrd_file_t *rrd_file,
174 unsigned long elapsed_pdp_st,
175 unsigned long proc_pdp_cnt,
176 rrd_value_t **last_seasonal_coef,
177 rrd_value_t **seasonal_coef,
178 rrd_value_t *pdp_temp,
179 unsigned long *rra_current,
180 unsigned long *skip_update,
181 int *schedule_smooth);
183 static int do_schedule_smooth(
184 rrd_t *rrd,
185 unsigned long rra_idx,
186 unsigned long elapsed_pdp_st);
188 static int update_cdp_prep(
189 rrd_t *rrd,
190 unsigned long elapsed_pdp_st,
191 unsigned long start_pdp_offset,
192 unsigned long *rra_step_cnt,
193 int rra_idx,
194 rrd_value_t *pdp_temp,
195 rrd_value_t *last_seasonal_coef,
196 rrd_value_t *seasonal_coef,
197 int current_cf);
199 static void update_cdp(
200 unival *scratch,
201 int current_cf,
202 rrd_value_t pdp_temp_val,
203 unsigned long rra_step_cnt,
204 unsigned long elapsed_pdp_st,
205 unsigned long start_pdp_offset,
206 unsigned long pdp_cnt,
207 rrd_value_t xff,
208 int i,
209 int ii);
211 static void initialize_cdp_val(
212 unival *scratch,
213 int current_cf,
214 rrd_value_t pdp_temp_val,
215 unsigned long elapsed_pdp_st,
216 unsigned long start_pdp_offset,
217 unsigned long pdp_cnt);
219 static void reset_cdp(
220 rrd_t *rrd,
221 unsigned long elapsed_pdp_st,
222 rrd_value_t *pdp_temp,
223 rrd_value_t *last_seasonal_coef,
224 rrd_value_t *seasonal_coef,
225 int rra_idx,
226 int ds_idx,
227 int cdp_idx,
228 enum cf_en current_cf);
230 static rrd_value_t initialize_average_carry_over(
231 rrd_value_t pdp_temp_val,
232 unsigned long elapsed_pdp_st,
233 unsigned long start_pdp_offset,
234 unsigned long pdp_cnt);
236 static rrd_value_t calculate_cdp_val(
237 rrd_value_t cdp_val,
238 rrd_value_t pdp_temp_val,
239 unsigned long elapsed_pdp_st,
240 int current_cf,
241 int i,
242 int ii);
244 static int update_aberrant_cdps(
245 rrd_t *rrd,
246 rrd_file_t *rrd_file,
247 unsigned long rra_begin,
248 unsigned long *rra_current,
249 unsigned long elapsed_pdp_st,
250 rrd_value_t *pdp_temp,
251 rrd_value_t **seasonal_coef);
253 static int write_to_rras(
254 rrd_t *rrd,
255 rrd_file_t *rrd_file,
256 unsigned long *rra_step_cnt,
257 unsigned long rra_begin,
258 unsigned long *rra_current,
259 time_t current_time,
260 unsigned long *skip_update,
261 rrd_info_t ** pcdp_summary);
263 static int write_RRA_row(
264 rrd_file_t *rrd_file,
265 rrd_t *rrd,
266 unsigned long rra_idx,
267 unsigned long *rra_current,
268 unsigned short CDP_scratch_idx,
269 rrd_info_t ** pcdp_summary,
270 time_t rra_time);
272 static int smooth_all_rras(
273 rrd_t *rrd,
274 rrd_file_t *rrd_file,
275 unsigned long rra_begin);
277 #ifndef HAVE_MMAP
278 static int write_changes_to_disk(
279 rrd_t *rrd,
280 rrd_file_t *rrd_file,
281 int version);
282 #endif
284 /*
285 * normalize time as returned by gettimeofday. usec part must
286 * always be >= 0
287 */
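/* A minimal illustration with made-up numbers: an input of
 * { tv_sec = 5, tv_usec = -250000 } is normalized to
 * { tv_sec = 4, tv_usec = 750000 }. */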
288 static inline void normalize_time(
289 struct timeval *t)
290 {
291 if (t->tv_usec < 0) {
292 t->tv_sec--;
293 t->tv_usec += 1e6L;
294 }
295 }
297 /*
298 * Sets current_time and current_time_usec based on the current time.
299 * current_time_usec is set to 0 if the version number is 1 or 2.
300 */
301 static inline void initialize_time(
302 time_t *current_time,
303 unsigned long *current_time_usec,
304 int version)
305 {
306 struct timeval tmp_time; /* used for time conversion */
308 gettimeofday(&tmp_time, 0);
309 normalize_time(&tmp_time);
310 *current_time = tmp_time.tv_sec;
311 if (version >= 3) {
312 *current_time_usec = tmp_time.tv_usec;
313 } else {
314 *current_time_usec = 0;
315 }
316 }
318 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
320 rrd_info_t *rrd_update_v(
321 int argc,
322 char **argv)
323 {
324 char *tmplt = NULL;
325 rrd_info_t *result = NULL;
326 rrd_infoval_t rc;
327 struct option long_options[] = {
328 {"template", required_argument, 0, 't'},
329 {0, 0, 0, 0}
330 };
332 rc.u_int = -1;
333 optind = 0;
334 opterr = 0; /* initialize getopt */
336 while (1) {
337 int option_index = 0;
338 int opt;
340 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
342 if (opt == EOF)
343 break;
345 switch (opt) {
346 case 't':
347 tmplt = optarg;
348 break;
350 case '?':
351 rrd_set_error("unknown option '%s'", argv[optind - 1]);
352 goto end_tag;
353 }
354 }
356 /* need at least 2 arguments: filename, data. */
357 if (argc - optind < 2) {
358 rrd_set_error("Not enough arguments");
359 goto end_tag;
360 }
361 rc.u_int = 0;
362 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
363 rc.u_int = _rrd_update(argv[optind], tmplt,
364 argc - optind - 1,
365 (const char **) (argv + optind + 1), result);
366 result->value.u_int = rc.u_int;
367 end_tag:
368 return result;
369 }
371 int rrd_update(
372 int argc,
373 char **argv)
374 {
375 struct option long_options[] = {
376 {"template", required_argument, 0, 't'},
377 {0, 0, 0, 0}
378 };
379 int option_index = 0;
380 int opt;
381 char *tmplt = NULL;
382 int rc = -1;
384 optind = 0;
385 opterr = 0; /* initialize getopt */
387 while (1) {
388 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
390 if (opt == EOF)
391 break;
393 switch (opt) {
394 case 't':
395 tmplt = strdup(optarg);
396 break;
398 case '?':
399 rrd_set_error("unknown option '%s'", argv[optind - 1]);
400 goto out;
401 }
402 }
404 /* need at least 2 arguments: filename, data. */
405 if (argc - optind < 2) {
406 rrd_set_error("Not enough arguments");
407 goto out;
408 }
410 rc = rrd_update_r(argv[optind], tmplt,
411 argc - optind - 1, (const char **) (argv + optind + 1));
412 out:
413 free(tmplt);
414 return rc;
415 }
417 int rrd_update_r(
418 const char *filename,
419 const char *tmplt,
420 int argc,
421 const char **argv)
422 {
423 return _rrd_update(filename, tmplt, argc, argv, NULL);
424 }
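/* Usage sketch (illustrative only, not part of this file): feeding two
 * readings into a hypothetical RRD through the thread-safe entry point.
 *
 *     const char *vals[] = { "N:42", "1200000000:17" };
 *     if (rrd_update_r("example.rrd", NULL, 2, vals) != 0)
 *         fprintf(stderr, "update failed: %s\n", rrd_get_error());
 */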
426 int _rrd_update(
427 const char *filename,
428 const char *tmplt,
429 int argc,
430 const char **argv,
431 rrd_info_t * pcdp_summary)
432 {
434 int arg_i = 2;
436 unsigned long rra_begin; /* byte pointer to the rra
437 * area in the rrd file. this
438 * pointer never changes value */
439 unsigned long rra_current; /* byte pointer to the current write
440 * spot in the rrd file. */
441 rrd_value_t *pdp_new; /* prepare the incoming data to be added
442 * to the existing entry */
443 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
444 * to the cdp values */
446 long *tmpl_idx; /* index representing the settings
447 * transported by the tmplt index */
448 unsigned long tmpl_cnt = 2; /* time and data */
449 rrd_t rrd;
450 time_t current_time = 0;
451 unsigned long current_time_usec = 0; /* microseconds part of current time */
452 char **updvals;
453 int schedule_smooth = 0;
455 /* number of elapsed PDP steps since last update */
456 unsigned long *rra_step_cnt = NULL;
458 int version; /* rrd version */
459 rrd_file_t *rrd_file;
460 char *arg_copy; /* for processing the argv */
461 unsigned long *skip_update; /* RRAs to advance but not write */
463 /* need at least 1 argument: data. */
464 if (argc < 1) {
465 rrd_set_error("Not enough arguments");
466 goto err_out;
467 }
469 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
470 goto err_free;
471 }
472 /* We are now at the beginning of the rra's */
473 rra_current = rra_begin = rrd_file->header_len;
475 version = atoi(rrd.stat_head->version);
477 initialize_time(&current_time, &current_time_usec, version);
479 /* get exclusive lock to whole file.
480 * lock gets removed when we close the file.
481 */
482 if (rrd_lock(rrd_file) != 0) {
483 rrd_set_error("could not lock RRD");
484 goto err_close;
485 }
487 if (allocate_data_structures(&rrd, &updvals,
488 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
489 &rra_step_cnt, &skip_update,
490 &pdp_new) == -1) {
491 goto err_close;
492 }
494 /* loop through the arguments. */
495 for (arg_i = 0; arg_i < argc; arg_i++) {
496 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
497 rrd_set_error("failed duplication argv entry");
498 break;
499 }
500 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
501 &current_time, &current_time_usec, pdp_temp, pdp_new,
502 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
503 &pcdp_summary, version, skip_update,
504 &schedule_smooth) == -1) {
505 free(arg_copy);
506 break;
507 }
508 free(arg_copy);
509 }
511 free(rra_step_cnt);
513 /* if an error occurred and the file has not been
514 * written to, then close things up and return. */
515 if (rrd_test_error()) {
516 goto err_free_structures;
517 }
518 #ifndef HAVE_MMAP
519 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
520 goto err_free_structures;
521 }
522 #endif
524 /* calling the smoothing code here guarantees at most one smoothing
525 * operation per rrd_update call. Unfortunately, with bulk updates or a
526 * long-delayed update, it is possible for smoothing to occur off-schedule.
527 * This really isn't critical except during the burn-in cycles. */
528 if (schedule_smooth) {
529 smooth_all_rras(&rrd, rrd_file, rra_begin);
530 }
532 /* rrd_dontneed(rrd_file,&rrd); */
533 rrd_free(&rrd);
534 rrd_close(rrd_file);
536 free(pdp_new);
537 free(tmpl_idx);
538 free(pdp_temp);
539 free(skip_update);
540 free(updvals);
541 return 0;
543 err_free_structures:
544 free(pdp_new);
545 free(tmpl_idx);
546 free(pdp_temp);
547 free(skip_update);
548 free(updvals);
549 err_close:
550 rrd_close(rrd_file);
551 err_free:
552 rrd_free(&rrd);
553 err_out:
554 return -1;
555 }
557 /*
558 * get exclusive lock to whole file.
559 * lock gets removed when we close the file
560 *
561 * returns 0 on success
562 */
563 int rrd_lock(
564 rrd_file_t *file)
565 {
566 int rcstat;
568 {
569 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
570 struct _stat st;
572 if (_fstat(file->fd, &st) == 0) {
573 rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
574 } else {
575 rcstat = -1;
576 }
577 #else
578 struct flock lock;
580 lock.l_type = F_WRLCK; /* exclusive write lock */
581 lock.l_len = 0; /* whole file */
582 lock.l_start = 0; /* start of file */
583 lock.l_whence = SEEK_SET; /* offset relative to start of file */
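/* F_SETLK is the non-blocking variant: if another process already
 * holds a conflicting lock, fcntl() fails immediately instead of
 * waiting, and the caller aborts the update. */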
585 rcstat = fcntl(file->fd, F_SETLK, &lock);
586 #endif
587 }
589 return (rcstat);
590 }
592 /*
593 * Allocate the arrays used during the update, and initialize the template index.
594 *
595 * When it returns, either all of the structures are allocated
596 * or none of them are.
597 *
598 * Returns 0 on success, -1 on error.
599 */
600 static int allocate_data_structures(
601 rrd_t *rrd,
602 char ***updvals,
603 rrd_value_t **pdp_temp,
604 const char *tmplt,
605 long **tmpl_idx,
606 unsigned long *tmpl_cnt,
607 unsigned long **rra_step_cnt,
608 unsigned long **skip_update,
609 rrd_value_t **pdp_new)
610 {
611 unsigned i, ii;
612 if ((*updvals = (char **) malloc(sizeof(char *)
613 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
614 rrd_set_error("allocating updvals pointer array.");
615 return -1;
616 }
617 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
618 * rrd->stat_head->ds_cnt)) ==
619 NULL) {
620 rrd_set_error("allocating pdp_temp.");
621 goto err_free_updvals;
622 }
623 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
624 *
625 rrd->stat_head->rra_cnt)) ==
626 NULL) {
627 rrd_set_error("allocating skip_update.");
628 goto err_free_pdp_temp;
629 }
630 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
631 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
632 rrd_set_error("allocating tmpl_idx.");
633 goto err_free_skip_update;
634 }
635 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
636 *
637 (rrd->stat_head->
638 rra_cnt))) == NULL) {
639 rrd_set_error("allocating rra_step_cnt.");
640 goto err_free_tmpl_idx;
641 }
643 /* initialize tmplt redirector */
644 /* default config example (assume DS 1 is a CDEF DS)
645 tmpl_idx[0] -> 0; (time)
646 tmpl_idx[1] -> 1; (DS 0)
647 tmpl_idx[2] -> 3; (DS 2)
648 tmpl_idx[3] -> 4; (DS 3) */
649 (*tmpl_idx)[0] = 0; /* time */
650 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
651 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
652 (*tmpl_idx)[ii++] = i;
653 }
654 *tmpl_cnt = ii;
656 if (tmplt != NULL) {
657 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
658 goto err_free_rra_step_cnt;
659 }
660 }
662 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
663 * rrd->stat_head->ds_cnt)) == NULL) {
664 rrd_set_error("allocating pdp_new.");
665 goto err_free_rra_step_cnt;
666 }
668 return 0;
670 err_free_rra_step_cnt:
671 free(*rra_step_cnt);
672 err_free_tmpl_idx:
673 free(*tmpl_idx);
674 err_free_skip_update:
675 free(*skip_update);
676 err_free_pdp_temp:
677 free(*pdp_temp);
678 err_free_updvals:
679 free(*updvals);
680 return -1;
681 }
683 /*
684 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
685 *
686 * Returns 0 on success.
687 */
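/* Example with hypothetical DS names: for an RRD whose data sources are
 * "in", "out" and "err", the template "out:in" produces
 * tmpl_idx = { 0 (time), 2, 1 } and tmpl_cnt = 3. */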
688 static int parse_template(
689 rrd_t *rrd,
690 const char *tmplt,
691 unsigned long *tmpl_cnt,
692 long *tmpl_idx)
693 {
694 char *dsname, *tmplt_copy;
695 unsigned int tmpl_len, i;
696 int ret = 0;
698 *tmpl_cnt = 1; /* the first entry is the time */
700 /* we should work on a writeable copy here */
701 if ((tmplt_copy = strdup(tmplt)) == NULL) {
702 rrd_set_error("error copying tmplt '%s'", tmplt);
703 ret = -1;
704 goto out;
705 }
707 dsname = tmplt_copy;
708 tmpl_len = strlen(tmplt_copy);
709 for (i = 0; i <= tmpl_len; i++) {
710 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
711 tmplt_copy[i] = '\0';
712 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
713 rrd_set_error("tmplt contains more DS definitions than RRD");
714 ret = -1;
715 goto out_free_tmpl_copy;
716 }
717 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
718 rrd_set_error("unknown DS name '%s'", dsname);
719 ret = -1;
720 goto out_free_tmpl_copy;
721 }
722 /* go to the next entry on the tmplt_copy */
723 if (i < tmpl_len)
724 dsname = &tmplt_copy[i + 1];
725 }
726 }
727 out_free_tmpl_copy:
728 free(tmplt_copy);
729 out:
730 return ret;
731 }
733 /*
734 * Parse an update string, updates the primary data points (PDPs)
735 * and consolidated data points (CDPs), and writes changes to the RRAs.
736 *
737 * Returns 0 on success, -1 on error.
738 */
739 static int process_arg(
740 char *step_start,
741 rrd_t *rrd,
742 rrd_file_t *rrd_file,
743 unsigned long rra_begin,
744 unsigned long *rra_current,
745 time_t *current_time,
746 unsigned long *current_time_usec,
747 rrd_value_t *pdp_temp,
748 rrd_value_t *pdp_new,
749 unsigned long *rra_step_cnt,
750 char **updvals,
751 long *tmpl_idx,
752 unsigned long tmpl_cnt,
753 rrd_info_t ** pcdp_summary,
754 int version,
755 unsigned long *skip_update,
756 int *schedule_smooth)
757 {
758 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
760 /* a vector of future Holt-Winters seasonal coefs */
761 unsigned long elapsed_pdp_st;
763 double interval, pre_int, post_int; /* interval between this and
764 * the last run */
765 unsigned long proc_pdp_cnt;
766 unsigned long rra_start;
768 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
769 current_time, current_time_usec, version) == -1) {
770 return -1;
771 }
772 /* seek to the beginning of the rra's */
773 if (*rra_current != rra_begin) {
774 #ifndef HAVE_MMAP
775 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
776 rrd_set_error("seek error in rrd");
777 return -1;
778 }
779 #endif
780 *rra_current = rra_begin;
781 }
782 rra_start = rra_begin;
784 interval = (double) (*current_time - rrd->live_head->last_up)
785 + (double) ((long) *current_time_usec -
786 (long) rrd->live_head->last_up_usec) / 1e6f;
788 /* process the data sources and update the pdp_prep
789 * area accordingly */
790 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
791 return -1;
792 }
794 elapsed_pdp_st = calculate_elapsed_steps(rrd,
795 *current_time,
796 *current_time_usec, interval,
797 &pre_int, &post_int,
798 &proc_pdp_cnt);
800 /* has a pdp_st moment occurred since the last run ? */
801 if (elapsed_pdp_st == 0) {
802 /* no we have not passed a pdp_st moment. therefore update is simple */
803 simple_update(rrd, interval, pdp_new);
804 } else {
805 /* a pdp_st has occurred. */
806 if (process_all_pdp_st(rrd, interval,
807 pre_int, post_int,
808 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
809 return -1;
810 }
811 if (update_all_cdp_prep(rrd, rra_step_cnt,
812 rra_begin, rrd_file,
813 elapsed_pdp_st,
814 proc_pdp_cnt,
815 &last_seasonal_coef,
816 &seasonal_coef,
817 pdp_temp, rra_current,
818 skip_update, schedule_smooth) == -1) {
819 goto err_free_coefficients;
820 }
821 if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
822 elapsed_pdp_st, pdp_temp,
823 &seasonal_coef) == -1) {
824 goto err_free_coefficients;
825 }
826 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
827 rra_current, *current_time, skip_update,
828 pcdp_summary) == -1) {
829 goto err_free_coefficients;
830 }
831 } /* endif a pdp_st has occurred */
832 rrd->live_head->last_up = *current_time;
833 rrd->live_head->last_up_usec = *current_time_usec;
835 if (version < 3) {
836 *rrd->legacy_last_up = rrd->live_head->last_up;
837 }
838 free(seasonal_coef);
839 free(last_seasonal_coef);
840 return 0;
842 err_free_coefficients:
843 free(seasonal_coef);
844 free(last_seasonal_coef);
845 return -1;
846 }
848 /*
849 * Parse a DS string (time + colon-separated values), storing the
850 * results in current_time, current_time_usec, and updvals.
851 *
852 * Returns 0 on success, -1 on error.
853 */
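/* For example (made-up values), the update argument "1200000000:7:U:3.5"
 * is split into the timestamp 1200000000 and the readings "7", "U" and
 * "3.5", which are assigned to updvals[] through the tmpl_idx[] mapping. */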
854 static int parse_ds(
855 rrd_t *rrd,
856 char **updvals,
857 long *tmpl_idx,
858 char *input,
859 unsigned long tmpl_cnt,
860 time_t *current_time,
861 unsigned long *current_time_usec,
862 int version)
863 {
864 char *p;
865 unsigned long i;
866 char timesyntax;
868 updvals[0] = input;
869 /* initialize all ds input to unknown except the first one
870 which has always got to be set */
871 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
872 updvals[i] = "U";
874 /* separate all ds elements; first must be examined separately
875 due to alternate time syntax */
876 if ((p = strchr(input, '@')) != NULL) {
877 timesyntax = '@';
878 } else if ((p = strchr(input, ':')) != NULL) {
879 timesyntax = ':';
880 } else {
881 rrd_set_error("expected timestamp not found in data source from %s",
882 input);
883 return -1;
884 }
885 *p = '\0';
886 i = 1;
887 updvals[tmpl_idx[i++]] = p + 1;
888 while (*(++p)) {
889 if (*p == ':') {
890 *p = '\0';
891 if (i < tmpl_cnt) {
892 updvals[tmpl_idx[i++]] = p + 1;
893 }
894 }
895 }
897 if (i != tmpl_cnt) {
898 rrd_set_error("expected %lu data source readings (got %lu) from %s",
899 tmpl_cnt - 1, i, input);
900 return -1;
901 }
903 if (get_time_from_reading(rrd, timesyntax, updvals,
904 current_time, current_time_usec,
905 version) == -1) {
906 return -1;
907 }
908 return 0;
909 }
911 /*
912 * Parse the time in a DS string, store it in current_time and
913 * current_time_usec and verify that it's later than the last
914 * update for this DS.
915 *
916 * Returns 0 on success, -1 on error.
917 */
918 static int get_time_from_reading(
919 rrd_t *rrd,
920 char timesyntax,
921 char **updvals,
922 time_t *current_time,
923 unsigned long *current_time_usec,
924 int version)
925 {
926 double tmp;
927 char *parsetime_error = NULL;
928 char *old_locale;
929 rrd_time_value_t ds_tv;
930 struct timeval tmp_time; /* used for time conversion */
932 /* get the time from the reading ... handle N */
933 if (timesyntax == '@') { /* at-style */
934 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
935 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
936 return -1;
937 }
938 if (ds_tv.type == RELATIVE_TO_END_TIME ||
939 ds_tv.type == RELATIVE_TO_START_TIME) {
940 rrd_set_error("specifying time relative to the 'start' "
941 "or 'end' makes no sense here: %s", updvals[0]);
942 return -1;
943 }
944 *current_time = mktime(&ds_tv.tm) + ds_tv.offset;
945 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
946 } else if (strcmp(updvals[0], "N") == 0) {
947 gettimeofday(&tmp_time, 0);
948 normalize_time(&tmp_time);
949 *current_time = tmp_time.tv_sec;
950 *current_time_usec = tmp_time.tv_usec;
951 } else {
952 old_locale = setlocale(LC_NUMERIC, "C");
953 tmp = strtod(updvals[0], 0);
954 setlocale(LC_NUMERIC, old_locale);
955 *current_time = floor(tmp);
956 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
957 }
958 /* don't do any correction for old version RRDs */
959 if (version < 3)
960 *current_time_usec = 0;
962 if (*current_time < rrd->live_head->last_up ||
963 (*current_time == rrd->live_head->last_up &&
964 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
965 rrd_set_error("illegal attempt to update using time %ld when "
966 "last update time is %ld (minimum one second step)",
967 *current_time, rrd->live_head->last_up);
968 return -1;
969 }
970 return 0;
971 }
973 /*
974 * Update pdp_new by interpreting the updvals according to the DS type
975 * (COUNTER, GAUGE, etc.).
976 *
977 * Returns 0 on success, -1 on error.
978 */
979 static int update_pdp_prep(
980 rrd_t *rrd,
981 char **updvals,
982 rrd_value_t *pdp_new,
983 double interval)
984 {
985 unsigned long ds_idx;
986 int ii;
987 char *endptr; /* used in the conversion */
988 double rate;
989 char *old_locale;
990 enum dst_en dst_idx;
992 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
993 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
995 /* make sure we do not build diffs with old last_ds values */
996 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
997 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
998 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
999 }
1001 /* NOTE: DST_CDEF should never enter this if block, because
1002 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1003 * accidentally specified a value for the DST_CDEF. To handle this case,
1004 * an extra check is required. */
1006 if ((updvals[ds_idx + 1][0] != 'U') &&
1007 (dst_idx != DST_CDEF) &&
1008 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1009 rate = DNAN;
1011 /* pdp_new contains rate * time ... eg the bytes transferred during
1012 * the interval. Doing it this way saves a lot of math operations
1013 */
1014 switch (dst_idx) {
1015 case DST_COUNTER:
1016 case DST_DERIVE:
1017 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1018 if ((updvals[ds_idx + 1][ii] < '0'
1019 || updvals[ds_idx + 1][ii] > '9')
1020 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1021 rrd_set_error("not a simple integer: '%s'",
1022 updvals[ds_idx + 1]);
1023 return -1;
1024 }
1025 }
1026 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1027 pdp_new[ds_idx] =
1028 rrd_diff(updvals[ds_idx + 1],
1029 rrd->pdp_prep[ds_idx].last_ds);
1030 if (dst_idx == DST_COUNTER) {
1031 /* simple overflow catcher. This will fail
1032 * terribly for non 32 or 64 bit counters
1033 * ... are there any others in SNMP land?
1034 */
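/* Hypothetical illustration: a 32-bit counter wrapping from
 * 4294967290 to 10 yields a raw difference of -4294967280; the
 * first correction below turns this into +16. */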
1035 if (pdp_new[ds_idx] < (double) 0.0)
1036 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1037 if (pdp_new[ds_idx] < (double) 0.0)
1038 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1039 }
1040 rate = pdp_new[ds_idx] / interval;
1041 } else {
1042 pdp_new[ds_idx] = DNAN;
1043 }
1044 break;
1045 case DST_ABSOLUTE:
1046 old_locale = setlocale(LC_NUMERIC, "C");
1047 errno = 0;
1048 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1049 setlocale(LC_NUMERIC, old_locale);
1050 if (errno > 0) {
1051 rrd_set_error("converting '%s' to float: %s",
1052 updvals[ds_idx + 1], rrd_strerror(errno));
1053 return -1;
1054 };
1055 if (endptr[0] != '\0') {
1056 rrd_set_error
1057 ("conversion of '%s' to float not complete: tail '%s'",
1058 updvals[ds_idx + 1], endptr);
1059 return -1;
1060 }
1061 rate = pdp_new[ds_idx] / interval;
1062 break;
1063 case DST_GAUGE:
1064 errno = 0;
1065 old_locale = setlocale(LC_NUMERIC, "C");
1066 pdp_new[ds_idx] =
1067 strtod(updvals[ds_idx + 1], &endptr) * interval;
1068 setlocale(LC_NUMERIC, old_locale);
1069 if (errno) {
1070 rrd_set_error("converting '%s' to float: %s",
1071 updvals[ds_idx + 1], rrd_strerror(errno));
1072 return -1;
1073 };
1074 if (endptr[0] != '\0') {
1075 rrd_set_error
1076 ("conversion of '%s' to float not complete: tail '%s'",
1077 updvals[ds_idx + 1], endptr);
1078 return -1;
1079 }
1080 rate = pdp_new[ds_idx] / interval;
1081 break;
1082 default:
1083 rrd_set_error("rrd contains unknown DS type : '%s'",
1084 rrd->ds_def[ds_idx].dst);
1085 return -1;
1086 }
1087 /* bail out if the error string is set */
1088 if (rrd_test_error()) {
1089 return -1;
1090 }
1091 /* make sure the rate is neither too large nor too small;
1092 * if it falls outside the DS min/max bounds it becomes unknown ...
1093 * sorry folks ... */
1094 if (!isnan(rate) &&
1095 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1096 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1097 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1098 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1099 pdp_new[ds_idx] = DNAN;
1100 }
1101 } else {
1102 /* no news is news all the same */
1103 pdp_new[ds_idx] = DNAN;
1104 }
1107 /* make a copy of the command line argument for the next run */
1108 #ifdef DEBUG
1109 fprintf(stderr, "prep ds[%lu]\t"
1110 "last_arg '%s'\t"
1111 "this_arg '%s'\t"
1112 "pdp_new %10.2f\n",
1113 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1114 pdp_new[ds_idx]);
1115 #endif
1116 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1117 LAST_DS_LEN - 1);
1118 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1119 }
1120 return 0;
1121 }
1123 /*
1124 * How many PDP steps have elapsed since the last update? Returns the answer,
1125 * and stores the time between the last update and the last PDP in pre_int,
1126 * and the time between the last PDP and the current time in post_int.
1127 */
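/* Worked example with invented numbers, assuming pdp_step = 300,
 * last_up = 1000 and current_time = 1700: proc_pdp_st = 900 and
 * occu_pdp_st = 1500, so two pdp_st boundaries have elapsed;
 * pre_int = 500 s of the 700 s interval fall before the most recent
 * boundary and post_int = 200 s fall after it. */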
1128 static int calculate_elapsed_steps(
1129 rrd_t *rrd,
1130 unsigned long current_time,
1131 unsigned long current_time_usec,
1132 double interval,
1133 double *pre_int,
1134 double *post_int,
1135 unsigned long *proc_pdp_cnt)
1136 {
1137 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1138 unsigned long occu_pdp_st; /* the most recent pdp_st boundary before the
1139 * current update time */
1140 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1141 * when it was last updated */
1142 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1144 /* when was the current pdp started */
1145 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1146 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1148 /* when did the last pdp_st occur */
1149 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1150 occu_pdp_st = current_time - occu_pdp_age;
1152 if (occu_pdp_st > proc_pdp_st) {
1153 /* OK we passed the pdp_st moment */
1154 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1155 * occurred before the latest
1156 * pdp_st moment*/
1157 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1158 *post_int = occu_pdp_age; /* how much after it */
1159 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1160 } else {
1161 *pre_int = interval;
1162 *post_int = 0;
1163 }
1165 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1167 #ifdef DEBUG
1168 printf("proc_pdp_age %lu\t"
1169 "proc_pdp_st %lu\t"
1170 "occu_pfp_age %lu\t"
1171 "occu_pdp_st %lu\t"
1172 "int %lf\t"
1173 "pre_int %lf\t"
1174 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1175 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1176 #endif
1178 /* compute the number of elapsed pdp_st moments */
1179 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1180 }
1182 /*
1183 * Increment the PDP values by the values in pdp_new, or else initialize them.
1184 */
1185 static void simple_update(
1186 rrd_t *rrd,
1187 double interval,
1188 rrd_value_t *pdp_new)
1189 {
1190 int i;
1192 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1193 if (isnan(pdp_new[i])) {
1194 /* this is not really accurate if we use subsecond data arrival time
1195 should have thought of it when going subsecond resolution ...
1196 sorry next format change we will have it! */
1197 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1198 floor(interval);
1199 } else {
1200 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1201 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1202 } else {
1203 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1204 }
1205 }
1206 #ifdef DEBUG
1207 fprintf(stderr,
1208 "NO PDP ds[%i]\t"
1209 "value %10.2f\t"
1210 "unkn_sec %5lu\n",
1211 i,
1212 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1213 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1214 #endif
1215 }
1216 }
1218 /*
1219 * Call process_pdp_st for each DS.
1220 *
1221 * Returns 0 on success, -1 on error.
1222 */
1223 static int process_all_pdp_st(
1224 rrd_t *rrd,
1225 double interval,
1226 double pre_int,
1227 double post_int,
1228 unsigned long elapsed_pdp_st,
1229 rrd_value_t *pdp_new,
1230 rrd_value_t *pdp_temp)
1231 {
1232 unsigned long ds_idx;
1234 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1235 rate*seconds which occurred up to the last run.
1236 pdp_new[] contains rate*seconds from the latest run.
1237 pdp_temp[] will contain the rate for cdp */
1239 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1240 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1241 elapsed_pdp_st * rrd->stat_head->pdp_step,
1242 pdp_new, pdp_temp) == -1) {
1243 return -1;
1244 }
1245 #ifdef DEBUG
1246 fprintf(stderr, "PDP UPD ds[%lu]\t"
1247 "elapsed_pdp_st %lu\t"
1248 "pdp_temp %10.2f\t"
1249 "new_prep %10.2f\t"
1250 "new_unkn_sec %5lu\n",
1251 ds_idx,
1252 elapsed_pdp_st,
1253 pdp_temp[ds_idx],
1254 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1255 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1256 #endif
1257 }
1258 return 0;
1259 }
1261 /*
1262 * Process an update that occurs after one of the PDP moments.
1263 * Increments the PDP value, sets it to NAN if more time than the
1264 * heartbeat has elapsed, and processes CDEFs.
1265 *
1266 * Returns 0 on success, -1 on error.
1267 */
1268 static int process_pdp_st(
1269 rrd_t *rrd,
1270 unsigned long ds_idx,
1271 double interval,
1272 double pre_int,
1273 double post_int,
1274 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1275 rrd_value_t *pdp_new,
1276 rrd_value_t *pdp_temp)
1277 {
1278 int i;
1280 /* update pdp_prep to the current pdp_st. */
1281 double pre_unknown = 0.0;
1282 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1283 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1285 rpnstack_t rpnstack; /* used for COMPUTE DS */
1287 rpnstack_init(&rpnstack);
1290 if (isnan(pdp_new[ds_idx])) {
1291 /* a final bit of unknown to be added before calculation
1292 we use a temporary floating-point variable for this so that we
1293 don't have to round to whole seconds before using the value */
1294 pre_unknown = pre_int;
1295 } else {
1296 if (isnan(scratch[PDP_val].u_val)) {
1297 scratch[PDP_val].u_val = 0;
1298 }
1299 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1300 }
1302 /* if too much of the pdp_prep is unknown we dump it */
1303 /* if the interval is larger than mrhb we get NAN */
1304 if ((interval > mrhb) ||
1305 (rrd->stat_head->pdp_step / 2.0 <
1306 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1307 pdp_temp[ds_idx] = DNAN;
1308 } else {
1309 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1310 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1311 pre_unknown);
1312 }
1314 /* process CDEF data sources; remember each CDEF DS can
1315 * only reference other DS with a lower index number */
1316 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1317 rpnp_t *rpnp;
1319 rpnp =
1320 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1321 /* substitute data values for OP_VARIABLE nodes */
1322 for (i = 0; rpnp[i].op != OP_END; i++) {
1323 if (rpnp[i].op == OP_VARIABLE) {
1324 rpnp[i].op = OP_NUMBER;
1325 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1326 }
1327 }
1328 /* run the rpn calculator */
1329 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1330 free(rpnp);
1331 rpnstack_free(&rpnstack);
1332 return -1;
1333 }
1334 }
1336 /* make pdp_prep ready for the next run */
1337 if (isnan(pdp_new[ds_idx])) {
1338 /* this is not really accurate if we use subsecond data arrival time
1339 should have thought of it when going subsecond resolution ...
1340 sorry next format change we will have it! */
1341 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1342 scratch[PDP_val].u_val = DNAN;
1343 } else {
1344 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1345 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1346 }
1347 rpnstack_free(&rpnstack);
1348 return 0;
1349 }
1351 /*
1352 * Iterate over all the RRAs and:
1353 * 1. Decide whether to schedule a smooth later
1354 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1355 * 3. Update the CDP
1356 *
1357 * Returns 0 on success, -1 on error
1358 */
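/* Example with invented numbers: an RRA with pdp_cnt = 4 and
 * proc_pdp_cnt % 4 == 1 gets start_pdp_offset = 3; if
 * elapsed_pdp_st = 9, then rra_step_cnt = (9 - 3) / 4 + 1 = 2 rows
 * of this RRA will be filled by the update. */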
1359 static int update_all_cdp_prep(
1360 rrd_t *rrd,
1361 unsigned long *rra_step_cnt,
1362 unsigned long rra_begin,
1363 rrd_file_t *rrd_file,
1364 unsigned long elapsed_pdp_st,
1365 unsigned long proc_pdp_cnt,
1366 rrd_value_t **last_seasonal_coef,
1367 rrd_value_t **seasonal_coef,
1368 rrd_value_t *pdp_temp,
1369 unsigned long *rra_current,
1370 unsigned long *skip_update,
1371 int *schedule_smooth)
1372 {
1373 unsigned long rra_idx;
1375 /* index into the CDP scratch array */
1376 enum cf_en current_cf;
1377 unsigned long rra_start;
1379 /* number of rows to be updated in an RRA for a data value. */
1380 unsigned long start_pdp_offset;
1382 rra_start = rra_begin;
1383 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1384 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1385 start_pdp_offset =
1386 rrd->rra_def[rra_idx].pdp_cnt -
1387 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1388 skip_update[rra_idx] = 0;
1389 if (start_pdp_offset <= elapsed_pdp_st) {
1390 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1391 rrd->rra_def[rra_idx].pdp_cnt + 1;
1392 } else {
1393 rra_step_cnt[rra_idx] = 0;
1394 }
1396 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1397 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1398 * so that they will be correct for the next observed value; note that for
1399 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1400 * furthermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1401 if (rra_step_cnt[rra_idx] > 1) {
1402 skip_update[rra_idx] = 1;
1403 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1404 elapsed_pdp_st, last_seasonal_coef);
1405 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1406 elapsed_pdp_st + 1, seasonal_coef);
1407 }
1408 /* periodically run a smoother for seasonal effects */
1409 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1410 #ifdef DEBUG
1411 fprintf(stderr,
1412 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1413 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1414 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1415 u_cnt);
1416 #endif
1417 *schedule_smooth = 1;
1418 }
1419 *rra_current = rrd_tell(rrd_file);
1420 }
1421 if (rrd_test_error())
1422 return -1;
1424 if (update_cdp_prep
1425 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1426 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1427 current_cf) == -1) {
1428 return -1;
1429 }
1430 rra_start +=
1431 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1432 sizeof(rrd_value_t);
1433 }
1434 return 0;
1435 }
1437 /*
1438 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1439 */
1440 static int do_schedule_smooth(
1441 rrd_t *rrd,
1442 unsigned long rra_idx,
1443 unsigned long elapsed_pdp_st)
1444 {
1445 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1446 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1447 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1448 unsigned long seasonal_smooth_idx =
1449 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1450 unsigned long *init_seasonal =
1451 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1453 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1454 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1455 * really an RRA level, not a data source within RRA level parameter, but
1456 * the rra_def is read only for rrd_update (not flushed to disk). */
1457 if (*init_seasonal > BURNIN_CYCLES) {
1458 /* someone has no doubt invented a trick to deal with this wrap around,
1459 * but at least this code is clear. */
1460 if (seasonal_smooth_idx > cur_row) {
1461 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1462 * between PDP and CDP */
1463 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1464 }
1465 /* can't rely on negative numbers because we are working with
1466 * unsigned values */
1467 return (cur_row + elapsed_pdp_st >= row_cnt
1468 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1469 }
1470 /* mark off one of the burn-in cycles */
1471 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1472 }
1474 /*
1475 * For a given RRA, iterate over the data sources and call the appropriate
1476 * consolidation function.
1477 *
1478 * Returns 0 on success, -1 on error.
1479 */
1480 static int update_cdp_prep(
1481 rrd_t *rrd,
1482 unsigned long elapsed_pdp_st,
1483 unsigned long start_pdp_offset,
1484 unsigned long *rra_step_cnt,
1485 int rra_idx,
1486 rrd_value_t *pdp_temp,
1487 rrd_value_t *last_seasonal_coef,
1488 rrd_value_t *seasonal_coef,
1489 int current_cf)
1490 {
1491 unsigned long ds_idx, cdp_idx;
1493 /* update CDP_PREP areas */
1494 /* loop over data sources within each RRA */
1495 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1497 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1499 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1500 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1501 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1502 elapsed_pdp_st, start_pdp_offset,
1503 rrd->rra_def[rra_idx].pdp_cnt,
1504 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1505 rra_idx, ds_idx);
1506 } else {
1507 /* Nothing to consolidate if there's one PDP per CDP. However, if
1508 * we've missed some PDPs, let's update null counters etc. */
1509 if (elapsed_pdp_st > 2) {
1510 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1511 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1512 current_cf);
1513 }
1514 }
1516 if (rrd_test_error())
1517 return -1;
1518 } /* endif data sources loop */
1519 return 0;
1520 }
1522 /*
1523 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1524 * primary value, secondary value, and # of unknowns.
1525 */
1526 static void update_cdp(
1527 unival *scratch,
1528 int current_cf,
1529 rrd_value_t pdp_temp_val,
1530 unsigned long rra_step_cnt,
1531 unsigned long elapsed_pdp_st,
1532 unsigned long start_pdp_offset,
1533 unsigned long pdp_cnt,
1534 rrd_value_t xff,
1535 int i,
1536 int ii)
1537 {
1538 /* shorthand variables */
1539 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1540 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1541 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1542 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1544 if (rra_step_cnt) {
1545 /* If we are in this block, at least 1 CDP value will be written to
1546 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1547 * to be written, then the "fill in" value is the CDP_secondary_val
1548 * entry. */
1549 if (isnan(pdp_temp_val)) {
1550 *cdp_unkn_pdp_cnt += start_pdp_offset;
1551 *cdp_secondary_val = DNAN;
1552 } else {
1553 /* CDP_secondary value is the RRA "fill in" value for intermediary
1554 * CDP data entries. No matter the CF, the value is the same because
1555 * the average, max, min, and last of a list of identical values is
1556 * the same, namely, the value itself. */
1557 *cdp_secondary_val = pdp_temp_val;
1558 }
1560 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1561 *cdp_primary_val = DNAN;
1562 if (current_cf == CF_AVERAGE) {
1563 *cdp_val =
1564 initialize_average_carry_over(pdp_temp_val,
1565 elapsed_pdp_st,
1566 start_pdp_offset, pdp_cnt);
1567 } else {
1568 *cdp_val = pdp_temp_val;
1569 }
1570 } else {
1571 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1572 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1573 } /* endif meets xff value requirement for a valid value */
1574 /* initialize the carry-over CDP_unkn_pdp_cnt; this must happen after
1575 * CDP_primary_val is set because CDP_unkn_pdp_cnt is required to compute that value. */
1576 if (isnan(pdp_temp_val))
1577 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1578 else
1579 *cdp_unkn_pdp_cnt = 0;
1580 } else { /* rra_step_cnt[i] == 0 */
1582 #ifdef DEBUG
1583 if (isnan(*cdp_val)) {
1584 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1585 i, ii);
1586 } else {
1587 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1588 i, ii, *cdp_val);
1589 }
1590 #endif
1591 if (isnan(pdp_temp_val)) {
1592 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1593 } else {
1594 *cdp_val =
1595 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1596 current_cf, i, ii);
1597 }
1598 }
1599 }
1601 /*
1602 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1603 * on the type of consolidation function.
1604 */
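/* Sketch of the CF_AVERAGE branch with made-up numbers: pdp_cnt = 6,
 * a carried-over CDP_val of 10, pdp_temp_val = 2, start_pdp_offset = 4
 * and no unknown PDPs give CDP_primary_val = (10 + 2 * 4) / 6 = 3. */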
1605 static void initialize_cdp_val(
1606 unival *scratch,
1607 int current_cf,
1608 rrd_value_t pdp_temp_val,
1609 unsigned long elapsed_pdp_st,
1610 unsigned long start_pdp_offset,
1611 unsigned long pdp_cnt)
1612 {
1613 rrd_value_t cum_val, cur_val;
1615 switch (current_cf) {
1616 case CF_AVERAGE:
1617 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1618 cur_val = IFDNAN(pdp_temp_val, 0.0);
1619 scratch[CDP_primary_val].u_val =
1620 (cum_val + cur_val * start_pdp_offset) /
1621 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1622 scratch[CDP_val].u_val =
1623 initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1624 start_pdp_offset, pdp_cnt);
1625 break;
1626 case CF_MAXIMUM:
1627 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1628 cur_val = IFDNAN(pdp_temp_val, -DINF);
1629 #if 0
1630 #ifdef DEBUG
1631 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1632 fprintf(stderr,
1633 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1634 i, ii);
1635 exit(-1);
1636 }
1637 #endif
1638 #endif
1639 if (cur_val > cum_val)
1640 scratch[CDP_primary_val].u_val = cur_val;
1641 else
1642 scratch[CDP_primary_val].u_val = cum_val;
1643 /* initialize carry over value */
1644 scratch[CDP_val].u_val = pdp_temp_val;
1645 break;
1646 case CF_MINIMUM:
1647 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1648 cur_val = IFDNAN(pdp_temp_val, DINF);
1649 #if 0
1650 #ifdef DEBUG
1651 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1652 fprintf(stderr,
1653 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1654 ii);
1655 exit(-1);
1656 }
1657 #endif
1658 #endif
1659 if (cur_val < cum_val)
1660 scratch[CDP_primary_val].u_val = cur_val;
1661 else
1662 scratch[CDP_primary_val].u_val = cum_val;
1663 /* initialize carry over value */
1664 scratch[CDP_val].u_val = pdp_temp_val;
1665 break;
1666 case CF_LAST:
1667 default:
1668 scratch[CDP_primary_val].u_val = pdp_temp_val;
1669 /* initialize carry over value */
1670 scratch[CDP_val].u_val = pdp_temp_val;
1671 break;
1672 }
1673 }
1675 /*
1676 * Update the consolidation function for Holt-Winters functions as
1677 * well as other functions that don't actually consolidate multiple
1678 * PDPs.
1679 */
1680 static void reset_cdp(
1681 rrd_t *rrd,
1682 unsigned long elapsed_pdp_st,
1683 rrd_value_t *pdp_temp,
1684 rrd_value_t *last_seasonal_coef,
1685 rrd_value_t *seasonal_coef,
1686 int rra_idx,
1687 int ds_idx,
1688 int cdp_idx,
1689 enum cf_en current_cf)
1690 {
1691 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1693 switch (current_cf) {
1694 case CF_AVERAGE:
1695 default:
1696 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1697 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1698 break;
1699 case CF_SEASONAL:
1700 case CF_DEVSEASONAL:
1701 /* need to update cached seasonal values, so they are consistent
1702 * with the bulk update */
1703 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1704 * CDP_last_deviation are the same. */
1705 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1706 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1707 break;
1708 case CF_HWPREDICT:
1709 case CF_MHWPREDICT:
1710 /* need to update the null_count and last_null_count.
1711 * even do this for non-DNAN pdp_temp because the
1712 * algorithm is not learning from batch updates. */
1713 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1714 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1715 /* fall through */
1716 case CF_DEVPREDICT:
1717 scratch[CDP_primary_val].u_val = DNAN;
1718 scratch[CDP_secondary_val].u_val = DNAN;
1719 break;
1720 case CF_FAILURES:
1721 /* do not count missed bulk values as failures */
1722 scratch[CDP_primary_val].u_val = 0;
1723 scratch[CDP_secondary_val].u_val = 0;
1724 /* need to reset violations buffer.
1725 * could do this more carefully, but for now, just
1726 * assume a bulk update wipes away all violations. */
1727 erase_violations(rrd, cdp_idx, rra_idx);
1728 break;
1729 }
1730 }
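/* The average carry-over computed below is the part of the current
 * reading that belongs to the next, not yet completed, CDP. With
 * hypothetical values pdp_cnt = 3, elapsed_pdp_st = 7 and
 * start_pdp_offset = 2, (7 - 2) % 3 = 2 PDPs spill over, so the
 * carry-over is 2 * pdp_temp_val. */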
1732 static rrd_value_t initialize_average_carry_over(
1733 rrd_value_t pdp_temp_val,
1734 unsigned long elapsed_pdp_st,
1735 unsigned long start_pdp_offset,
1736 unsigned long pdp_cnt)
1737 {
1738 /* initialize carry over value */
1739 if (isnan(pdp_temp_val)) {
1740 return DNAN;
1741 }
1742 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1743 }
1745 /*
1746 * Update or initialize a CDP value based on the consolidation
1747 * function.
1748 *
1749 * Returns the new value.
1750 */
1751 static rrd_value_t calculate_cdp_val(
1752 rrd_value_t cdp_val,
1753 rrd_value_t pdp_temp_val,
1754 unsigned long elapsed_pdp_st,
1755 int current_cf,
1756 #ifdef DEBUG
1757 int i,
1758 int ii
1759 #else
1760 int UNUSED(i),
1761 int UNUSED(ii)
1762 #endif
1763 )
1764 {
1765 if (isnan(cdp_val)) {
1766 if (current_cf == CF_AVERAGE) {
1767 pdp_temp_val *= elapsed_pdp_st;
1768 }
1769 #ifdef DEBUG
1770 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1771 i, ii, pdp_temp_val);
1772 #endif
1773 return pdp_temp_val;
1774 }
1775 if (current_cf == CF_AVERAGE)
1776 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1777 if (current_cf == CF_MINIMUM)
1778 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1779 if (current_cf == CF_MAXIMUM)
1780 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1782 return pdp_temp_val;
1783 }
1785 /*
1786 * For each RRA, update the seasonal values and then call update_aberrant_CF
1787 * for each data source.
1788 *
1789 * Return 0 on success, -1 on error.
1790 */
1791 static int update_aberrant_cdps(
1792 rrd_t *rrd,
1793 rrd_file_t *rrd_file,
1794 unsigned long rra_begin,
1795 unsigned long *rra_current,
1796 unsigned long elapsed_pdp_st,
1797 rrd_value_t *pdp_temp,
1798 rrd_value_t **seasonal_coef)
1799 {
1800 unsigned long rra_idx, ds_idx, j;
1802 /* number of PDP steps since the last update that
1803 * are assigned to the first CDP to be generated
1804 * since the last update. */
1805 unsigned short scratch_idx;
1806 unsigned long rra_start;
1807 enum cf_en current_cf;
1809 /* this loop is only entered if elapsed_pdp_st < 3 */
1810 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1811 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1812 rra_start = rra_begin;
1813 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1814 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1815 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1816 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1817 if (scratch_idx == CDP_primary_val) {
1818 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1819 elapsed_pdp_st + 1, seasonal_coef);
1820 } else {
1821 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1822 elapsed_pdp_st + 2, seasonal_coef);
1823 }
1824 *rra_current = rrd_tell(rrd_file);
1825 }
1826 if (rrd_test_error())
1827 return -1;
1829 /* loop over data sources within each RRA */
1829 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1830 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1831 rra_idx * (rrd->stat_head->ds_cnt) +
1832 ds_idx, rra_idx, ds_idx, scratch_idx,
1833 *seasonal_coef);
1834 }
1835 }
1836 rra_start += rrd->rra_def[rra_idx].row_cnt
1837 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1838 }
1839 }
1840 return 0;
1841 }
1843 /*
1844 * Move sequentially through the file, writing one RRA at a time. Note this
1845 * architecture divorces the computation of the CDPs from flushing the updated
1846 * RRA entries to disk.
1847 *
1848 * Return 0 on success, -1 on error.
1849 */
1850 static int write_to_rras(
1851 rrd_t *rrd,
1852 rrd_file_t *rrd_file,
1853 unsigned long *rra_step_cnt,
1854 unsigned long rra_begin,
1855 unsigned long *rra_current,
1856 time_t current_time,
1857 unsigned long *skip_update,
1858 rrd_info_t ** pcdp_summary)
1859 {
1860 unsigned long rra_idx;
1861 unsigned long rra_start;
1862 unsigned long rra_pos_tmp; /* temporary byte pointer. */
1863 time_t rra_time = 0; /* time of update for a RRA */
1865 /* Ready to write to disk */
1866 rra_start = rra_begin;
1867 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1868 /* skip unless there's something to write */
1869 if (rra_step_cnt[rra_idx]) {
1870 /* write the first row */
1871 #ifdef DEBUG
1872 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1873 #endif
1874 rrd->rra_ptr[rra_idx].cur_row++;
1875 if (rrd->rra_ptr[rra_idx].cur_row >=
1876 rrd->rra_def[rra_idx].row_cnt)
1877 rrd->rra_ptr[rra_idx].cur_row = 0; /* wrap around */
1878 /* position on the first row */
1879 rra_pos_tmp = rra_start +
1880 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1881 sizeof(rrd_value_t);
1882 if (rra_pos_tmp != *rra_current) {
1883 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1884 rrd_set_error("seek error in rrd");
1885 return -1;
1886 }
1887 *rra_current = rra_pos_tmp;
1888 }
1889 #ifdef DEBUG
1890 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1891 #endif
1892 if (!skip_update[rra_idx]) {
1893 if (*pcdp_summary != NULL) {
1894 rra_time = (current_time - current_time
1895 % (rrd->rra_def[rra_idx].pdp_cnt *
1896 rrd->stat_head->pdp_step))
1897 -
1898 ((rra_step_cnt[rra_idx] -
1899 1) * rrd->rra_def[rra_idx].pdp_cnt *
1900 rrd->stat_head->pdp_step);
1901 }
1902 if (write_RRA_row
1903 (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1904 pcdp_summary, rra_time) == -1)
1905 return -1;
1906 }
1908 /* write other rows of the bulk update, if any */
1909 for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1910 if (++rrd->rra_ptr[rra_idx].cur_row ==
1911 rrd->rra_def[rra_idx].row_cnt) {
1912 #ifdef DEBUG
1913 fprintf(stderr,
1914 "Wraparound for RRA %s, %lu updates left\n",
1915 rrd->rra_def[rra_idx].cf_nam,
1916 rra_step_cnt[rra_idx] - 1);
1917 #endif
1918 /* wrap */
1919 rrd->rra_ptr[rra_idx].cur_row = 0;
1920 /* seek back to beginning of current rra */
1921 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1922 rrd_set_error("seek error in rrd");
1923 return -1;
1924 }
1925 #ifdef DEBUG
1926 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1927 rrd_file->pos);
1928 #endif
1929 *rra_current = rra_start;
1930 }
1931 if (!skip_update[rra_idx]) {
1932 if (*pcdp_summary != NULL) {
1933 rra_time = (current_time - current_time
1934 % (rrd->rra_def[rra_idx].pdp_cnt *
1935 rrd->stat_head->pdp_step))
1936 -
1937 ((rra_step_cnt[rra_idx] -
1938 2) * rrd->rra_def[rra_idx].pdp_cnt *
1939 rrd->stat_head->pdp_step);
1940 }
1941 if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
1942 CDP_secondary_val, pcdp_summary,
1943 rra_time) == -1)
1944 return -1;
1945 }
1946 }
1947 }
1948 rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1949 sizeof(rrd_value_t);
1950 } /* RRA LOOP */
1952 return 0;
1953 }
1955 /*
1956 * Write out one row of values (one value per DS) to the archive.
1957 *
1958 * Returns 0 on success, -1 on error.
1959 */
1960 static int write_RRA_row(
1961 rrd_file_t *rrd_file,
1962 rrd_t *rrd,
1963 unsigned long rra_idx,
1964 unsigned long *rra_current,
1965 unsigned short CDP_scratch_idx,
1966 rrd_info_t ** pcdp_summary,
1967 time_t rra_time)
1968 {
1969 unsigned long ds_idx, cdp_idx;
1970 rrd_infoval_t iv;
1972 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1973 /* compute the cdp index */
1974 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1975 #ifdef DEBUG
1976 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1977 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1978 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1979 #endif
1980 if (*pcdp_summary != NULL) {
1981 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1982 /* append info to the return hash */
1983 *pcdp_summary = rrd_info_push(*pcdp_summary,
1984 sprintf_alloc
1985 ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
1986 rrd->rra_def[rra_idx].cf_nam,
1987 rrd->rra_def[rra_idx].pdp_cnt,
1988 rrd->ds_def[ds_idx].ds_nam),
1989 RD_I_VAL, iv);
1990 }
1991 if (rrd_write(rrd_file,
1992 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
1993 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1994 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1995 return -1;
1996 }
1997 *rra_current += sizeof(rrd_value_t);
1998 }
1999 return 0;
2000 }
2002 /*
2003 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2004 *
2005 * Returns 0 on success, -1 otherwise
2006 */
2007 static int smooth_all_rras(
2008 rrd_t *rrd,
2009 rrd_file_t *rrd_file,
2010 unsigned long rra_begin)
2011 {
2012 unsigned long rra_start = rra_begin;
2013 unsigned long rra_idx;
2015 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2016 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2017 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2018 #ifdef DEBUG
2019 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2020 #endif
2021 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2022 if (rrd_test_error())
2023 return -1;
2024 }
2025 rra_start += rrd->rra_def[rra_idx].row_cnt
2026 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2027 }
2028 return 0;
2029 }
2031 #ifndef HAVE_MMAP
2032 /*
2033 * Flush changes to disk (unless we're using mmap)
2034 *
2035 * Returns 0 on success, -1 otherwise
2036 */
2037 static int write_changes_to_disk(
2038 rrd_t *rrd,
2039 rrd_file_t *rrd_file,
2040 int version)
2041 {
2042 /* we just need to write back the live header portion now */
2043 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2044 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2045 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2046 SEEK_SET) != 0) {
2047 rrd_set_error("seek rrd for live header writeback");
2048 return -1;
2049 }
2050 if (version >= 3) {
2051 if (rrd_write(rrd_file, rrd->live_head,
2052 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2053 rrd_set_error("rrd_write live_head to rrd");
2054 return -1;
2055 }
2056 } else {
2057 if (rrd_write(rrd_file, rrd->legacy_last_up,
2058 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2059 rrd_set_error("rrd_write live_head to rrd");
2060 return -1;
2061 }
2062 }
2065 if (rrd_write(rrd_file, rrd->pdp_prep,
2066 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2067 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2068 rrd_set_error("rrd_write pdp_prep to rrd");
2069 return -1;
2070 }
2072 if (rrd_write(rrd_file, rrd->cdp_prep,
2073 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2074 rrd->stat_head->ds_cnt)
2075 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2076 rrd->stat_head->ds_cnt)) {
2078 rrd_set_error("rrd_write cdp_prep to rrd");
2079 return -1;
2080 }
2082 if (rrd_write(rrd_file, rrd->rra_ptr,
2083 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2084 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2085 rrd_set_error("rrd_write rra_ptr to rrd");
2086 return -1;
2087 }
2088 return 0;
2089 }
2090 #endif