2 /*****************************************************************************
3 * RRDtool 1.2.99907080300 Copyright by Tobi Oetiker, 1997-2007
4 *****************************************************************************
5 * rrd_update.c RRD Update Function
6 *****************************************************************************
7 * $Id$
8 *****************************************************************************/
10 #include "rrd_tool.h"
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
18 #include <locale.h>
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
26 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
27 /*
28 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
29 * replacement.
30 */
31 #include <sys/timeb.h>
33 #ifndef __MINGW32__
34 struct timeval {
35 time_t tv_sec; /* seconds */
36 long tv_usec; /* microseconds */
37 };
38 #endif
40 struct __timezone {
41 int tz_minuteswest; /* minutes W of Greenwich */
42 int tz_dsttime; /* type of dst correction */
43 };
45 static int gettimeofday(
46 struct timeval *t,
47 struct __timezone *tz)
48 {
50 struct _timeb current_time;
52 _ftime(¤t_time);
54 t->tv_sec = current_time.time;
55 t->tv_usec = current_time.millitm * 1000;
57 return 0;
58 }
60 #endif
62 #define DEBUG 1
64 /* FUNCTION PROTOTYPES */
66 int rrd_update_r(
67 const char *filename,
68 const char *tmplt,
69 int argc,
70 const char **argv);
71 int _rrd_update(
72 const char *filename,
73 const char *tmplt,
74 int argc,
75 const char **argv,
76 info_t *);
78 static int allocate_data_structures(
79 rrd_t *rrd,
80 char ***updvals,
81 rrd_value_t **pdp_temp,
82 const char *tmplt,
83 long **tmpl_idx,
84 unsigned long *tmpl_cnt,
85 unsigned long **rra_step_cnt,
86 unsigned long **skip_update,
87 rrd_value_t **pdp_new);
89 static int parse_template(
90 rrd_t *rrd,
91 const char *tmplt,
92 unsigned long *tmpl_cnt,
93 long *tmpl_idx);
95 static int process_arg(
96 char *step_start,
97 rrd_t *rrd,
98 rrd_file_t *rrd_file,
99 unsigned long rra_begin,
100 unsigned long *rra_current,
101 time_t *current_time,
102 unsigned long *current_time_usec,
103 rrd_value_t *pdp_temp,
104 rrd_value_t *pdp_new,
105 unsigned long *rra_step_cnt,
106 char **updvals,
107 long *tmpl_idx,
108 unsigned long tmpl_cnt,
109 info_t **pcdp_summary,
110 int version,
111 unsigned long *skip_update,
112 int *schedule_smooth);
114 static int parse_ds(
115 rrd_t *rrd,
116 char **updvals,
117 long *tmpl_idx,
118 char *input,
119 unsigned long tmpl_cnt,
120 time_t *current_time,
121 unsigned long *current_time_usec,
122 int version);
124 static int get_time_from_reading(
125 rrd_t *rrd,
126 char timesyntax,
127 char **updvals,
128 time_t *current_time,
129 unsigned long *current_time_usec,
130 int version);
132 static int update_pdp_prep(
133 rrd_t *rrd,
134 char **updvals,
135 rrd_value_t *pdp_new,
136 double interval);
138 static int calculate_elapsed_steps(
139 rrd_t *rrd,
140 unsigned long current_time,
141 unsigned long current_time_usec,
142 double interval,
143 double *pre_int,
144 double *post_int,
145 unsigned long *proc_pdp_cnt);
147 static void simple_update(
148 rrd_t *rrd,
149 double interval,
150 rrd_value_t *pdp_new);
152 static int process_all_pdp_st(
153 rrd_t *rrd,
154 double interval,
155 double pre_int,
156 double post_int,
157 unsigned long elapsed_pdp_st,
158 rrd_value_t *pdp_new,
159 rrd_value_t *pdp_temp);
161 static int process_pdp_st(
162 rrd_t *rrd,
163 unsigned long ds_idx,
164 double interval,
165 double pre_int,
166 double post_int,
167 long diff_pdp_st,
168 rrd_value_t *pdp_new,
169 rrd_value_t *pdp_temp);
171 static int update_all_cdp_prep(
172 rrd_t *rrd,
173 unsigned long *rra_step_cnt,
174 unsigned long rra_begin,
175 rrd_file_t *rrd_file,
176 unsigned long elapsed_pdp_st,
177 unsigned long proc_pdp_cnt,
178 rrd_value_t **last_seasonal_coef,
179 rrd_value_t **seasonal_coef,
180 rrd_value_t *pdp_temp,
181 unsigned long *rra_current,
182 unsigned long *skip_update,
183 int *schedule_smooth);
185 static int do_schedule_smooth(
186 rrd_t *rrd,
187 unsigned long rra_idx,
188 unsigned long elapsed_pdp_st);
190 static int update_cdp_prep(
191 rrd_t *rrd,
192 unsigned long elapsed_pdp_st,
193 unsigned long start_pdp_offset,
194 unsigned long *rra_step_cnt,
195 int rra_idx,
196 rrd_value_t *pdp_temp,
197 rrd_value_t *last_seasonal_coef,
198 rrd_value_t *seasonal_coef,
199 int current_cf);
201 static void update_cdp(
202 unival *scratch,
203 int current_cf,
204 rrd_value_t pdp_temp_val,
205 unsigned long rra_step_cnt,
206 unsigned long elapsed_pdp_st,
207 unsigned long start_pdp_offset,
208 unsigned long pdp_cnt,
209 rrd_value_t xff,
210 int i,
211 int ii);
213 static void initialize_cdp_val(
214 unival *scratch,
215 int current_cf,
216 rrd_value_t pdp_temp_val,
217 unsigned long elapsed_pdp_st,
218 unsigned long start_pdp_offset,
219 unsigned long pdp_cnt);
221 static void reset_cdp(
222 rrd_t *rrd,
223 unsigned long elapsed_pdp_st,
224 rrd_value_t *pdp_temp,
225 rrd_value_t *last_seasonal_coef,
226 rrd_value_t *seasonal_coef,
227 int rra_idx,
228 int ds_idx,
229 int cdp_idx,
230 enum cf_en current_cf);
232 static rrd_value_t initialize_average_carry_over(
233 rrd_value_t pdp_temp_val,
234 unsigned long elapsed_pdp_st,
235 unsigned long start_pdp_offset,
236 unsigned long pdp_cnt);
238 static rrd_value_t calculate_cdp_val(
239 rrd_value_t cdp_val,
240 rrd_value_t pdp_temp_val,
241 unsigned long elapsed_pdp_st,
242 int current_cf,
243 int i,
244 int ii);
246 static int update_aberrant_cdps(
247 rrd_t *rrd,
248 rrd_file_t *rrd_file,
249 unsigned long rra_begin,
250 unsigned long *rra_current,
251 unsigned long elapsed_pdp_st,
252 rrd_value_t *pdp_temp,
253 rrd_value_t **seasonal_coef);
255 static int write_to_rras(
256 rrd_t *rrd,
257 rrd_file_t *rrd_file,
258 unsigned long *rra_step_cnt,
259 unsigned long rra_begin,
260 unsigned long *rra_current,
261 time_t current_time,
262 unsigned long *skip_update,
263 info_t **pcdp_summary);
265 static int write_RRA_row(
266 rrd_file_t *rrd_file,
267 rrd_t *rrd,
268 unsigned long rra_idx,
269 unsigned long *rra_current,
270 unsigned short CDP_scratch_idx,
271 info_t **pcdp_summary,
272 time_t rra_time);
274 static int smooth_all_rras(
275 rrd_t *rrd,
276 rrd_file_t *rrd_file,
277 unsigned long rra_begin);
279 #ifndef HAVE_MMAP
280 static int write_changes_to_disk(
281 rrd_t *rrd,
282 rrd_file_t *rrd_file,
283 int version);
284 #endif
286 /*
287 * normalize time as returned by gettimeofday. usec part must
288 * be always >= 0
289 */
290 static inline void normalize_time(
291 struct timeval *t)
292 {
293 if (t->tv_usec < 0) {
294 t->tv_sec--;
295 t->tv_usec += 1e6L;
296 }
297 }
299 /*
300 * Sets current_time and current_time_usec based on the current time.
301 * current_time_usec is set to 0 if the version number is 1 or 2.
302 */
303 static inline void initialize_time(
304 time_t *current_time,
305 unsigned long *current_time_usec,
306 int version)
307 {
308 struct timeval tmp_time; /* used for time conversion */
310 gettimeofday(&tmp_time, 0);
311 normalize_time(&tmp_time);
312 *current_time = tmp_time.tv_sec;
313 if (version >= 3) {
314 *current_time_usec = tmp_time.tv_usec;
315 } else {
316 *current_time_usec = 0;
317 }
318 }
320 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
322 info_t *rrd_update_v(
323 int argc,
324 char **argv)
325 {
326 char *tmplt = NULL;
327 info_t *result = NULL;
328 infoval rc;
329 struct option long_options[] = {
330 {"template", required_argument, 0, 't'},
331 {0, 0, 0, 0}
332 };
334 rc.u_int = -1;
335 optind = 0;
336 opterr = 0; /* initialize getopt */
338 while (1) {
339 int option_index = 0;
340 int opt;
342 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
344 if (opt == EOF)
345 break;
347 switch (opt) {
348 case 't':
349 tmplt = optarg;
350 break;
352 case '?':
353 rrd_set_error("unknown option '%s'", argv[optind - 1]);
354 goto end_tag;
355 }
356 }
358 /* need at least 2 arguments: filename, data. */
359 if (argc - optind < 2) {
360 rrd_set_error("Not enough arguments");
361 goto end_tag;
362 }
363 rc.u_int = 0;
364 result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
365 rc.u_int = _rrd_update(argv[optind], tmplt,
366 argc - optind - 1,
367 (const char **) (argv + optind + 1), result);
368 result->value.u_int = rc.u_int;
369 end_tag:
370 return result;
371 }
373 int rrd_update(
374 int argc,
375 char **argv)
376 {
377 struct option long_options[] = {
378 {"template", required_argument, 0, 't'},
379 {0, 0, 0, 0}
380 };
381 int option_index = 0;
382 int opt;
383 char *tmplt = NULL;
384 int rc = -1;
386 optind = 0;
387 opterr = 0; /* initialize getopt */
389 while (1) {
390 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
392 if (opt == EOF)
393 break;
395 switch (opt) {
396 case 't':
397 tmplt = strdup(optarg);
398 break;
400 case '?':
401 rrd_set_error("unknown option '%s'", argv[optind - 1]);
402 goto out;
403 }
404 }
406 /* need at least 2 arguments: filename, data. */
407 if (argc - optind < 2) {
408 rrd_set_error("Not enough arguments");
409 goto out;
410 }
412 rc = rrd_update_r(argv[optind], tmplt,
413 argc - optind - 1, (const char **) (argv + optind + 1));
414 out:
415 free(tmplt);
416 return rc;
417 }
419 int rrd_update_r(
420 const char *filename,
421 const char *tmplt,
422 int argc,
423 const char **argv)
424 {
425 return _rrd_update(filename, tmplt, argc, argv, NULL);
426 }
428 int _rrd_update(
429 const char *filename,
430 const char *tmplt,
431 int argc,
432 const char **argv,
433 info_t *pcdp_summary)
434 {
436 int arg_i = 2;
438 unsigned long rra_begin; /* byte pointer to the rra
439 * area in the rrd file. this
440 * pointer never changes value */
441 unsigned long rra_current; /* byte pointer to the current write
442 * spot in the rrd file. */
443 rrd_value_t *pdp_new; /* prepare the incoming data to be added
444 * to the existing entry */
445 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
446 * to the cdp values */
448 long *tmpl_idx; /* index representing the settings
449 * transported by the tmplt index */
450 unsigned long tmpl_cnt = 2; /* time and data */
451 rrd_t rrd;
452 time_t current_time = 0;
453 unsigned long current_time_usec = 0; /* microseconds part of current time */
454 char **updvals;
455 int schedule_smooth = 0;
457 /* number of elapsed PDP steps since last update */
458 unsigned long *rra_step_cnt = NULL;
460 int version; /* rrd version */
461 rrd_file_t *rrd_file;
462 char *arg_copy; /* for processing the argv */
463 unsigned long *skip_update; /* RRAs to advance but not write */
465 /* need at least 1 arguments: data. */
466 if (argc < 1) {
467 rrd_set_error("Not enough arguments");
468 goto err_out;
469 }
471 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
472 goto err_free;
473 }
474 /* We are now at the beginning of the rra's */
475 rra_current = rra_begin = rrd_file->header_len;
477 version = atoi(rrd.stat_head->version);
479 initialize_time(¤t_time, ¤t_time_usec, version);
481 /* get exclusive lock to whole file.
482 * lock gets removed when we close the file.
483 */
484 if (LockRRD(rrd_file->fd) != 0) {
485 rrd_set_error("could not lock RRD");
486 goto err_close;
487 }
489 if (allocate_data_structures(&rrd, &updvals,
490 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
491 &rra_step_cnt, &skip_update,
492 &pdp_new) == -1) {
493 goto err_close;
494 }
496 /* loop through the arguments. */
497 for (arg_i = 0; arg_i < argc; arg_i++) {
498 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
499 rrd_set_error("failed duplication argv entry");
500 break;
501 }
502 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
503 ¤t_time, ¤t_time_usec, pdp_temp, pdp_new,
504 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
505 &pcdp_summary, version, skip_update,
506 &schedule_smooth) == -1) {
507 free(arg_copy);
508 break;
509 }
510 free(arg_copy);
511 }
513 free(rra_step_cnt);
515 /* if we got here and if there is an error and if the file has not been
516 * written to, then close things up and return. */
517 if (rrd_test_error()) {
518 goto err_free_structures;
519 }
520 #ifndef HAVE_MMAP
521 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
522 goto err_free_structures;
523 }
524 #endif
526 /* calling the smoothing code here guarantees at most one smoothing
527 * operation per rrd_update call. Unfortunately, it is possible with bulk
528 * updates, or a long-delayed update for smoothing to occur off-schedule.
529 * This really isn't critical except during the burn-in cycles. */
530 if (schedule_smooth) {
531 smooth_all_rras(&rrd, rrd_file, rra_begin);
532 }
534 /* rrd_dontneed(rrd_file,&rrd); */
535 rrd_free(&rrd);
536 rrd_close(rrd_file);
538 free(pdp_new);
539 free(tmpl_idx);
540 free(pdp_temp);
541 free(skip_update);
542 free(updvals);
543 return 0;
545 err_free_structures:
546 free(pdp_new);
547 free(tmpl_idx);
548 free(pdp_temp);
549 free(skip_update);
550 free(updvals);
551 err_close:
552 rrd_close(rrd_file);
553 err_free:
554 rrd_free(&rrd);
555 err_out:
556 return -1;
557 }
559 /*
560 * get exclusive lock to whole file.
561 * lock gets removed when we close the file
562 *
563 * returns 0 on success
564 */
565 int LockRRD(
566 int in_file)
567 {
568 int rcstat;
570 {
571 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
572 struct _stat st;
574 if (_fstat(in_file, &st) == 0) {
575 rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
576 } else {
577 rcstat = -1;
578 }
579 #else
580 struct flock lock;
582 lock.l_type = F_WRLCK; /* exclusive write lock */
583 lock.l_len = 0; /* whole file */
584 lock.l_start = 0; /* start of file */
585 lock.l_whence = SEEK_SET; /* end of file */
587 rcstat = fcntl(in_file, F_SETLK, &lock);
588 #endif
589 }
591 return (rcstat);
592 }
594 /*
595 * Allocate some important arrays used, and initialize the template.
596 *
597 * When it returns, either all of the structures are allocated
598 * or none of them are.
599 *
600 * Returns 0 on success, -1 on error.
601 */
602 static int allocate_data_structures(
603 rrd_t *rrd,
604 char ***updvals,
605 rrd_value_t **pdp_temp,
606 const char *tmplt,
607 long **tmpl_idx,
608 unsigned long *tmpl_cnt,
609 unsigned long **rra_step_cnt,
610 unsigned long **skip_update,
611 rrd_value_t **pdp_new)
612 {
613 unsigned i, ii;
614 if ((*updvals = (char **) malloc(sizeof(char *)
615 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
616 rrd_set_error("allocating updvals pointer array.");
617 return -1;
618 }
619 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
620 * rrd->stat_head->ds_cnt)) ==
621 NULL) {
622 rrd_set_error("allocating pdp_temp.");
623 goto err_free_updvals;
624 }
625 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
626 *
627 rrd->stat_head->rra_cnt)) ==
628 NULL) {
629 rrd_set_error("allocating skip_update.");
630 goto err_free_pdp_temp;
631 }
632 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
633 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
634 rrd_set_error("allocating tmpl_idx.");
635 goto err_free_skip_update;
636 }
637 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
638 *
639 (rrd->stat_head->
640 rra_cnt))) == NULL) {
641 rrd_set_error("allocating rra_step_cnt.");
642 goto err_free_tmpl_idx;
643 }
645 /* initialize tmplt redirector */
646 /* default config example (assume DS 1 is a CDEF DS)
647 tmpl_idx[0] -> 0; (time)
648 tmpl_idx[1] -> 1; (DS 0)
649 tmpl_idx[2] -> 3; (DS 2)
650 tmpl_idx[3] -> 4; (DS 3) */
651 (*tmpl_idx)[0] = 0; /* time */
652 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
653 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
654 (*tmpl_idx)[ii++] = i;
655 }
656 *tmpl_cnt = ii;
658 if (tmplt != NULL) {
659 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
660 goto err_free_rra_step_cnt;
661 }
662 }
664 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
665 * rrd->stat_head->ds_cnt)) == NULL) {
666 rrd_set_error("allocating pdp_new.");
667 goto err_free_rra_step_cnt;
668 }
670 return 0;
672 err_free_rra_step_cnt:
673 free(*rra_step_cnt);
674 err_free_tmpl_idx:
675 free(*tmpl_idx);
676 err_free_skip_update:
677 free(*skip_update);
678 err_free_pdp_temp:
679 free(*pdp_temp);
680 err_free_updvals:
681 free(*updvals);
682 return -1;
683 }
685 /*
686 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
687 *
688 * Returns 0 on success.
689 */
690 static int parse_template(
691 rrd_t *rrd,
692 const char *tmplt,
693 unsigned long *tmpl_cnt,
694 long *tmpl_idx)
695 {
696 char *dsname, *tmplt_copy;
697 unsigned int tmpl_len, i;
698 int ret = 0;
700 *tmpl_cnt = 1; /* the first entry is the time */
702 /* we should work on a writeable copy here */
703 if ((tmplt_copy = strdup(tmplt)) == NULL) {
704 rrd_set_error("error copying tmplt '%s'", tmplt);
705 ret = -1;
706 goto out;
707 }
709 dsname = tmplt_copy;
710 tmpl_len = strlen(tmplt_copy);
711 for (i = 0; i <= tmpl_len; i++) {
712 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
713 tmplt_copy[i] = '\0';
714 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
715 rrd_set_error("tmplt contains more DS definitions than RRD");
716 ret = -1;
717 goto out_free_tmpl_copy;
718 }
719 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
720 rrd_set_error("unknown DS name '%s'", dsname);
721 ret = -1;
722 goto out_free_tmpl_copy;
723 }
724 /* go to the next entry on the tmplt_copy */
725 if (i < tmpl_len)
726 dsname = &tmplt_copy[i + 1];
727 }
728 }
729 out_free_tmpl_copy:
730 free(tmplt_copy);
731 out:
732 return ret;
733 }
735 /*
736 * Parse an update string, updates the primary data points (PDPs)
737 * and consolidated data points (CDPs), and writes changes to the RRAs.
738 *
739 * Returns 0 on success, -1 on error.
740 */
741 static int process_arg(
742 char *step_start,
743 rrd_t *rrd,
744 rrd_file_t *rrd_file,
745 unsigned long rra_begin,
746 unsigned long *rra_current,
747 time_t *current_time,
748 unsigned long *current_time_usec,
749 rrd_value_t *pdp_temp,
750 rrd_value_t *pdp_new,
751 unsigned long *rra_step_cnt,
752 char **updvals,
753 long *tmpl_idx,
754 unsigned long tmpl_cnt,
755 info_t **pcdp_summary,
756 int version,
757 unsigned long *skip_update,
758 int *schedule_smooth)
759 {
760 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
762 /* a vector of future Holt-Winters seasonal coefs */
763 unsigned long elapsed_pdp_st;
765 double interval, pre_int, post_int; /* interval between this and
766 * the last run */
767 unsigned long proc_pdp_cnt;
768 unsigned long rra_start;
770 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
771 current_time, current_time_usec, version) == -1) {
772 return -1;
773 }
774 /* seek to the beginning of the rra's */
775 if (*rra_current != rra_begin) {
776 #ifndef HAVE_MMAP
777 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
778 rrd_set_error("seek error in rrd");
779 return -1;
780 }
781 #endif
782 *rra_current = rra_begin;
783 }
784 rra_start = rra_begin;
786 interval = (double) (*current_time - rrd->live_head->last_up)
787 + (double) ((long) *current_time_usec -
788 (long) rrd->live_head->last_up_usec) / 1e6f;
790 /* process the data sources and update the pdp_prep
791 * area accordingly */
792 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
793 return -1;
794 }
796 elapsed_pdp_st = calculate_elapsed_steps(rrd,
797 *current_time,
798 *current_time_usec, interval,
799 &pre_int, &post_int,
800 &proc_pdp_cnt);
802 /* has a pdp_st moment occurred since the last run ? */
803 if (elapsed_pdp_st == 0) {
804 /* no we have not passed a pdp_st moment. therefore update is simple */
805 simple_update(rrd, interval, pdp_new);
806 } else {
807 /* an pdp_st has occurred. */
808 if (process_all_pdp_st(rrd, interval,
809 pre_int, post_int,
810 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
811 return -1;
812 }
813 if (update_all_cdp_prep(rrd, rra_step_cnt,
814 rra_begin, rrd_file,
815 elapsed_pdp_st,
816 proc_pdp_cnt,
817 &last_seasonal_coef,
818 &seasonal_coef,
819 pdp_temp, rra_current,
820 skip_update, schedule_smooth) == -1) {
821 goto err_free_coefficients;
822 }
823 if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
824 elapsed_pdp_st, pdp_temp,
825 &seasonal_coef) == -1) {
826 goto err_free_coefficients;
827 }
828 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
829 rra_current, *current_time, skip_update,
830 pcdp_summary) == -1) {
831 goto err_free_coefficients;
832 }
833 } /* endif a pdp_st has occurred */
834 rrd->live_head->last_up = *current_time;
835 rrd->live_head->last_up_usec = *current_time_usec;
837 free(seasonal_coef);
838 free(last_seasonal_coef);
839 return 0;
841 err_free_coefficients:
842 free(seasonal_coef);
843 free(last_seasonal_coef);
844 return -1;
845 }
847 /*
848 * Parse a DS string (time + colon-separated values), storing the
849 * results in current_time, current_time_usec, and updvals.
850 *
851 * Returns 0 on success, -1 on error.
852 */
853 static int parse_ds(
854 rrd_t *rrd,
855 char **updvals,
856 long *tmpl_idx,
857 char *input,
858 unsigned long tmpl_cnt,
859 time_t *current_time,
860 unsigned long *current_time_usec,
861 int version)
862 {
863 char *p;
864 unsigned long i;
865 char timesyntax;
867 updvals[0] = input;
868 /* initialize all ds input to unknown except the first one
869 which has always got to be set */
870 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
871 updvals[i] = "U";
873 /* separate all ds elements; first must be examined separately
874 due to alternate time syntax */
875 if ((p = strchr(input, '@')) != NULL) {
876 timesyntax = '@';
877 } else if ((p = strchr(input, ':')) != NULL) {
878 timesyntax = ':';
879 } else {
880 rrd_set_error("expected timestamp not found in data source from %s",
881 input);
882 return -1;
883 }
884 *p = '\0';
885 i = 1;
886 updvals[tmpl_idx[i++]] = p + 1;
887 while (*(++p)) {
888 if (*p == ':') {
889 *p = '\0';
890 if (i < tmpl_cnt) {
891 updvals[tmpl_idx[i++]] = p + 1;
892 }
893 }
894 }
896 if (i != tmpl_cnt) {
897 rrd_set_error("expected %lu data source readings (got %lu) from %s",
898 tmpl_cnt - 1, i, input);
899 return -1;
900 }
902 if (get_time_from_reading(rrd, timesyntax, updvals,
903 current_time, current_time_usec,
904 version) == -1) {
905 return -1;
906 }
907 return 0;
908 }
910 /*
911 * Parse the time in a DS string, store it in current_time and
912 * current_time_usec and verify that it's later than the last
913 * update for this DS.
914 *
915 * Returns 0 on success, -1 on error.
916 */
917 static int get_time_from_reading(
918 rrd_t *rrd,
919 char timesyntax,
920 char **updvals,
921 time_t *current_time,
922 unsigned long *current_time_usec,
923 int version)
924 {
925 double tmp;
926 char *parsetime_error = NULL;
927 char *old_locale;
928 struct rrd_time_value ds_tv;
929 struct timeval tmp_time; /* used for time conversion */
931 /* get the time from the reading ... handle N */
932 if (timesyntax == '@') { /* at-style */
933 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
934 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
935 return -1;
936 }
937 if (ds_tv.type == RELATIVE_TO_END_TIME ||
938 ds_tv.type == RELATIVE_TO_START_TIME) {
939 rrd_set_error("specifying time relative to the 'start' "
940 "or 'end' makes no sense here: %s", updvals[0]);
941 return -1;
942 }
943 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
944 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
945 } else if (strcmp(updvals[0], "N") == 0) {
946 gettimeofday(&tmp_time, 0);
947 normalize_time(&tmp_time);
948 *current_time = tmp_time.tv_sec;
949 *current_time_usec = tmp_time.tv_usec;
950 } else {
951 old_locale = setlocale(LC_NUMERIC, "C");
952 tmp = strtod(updvals[0], 0);
953 setlocale(LC_NUMERIC, old_locale);
954 *current_time = floor(tmp);
955 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
956 }
957 /* dont do any correction for old version RRDs */
958 if (version < 3)
959 *current_time_usec = 0;
961 if (*current_time < rrd->live_head->last_up ||
962 (*current_time == rrd->live_head->last_up &&
963 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
964 rrd_set_error("illegal attempt to update using time %ld when "
965 "last update time is %ld (minimum one second step)",
966 *current_time, rrd->live_head->last_up);
967 return -1;
968 }
969 return 0;
970 }
972 /*
973 * Update pdp_new by interpreting the updvals according to the DS type
974 * (COUNTER, GAUGE, etc.).
975 *
976 * Returns 0 on success, -1 on error.
977 */
978 static int update_pdp_prep(
979 rrd_t *rrd,
980 char **updvals,
981 rrd_value_t *pdp_new,
982 double interval)
983 {
984 unsigned long ds_idx;
985 int ii;
986 char *endptr; /* used in the conversion */
987 double rate;
988 char *old_locale;
989 enum dst_en dst_idx;
991 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
992 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
994 /* make sure we do not build diffs with old last_ds values */
995 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
996 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
997 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
998 }
1000 /* NOTE: DST_CDEF should never enter this if block, because
1001 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1002 * accidently specified a value for the DST_CDEF. To handle this case,
1003 * an extra check is required. */
1005 if ((updvals[ds_idx + 1][0] != 'U') &&
1006 (dst_idx != DST_CDEF) &&
1007 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1008 rate = DNAN;
1010 /* pdp_new contains rate * time ... eg the bytes transferred during
1011 * the interval. Doing it this way saves a lot of math operations
1012 */
1013 switch (dst_idx) {
1014 case DST_COUNTER:
1015 case DST_DERIVE:
1016 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1017 if ((updvals[ds_idx + 1][ii] < '0'
1018 || updvals[ds_idx + 1][ii] > '9')
1019 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1020 rrd_set_error("not a simple integer: '%s'",
1021 updvals[ds_idx + 1]);
1022 return -1;
1023 }
1024 }
1025 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1026 pdp_new[ds_idx] =
1027 rrd_diff(updvals[ds_idx + 1],
1028 rrd->pdp_prep[ds_idx].last_ds);
1029 if (dst_idx == DST_COUNTER) {
1030 /* simple overflow catcher. This will fail
1031 * terribly for non 32 or 64 bit counters
1032 * ... are there any others in SNMP land?
1033 */
1034 if (pdp_new[ds_idx] < (double) 0.0)
1035 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1036 if (pdp_new[ds_idx] < (double) 0.0)
1037 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1038 }
1039 rate = pdp_new[ds_idx] / interval;
1040 } else {
1041 pdp_new[ds_idx] = DNAN;
1042 }
1043 break;
1044 case DST_ABSOLUTE:
1045 old_locale = setlocale(LC_NUMERIC, "C");
1046 errno = 0;
1047 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1048 setlocale(LC_NUMERIC, old_locale);
1049 if (errno > 0) {
1050 rrd_set_error("converting '%s' to float: %s",
1051 updvals[ds_idx + 1], rrd_strerror(errno));
1052 return -1;
1053 };
1054 if (endptr[0] != '\0') {
1055 rrd_set_error
1056 ("conversion of '%s' to float not complete: tail '%s'",
1057 updvals[ds_idx + 1], endptr);
1058 return -1;
1059 }
1060 rate = pdp_new[ds_idx] / interval;
1061 break;
1062 case DST_GAUGE:
1063 errno = 0;
1064 old_locale = setlocale(LC_NUMERIC, "C");
1065 pdp_new[ds_idx] =
1066 strtod(updvals[ds_idx + 1], &endptr) * interval;
1067 setlocale(LC_NUMERIC, old_locale);
1068 if (errno) {
1069 rrd_set_error("converting '%s' to float: %s",
1070 updvals[ds_idx + 1], rrd_strerror(errno));
1071 return -1;
1072 };
1073 if (endptr[0] != '\0') {
1074 rrd_set_error
1075 ("conversion of '%s' to float not complete: tail '%s'",
1076 updvals[ds_idx + 1], endptr);
1077 return -1;
1078 }
1079 rate = pdp_new[ds_idx] / interval;
1080 break;
1081 default:
1082 rrd_set_error("rrd contains unknown DS type : '%s'",
1083 rrd->ds_def[ds_idx].dst);
1084 return -1;
1085 }
1086 /* break out of this for loop if the error string is set */
1087 if (rrd_test_error()) {
1088 return -1;
1089 }
1090 /* make sure pdp_temp is neither too large or too small
1091 * if any of these occur it becomes unknown ...
1092 * sorry folks ... */
1093 if (!isnan(rate) &&
1094 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1095 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1096 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1097 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1098 pdp_new[ds_idx] = DNAN;
1099 }
1100 } else {
1101 /* no news is news all the same */
1102 pdp_new[ds_idx] = DNAN;
1103 }
1106 /* make a copy of the command line argument for the next run */
1107 #ifdef DEBUG
1108 fprintf(stderr, "prep ds[%lu]\t"
1109 "last_arg '%s'\t"
1110 "this_arg '%s'\t"
1111 "pdp_new %10.2f\n",
1112 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1113 pdp_new[ds_idx]);
1114 #endif
1115 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1116 LAST_DS_LEN - 1);
1117 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1118 }
1119 return 0;
1120 }
1122 /*
1123 * How many PDP steps have elapsed since the last update? Returns the answer,
1124 * and stores the time between the last update and the last PDP in pre_time,
1125 * and the time between the last PDP and the current time in post_int.
1126 */
1127 static int calculate_elapsed_steps(
1128 rrd_t *rrd,
1129 unsigned long current_time,
1130 unsigned long current_time_usec,
1131 double interval,
1132 double *pre_int,
1133 double *post_int,
1134 unsigned long *proc_pdp_cnt)
1135 {
1136 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1137 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1138 * time */
1139 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1140 * when it was last updated */
1141 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1143 /* when was the current pdp started */
1144 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1145 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1147 /* when did the last pdp_st occur */
1148 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1149 occu_pdp_st = current_time - occu_pdp_age;
1151 if (occu_pdp_st > proc_pdp_st) {
1152 /* OK we passed the pdp_st moment */
1153 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1154 * occurred before the latest
1155 * pdp_st moment*/
1156 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1157 *post_int = occu_pdp_age; /* how much after it */
1158 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1159 } else {
1160 *pre_int = interval;
1161 *post_int = 0;
1162 }
1164 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1166 #ifdef DEBUG
1167 printf("proc_pdp_age %lu\t"
1168 "proc_pdp_st %lu\t"
1169 "occu_pfp_age %lu\t"
1170 "occu_pdp_st %lu\t"
1171 "int %lf\t"
1172 "pre_int %lf\t"
1173 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1174 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1175 #endif
1177 /* compute the number of elapsed pdp_st moments */
1178 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1179 }
1181 /*
1182 * Increment the PDP values by the values in pdp_new, or else initialize them.
1183 */
1184 static void simple_update(
1185 rrd_t *rrd,
1186 double interval,
1187 rrd_value_t *pdp_new)
1188 {
1189 int i;
1191 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1192 if (isnan(pdp_new[i])) {
1193 /* this is not really accurate if we use subsecond data arrival time
1194 should have thought of it when going subsecond resolution ...
1195 sorry next format change we will have it! */
1196 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1197 floor(interval);
1198 } else {
1199 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1200 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1201 } else {
1202 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1203 }
1204 }
1205 #ifdef DEBUG
1206 fprintf(stderr,
1207 "NO PDP ds[%i]\t"
1208 "value %10.2f\t"
1209 "unkn_sec %5lu\n",
1210 i,
1211 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1212 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1213 #endif
1214 }
1215 }
1217 /*
1218 * Call process_pdp_st for each DS.
1219 *
1220 * Returns 0 on success, -1 on error.
1221 */
1222 static int process_all_pdp_st(
1223 rrd_t *rrd,
1224 double interval,
1225 double pre_int,
1226 double post_int,
1227 unsigned long elapsed_pdp_st,
1228 rrd_value_t *pdp_new,
1229 rrd_value_t *pdp_temp)
1230 {
1231 unsigned long ds_idx;
1233 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1234 rate*seconds which occurred up to the last run.
1235 pdp_new[] contains rate*seconds from the latest run.
1236 pdp_temp[] will contain the rate for cdp */
1238 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1239 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1240 elapsed_pdp_st * rrd->stat_head->pdp_step,
1241 pdp_new, pdp_temp) == -1) {
1242 return -1;
1243 }
1244 #ifdef DEBUG
1245 fprintf(stderr, "PDP UPD ds[%lu]\t"
1246 "elapsed_pdp_st %lu\t"
1247 "pdp_temp %10.2f\t"
1248 "new_prep %10.2f\t"
1249 "new_unkn_sec %5lu\n",
1250 ds_idx,
1251 elapsed_pdp_st,
1252 pdp_temp[ds_idx],
1253 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1254 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1255 #endif
1256 }
1257 return 0;
1258 }
1260 /*
1261 * Process an update that occurs after one of the PDP moments.
1262 * Increments the PDP value, sets NAN if time greater than the
1263 * heartbeats have elapsed, processes CDEFs.
1264 *
1265 * Returns 0 on success, -1 on error.
1266 */
1267 static int process_pdp_st(
1268 rrd_t *rrd,
1269 unsigned long ds_idx,
1270 double interval,
1271 double pre_int,
1272 double post_int,
1273 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1274 rrd_value_t *pdp_new,
1275 rrd_value_t *pdp_temp)
1276 {
1277 int i;
1279 /* update pdp_prep to the current pdp_st. */
1280 double pre_unknown = 0.0;
1281 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1282 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1284 rpnstack_t rpnstack; /* used for COMPUTE DS */
1286 rpnstack_init(&rpnstack);
1289 if (isnan(pdp_new[ds_idx])) {
1290 /* a final bit of unknown to be added before calculation
1291 we use a temporary variable for this so that we
1292 don't have to turn integer lines before using the value */
1293 pre_unknown = pre_int;
1294 } else {
1295 if (isnan(scratch[PDP_val].u_val)) {
1296 scratch[PDP_val].u_val = 0;
1297 }
1298 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1299 }
1301 /* if too much of the pdp_prep is unknown we dump it */
1302 /* if the interval is larger thatn mrhb we get NAN */
1303 if ((interval > mrhb) ||
1304 (rrd->stat_head->pdp_step/2.0 < (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1305 pdp_temp[ds_idx] = DNAN;
1306 } else {
1307 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1308 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1309 pre_unknown);
1310 }
1312 /* process CDEF data sources; remember each CDEF DS can
1313 * only reference other DS with a lower index number */
1314 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1315 rpnp_t *rpnp;
1317 rpnp =
1318 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1319 /* substitute data values for OP_VARIABLE nodes */
1320 for (i = 0; rpnp[i].op != OP_END; i++) {
1321 if (rpnp[i].op == OP_VARIABLE) {
1322 rpnp[i].op = OP_NUMBER;
1323 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1324 }
1325 }
1326 /* run the rpn calculator */
1327 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1328 free(rpnp);
1329 rpnstack_free(&rpnstack);
1330 return -1;
1331 }
1332 }
1334 /* make pdp_prep ready for the next run */
1335 if (isnan(pdp_new[ds_idx])) {
1336 /* this is not realy accurate if we use subsecond data arival time
1337 should have thought of it when going subsecond resolution ...
1338 sorry next format change we will have it! */
1339 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1340 scratch[PDP_val].u_val = DNAN;
1341 } else {
1342 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1343 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1344 }
1345 rpnstack_free(&rpnstack);
1346 return 0;
1347 }
1349 /*
1350 * Iterate over all the RRAs for a given DS and:
1351 * 1. Decide whether to schedule a smooth later
1352 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1353 * 3. Update the CDP
1354 *
1355 * Returns 0 on success, -1 on error
1356 */
1357 static int update_all_cdp_prep(
1358 rrd_t *rrd,
1359 unsigned long *rra_step_cnt,
1360 unsigned long rra_begin,
1361 rrd_file_t *rrd_file,
1362 unsigned long elapsed_pdp_st,
1363 unsigned long proc_pdp_cnt,
1364 rrd_value_t **last_seasonal_coef,
1365 rrd_value_t **seasonal_coef,
1366 rrd_value_t *pdp_temp,
1367 unsigned long *rra_current,
1368 unsigned long *skip_update,
1369 int *schedule_smooth)
1370 {
1371 unsigned long rra_idx;
1373 /* index into the CDP scratch array */
1374 enum cf_en current_cf;
1375 unsigned long rra_start;
1377 /* number of rows to be updated in an RRA for a data value. */
1378 unsigned long start_pdp_offset;
1380 rra_start = rra_begin;
1381 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1382 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1383 start_pdp_offset =
1384 rrd->rra_def[rra_idx].pdp_cnt -
1385 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1386 skip_update[rra_idx] = 0;
1387 if (start_pdp_offset <= elapsed_pdp_st) {
1388 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1389 rrd->rra_def[rra_idx].pdp_cnt + 1;
1390 } else {
1391 rra_step_cnt[rra_idx] = 0;
1392 }
1394 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1395 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1396 * so that they will be correct for the next observed value; note that for
1397 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1398 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1399 if (rra_step_cnt[rra_idx] > 1) {
1400 skip_update[rra_idx] = 1;
1401 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1402 elapsed_pdp_st, last_seasonal_coef);
1403 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1404 elapsed_pdp_st + 1, seasonal_coef);
1405 }
1406 /* periodically run a smoother for seasonal effects */
1407 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1408 #ifdef DEBUG
1409 fprintf(stderr,
1410 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1411 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1412 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1413 u_cnt);
1414 #endif
1415 *schedule_smooth = 1;
1416 }
1417 *rra_current = rrd_tell(rrd_file);
1418 }
1419 if (rrd_test_error())
1420 return -1;
1422 if (update_cdp_prep
1423 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1424 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1425 current_cf) == -1) {
1426 return -1;
1427 }
1428 rra_start +=
1429 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1430 sizeof(rrd_value_t);
1431 }
1432 return 0;
1433 }
1435 /*
1436 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1437 */
1438 static int do_schedule_smooth(
1439 rrd_t *rrd,
1440 unsigned long rra_idx,
1441 unsigned long elapsed_pdp_st)
1442 {
1443 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1444 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1445 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1446 unsigned long seasonal_smooth_idx =
1447 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1448 unsigned long *init_seasonal =
1449 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1451 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1452 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1453 * really an RRA level, not a data source within RRA level parameter, but
1454 * the rra_def is read only for rrd_update (not flushed to disk). */
1455 if (*init_seasonal > BURNIN_CYCLES) {
1456 /* someone has no doubt invented a trick to deal with this wrap around,
1457 * but at least this code is clear. */
1458 if (seasonal_smooth_idx > cur_row) {
1459 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1460 * between PDP and CDP */
1461 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1462 }
1463 /* can't rely on negative numbers because we are working with
1464 * unsigned values */
1465 return (cur_row + elapsed_pdp_st >= row_cnt
1466 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1467 }
1468 /* mark off one of the burn-in cycles */
1469 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1470 }
1472 /*
1473 * For a given RRA, iterate over the data sources and call the appropriate
1474 * consolidation function.
1475 *
1476 * Returns 0 on success, -1 on error.
1477 */
1478 static int update_cdp_prep(
1479 rrd_t *rrd,
1480 unsigned long elapsed_pdp_st,
1481 unsigned long start_pdp_offset,
1482 unsigned long *rra_step_cnt,
1483 int rra_idx,
1484 rrd_value_t *pdp_temp,
1485 rrd_value_t *last_seasonal_coef,
1486 rrd_value_t *seasonal_coef,
1487 int current_cf)
1488 {
1489 unsigned long ds_idx, cdp_idx;
1491 /* update CDP_PREP areas */
1492 /* loop over data soures within each RRA */
1493 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1495 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1497 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1498 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1499 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1500 elapsed_pdp_st, start_pdp_offset,
1501 rrd->rra_def[rra_idx].pdp_cnt,
1502 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1503 rra_idx, ds_idx);
1504 } else {
1505 /* Nothing to consolidate if there's one PDP per CDP. However, if
1506 * we've missed some PDPs, let's update null counters etc. */
1507 if (elapsed_pdp_st > 2) {
1508 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1509 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1510 current_cf);
1511 }
1512 }
1514 if (rrd_test_error())
1515 return -1;
1516 } /* endif data sources loop */
1517 return 0;
1518 }
1520 /*
1521 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1522 * primary value, secondary value, and # of unknowns.
1523 */
1524 static void update_cdp(
1525 unival *scratch,
1526 int current_cf,
1527 rrd_value_t pdp_temp_val,
1528 unsigned long rra_step_cnt,
1529 unsigned long elapsed_pdp_st,
1530 unsigned long start_pdp_offset,
1531 unsigned long pdp_cnt,
1532 rrd_value_t xff,
1533 int i,
1534 int ii)
1535 {
1536 /* shorthand variables */
1537 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1538 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1539 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1540 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1542 if (rra_step_cnt) {
1543 /* If we are in this block, as least 1 CDP value will be written to
1544 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1545 * to be written, then the "fill in" value is the CDP_secondary_val
1546 * entry. */
1547 if (isnan(pdp_temp_val)) {
1548 *cdp_unkn_pdp_cnt += start_pdp_offset;
1549 *cdp_secondary_val = DNAN;
1550 } else {
1551 /* CDP_secondary value is the RRA "fill in" value for intermediary
1552 * CDP data entries. No matter the CF, the value is the same because
1553 * the average, max, min, and last of a list of identical values is
1554 * the same, namely, the value itself. */
1555 *cdp_secondary_val = pdp_temp_val;
1556 }
1558 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1559 *cdp_primary_val = DNAN;
1560 if (current_cf == CF_AVERAGE) {
1561 *cdp_val =
1562 initialize_average_carry_over(pdp_temp_val,
1563 elapsed_pdp_st,
1564 start_pdp_offset, pdp_cnt);
1565 } else {
1566 *cdp_val = pdp_temp_val;
1567 }
1568 } else {
1569 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1570 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1571 } /* endif meets xff value requirement for a valid value */
1572 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1573 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1574 if (isnan(pdp_temp_val))
1575 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1576 else
1577 *cdp_unkn_pdp_cnt = 0;
1578 } else { /* rra_step_cnt[i] == 0 */
1580 #ifdef DEBUG
1581 if (isnan(*cdp_val)) {
1582 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1583 i, ii);
1584 } else {
1585 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1586 i, ii, *cdp_val);
1587 }
1588 #endif
1589 if (isnan(pdp_temp_val)) {
1590 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1591 } else {
1592 *cdp_val =
1593 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1594 current_cf, i, ii);
1595 }
1596 }
1597 }
1599 /*
1600 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1601 * on the type of consolidation function.
1602 */
1603 static void initialize_cdp_val(
1604 unival *scratch,
1605 int current_cf,
1606 rrd_value_t pdp_temp_val,
1607 unsigned long elapsed_pdp_st,
1608 unsigned long start_pdp_offset,
1609 unsigned long pdp_cnt)
1610 {
1611 rrd_value_t cum_val, cur_val;
1613 switch (current_cf) {
1614 case CF_AVERAGE:
1615 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1616 cur_val = IFDNAN(pdp_temp_val, 0.0);
1617 scratch[CDP_primary_val].u_val =
1618 (cum_val + cur_val * start_pdp_offset) /
1619 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1620 scratch[CDP_val].u_val =
1621 initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1622 start_pdp_offset, pdp_cnt);
1623 break;
1624 case CF_MAXIMUM:
1625 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1626 cur_val = IFDNAN(pdp_temp_val, -DINF);
1627 #if 0
1628 #ifdef DEBUG
1629 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1630 fprintf(stderr,
1631 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1632 i, ii);
1633 exit(-1);
1634 }
1635 #endif
1636 #endif
1637 if (cur_val > cum_val)
1638 scratch[CDP_primary_val].u_val = cur_val;
1639 else
1640 scratch[CDP_primary_val].u_val = cum_val;
1641 /* initialize carry over value */
1642 scratch[CDP_val].u_val = pdp_temp_val;
1643 break;
1644 case CF_MINIMUM:
1645 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1646 cur_val = IFDNAN(pdp_temp_val, DINF);
1647 #if 0
1648 #ifdef DEBUG
1649 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1650 fprintf(stderr,
1651 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1652 ii);
1653 exit(-1);
1654 }
1655 #endif
1656 #endif
1657 if (cur_val < cum_val)
1658 scratch[CDP_primary_val].u_val = cur_val;
1659 else
1660 scratch[CDP_primary_val].u_val = cum_val;
1661 /* initialize carry over value */
1662 scratch[CDP_val].u_val = pdp_temp_val;
1663 break;
1664 case CF_LAST:
1665 default:
1666 scratch[CDP_primary_val].u_val = pdp_temp_val;
1667 /* initialize carry over value */
1668 scratch[CDP_val].u_val = pdp_temp_val;
1669 break;
1670 }
1671 }
1673 /*
1674 * Update the consolidation function for Holt-Winters functions as
1675 * well as other functions that don't actually consolidate multiple
1676 * PDPs.
1677 */
1678 static void reset_cdp(
1679 rrd_t *rrd,
1680 unsigned long elapsed_pdp_st,
1681 rrd_value_t *pdp_temp,
1682 rrd_value_t *last_seasonal_coef,
1683 rrd_value_t *seasonal_coef,
1684 int rra_idx,
1685 int ds_idx,
1686 int cdp_idx,
1687 enum cf_en current_cf)
1688 {
1689 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1691 switch (current_cf) {
1692 case CF_AVERAGE:
1693 default:
1694 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1695 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1696 break;
1697 case CF_SEASONAL:
1698 case CF_DEVSEASONAL:
1699 /* need to update cached seasonal values, so they are consistent
1700 * with the bulk update */
1701 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1702 * CDP_last_deviation are the same. */
1703 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1704 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1705 break;
1706 case CF_HWPREDICT:
1707 case CF_MHWPREDICT:
1708 /* need to update the null_count and last_null_count.
1709 * even do this for non-DNAN pdp_temp because the
1710 * algorithm is not learning from batch updates. */
1711 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1712 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1713 /* fall through */
1714 case CF_DEVPREDICT:
1715 scratch[CDP_primary_val].u_val = DNAN;
1716 scratch[CDP_secondary_val].u_val = DNAN;
1717 break;
1718 case CF_FAILURES:
1719 /* do not count missed bulk values as failures */
1720 scratch[CDP_primary_val].u_val = 0;
1721 scratch[CDP_secondary_val].u_val = 0;
1722 /* need to reset violations buffer.
1723 * could do this more carefully, but for now, just
1724 * assume a bulk update wipes away all violations. */
1725 erase_violations(rrd, cdp_idx, rra_idx);
1726 break;
1727 }
1728 }
1730 static rrd_value_t initialize_average_carry_over(
1731 rrd_value_t pdp_temp_val,
1732 unsigned long elapsed_pdp_st,
1733 unsigned long start_pdp_offset,
1734 unsigned long pdp_cnt)
1735 {
1736 /* initialize carry over value */
1737 if (isnan(pdp_temp_val)) {
1738 return DNAN;
1739 }
1740 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1741 }
1743 /*
1744 * Update or initialize a CDP value based on the consolidation
1745 * function.
1746 *
1747 * Returns the new value.
1748 */
1749 static rrd_value_t calculate_cdp_val(
1750 rrd_value_t cdp_val,
1751 rrd_value_t pdp_temp_val,
1752 unsigned long elapsed_pdp_st,
1753 int current_cf,
1754 #ifdef DEBUG
1755 int i,
1756 int ii
1757 #else
1758 int UNUSED(i),
1759 int UNUSED(ii)
1760 #endif
1761 )
1762 {
1763 if (isnan(cdp_val)) {
1764 if (current_cf == CF_AVERAGE) {
1765 pdp_temp_val *= elapsed_pdp_st;
1766 }
1767 #ifdef DEBUG
1768 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1769 i, ii, pdp_temp_val);
1770 #endif
1771 return pdp_temp_val;
1772 }
1773 if (current_cf == CF_AVERAGE)
1774 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1775 if (current_cf == CF_MINIMUM)
1776 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1777 if (current_cf == CF_MAXIMUM)
1778 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1780 return pdp_temp_val;
1781 }
1783 /*
1784 * For each RRA, update the seasonal values and then call update_aberrant_CF
1785 * for each data source.
1786 *
1787 * Return 0 on success, -1 on error.
1788 */
1789 static int update_aberrant_cdps(
1790 rrd_t *rrd,
1791 rrd_file_t *rrd_file,
1792 unsigned long rra_begin,
1793 unsigned long *rra_current,
1794 unsigned long elapsed_pdp_st,
1795 rrd_value_t *pdp_temp,
1796 rrd_value_t **seasonal_coef)
1797 {
1798 unsigned long rra_idx, ds_idx, j;
1800 /* number of PDP steps since the last update that
1801 * are assigned to the first CDP to be generated
1802 * since the last update. */
1803 unsigned short scratch_idx;
1804 unsigned long rra_start;
1805 enum cf_en current_cf;
1807 /* this loop is only entered if elapsed_pdp_st < 3 */
1808 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1809 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1810 rra_start = rra_begin;
1811 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1812 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1813 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1814 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1815 if (scratch_idx == CDP_primary_val) {
1816 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1817 elapsed_pdp_st + 1, seasonal_coef);
1818 } else {
1819 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1820 elapsed_pdp_st + 2, seasonal_coef);
1821 }
1822 *rra_current = rrd_tell(rrd_file);
1823 }
1824 if (rrd_test_error())
1825 return -1;
1826 /* loop over data soures within each RRA */
1827 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1828 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1829 rra_idx * (rrd->stat_head->ds_cnt) +
1830 ds_idx, rra_idx, ds_idx, scratch_idx,
1831 *seasonal_coef);
1832 }
1833 }
1834 rra_start += rrd->rra_def[rra_idx].row_cnt
1835 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1836 }
1837 }
1838 return 0;
1839 }
1841 /*
1842 * Move sequentially through the file, writing one RRA at a time. Note this
1843 * architecture divorces the computation of CDP with flushing updated RRA
1844 * entries to disk.
1845 *
1846 * Return 0 on success, -1 on error.
1847 */
1848 static int write_to_rras(
1849 rrd_t *rrd,
1850 rrd_file_t *rrd_file,
1851 unsigned long *rra_step_cnt,
1852 unsigned long rra_begin,
1853 unsigned long *rra_current,
1854 time_t current_time,
1855 unsigned long *skip_update,
1856 info_t **pcdp_summary)
1857 {
1858 unsigned long rra_idx;
1859 unsigned long rra_start;
1860 unsigned long rra_pos_tmp; /* temporary byte pointer. */
1861 time_t rra_time = 0; /* time of update for a RRA */
1863 /* Ready to write to disk */
1864 rra_start = rra_begin;
1865 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1866 /* skip unless there's something to write */
1867 if (rra_step_cnt[rra_idx]) {
1868 /* write the first row */
1869 #ifdef DEBUG
1870 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1871 #endif
1872 rrd->rra_ptr[rra_idx].cur_row++;
1873 if (rrd->rra_ptr[rra_idx].cur_row >=
1874 rrd->rra_def[rra_idx].row_cnt)
1875 rrd->rra_ptr[rra_idx].cur_row = 0; /* wrap around */
1876 /* position on the first row */
1877 rra_pos_tmp = rra_start +
1878 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1879 sizeof(rrd_value_t);
1880 if (rra_pos_tmp != *rra_current) {
1881 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1882 rrd_set_error("seek error in rrd");
1883 return -1;
1884 }
1885 *rra_current = rra_pos_tmp;
1886 }
1887 #ifdef DEBUG
1888 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1889 #endif
1890 if (!skip_update[rra_idx]) {
1891 if (*pcdp_summary != NULL) {
1892 rra_time = (current_time - current_time
1893 % (rrd->rra_def[rra_idx].pdp_cnt *
1894 rrd->stat_head->pdp_step))
1895 -
1896 ((rra_step_cnt[rra_idx] -
1897 1) * rrd->rra_def[rra_idx].pdp_cnt *
1898 rrd->stat_head->pdp_step);
1899 }
1900 if (write_RRA_row
1901 (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1902 pcdp_summary, rra_time) == -1)
1903 return -1;
1904 }
1906 /* write other rows of the bulk update, if any */
1907 for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1908 if (++rrd->rra_ptr[rra_idx].cur_row ==
1909 rrd->rra_def[rra_idx].row_cnt) {
1910 #ifdef DEBUG
1911 fprintf(stderr,
1912 "Wraparound for RRA %s, %lu updates left\n",
1913 rrd->rra_def[rra_idx].cf_nam,
1914 rra_step_cnt[rra_idx] - 1);
1915 #endif
1916 /* wrap */
1917 rrd->rra_ptr[rra_idx].cur_row = 0;
1918 /* seek back to beginning of current rra */
1919 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1920 rrd_set_error("seek error in rrd");
1921 return -1;
1922 }
1923 #ifdef DEBUG
1924 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1925 rrd_file->pos);
1926 #endif
1927 *rra_current = rra_start;
1928 }
1929 if (!skip_update[rra_idx]) {
1930 if (*pcdp_summary != NULL) {
1931 rra_time = (current_time - current_time
1932 % (rrd->rra_def[rra_idx].pdp_cnt *
1933 rrd->stat_head->pdp_step))
1934 -
1935 ((rra_step_cnt[rra_idx] -
1936 2) * rrd->rra_def[rra_idx].pdp_cnt *
1937 rrd->stat_head->pdp_step);
1938 }
1939 if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
1940 CDP_secondary_val, pcdp_summary,
1941 rra_time) == -1)
1942 return -1;
1943 }
1944 }
1945 }
1946 rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1947 sizeof(rrd_value_t);
1948 } /* RRA LOOP */
1950 return 0;
1951 }
1953 /*
1954 * Write out one row of values (one value per DS) to the archive.
1955 *
1956 * Returns 0 on success, -1 on error.
1957 */
1958 static int write_RRA_row(
1959 rrd_file_t *rrd_file,
1960 rrd_t *rrd,
1961 unsigned long rra_idx,
1962 unsigned long *rra_current,
1963 unsigned short CDP_scratch_idx,
1964 info_t **pcdp_summary,
1965 time_t rra_time)
1966 {
1967 unsigned long ds_idx, cdp_idx;
1968 infoval iv;
1970 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1971 /* compute the cdp index */
1972 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1973 #ifdef DEBUG
1974 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1975 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1976 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1977 #endif
1978 if (*pcdp_summary != NULL) {
1979 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1980 /* append info to the return hash */
1981 *pcdp_summary = info_push(*pcdp_summary,
1982 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1983 rra_time,
1984 rrd->rra_def[rra_idx].
1985 cf_nam,
1986 rrd->rra_def[rra_idx].
1987 pdp_cnt,
1988 rrd->ds_def[ds_idx].
1989 ds_nam), RD_I_VAL, iv);
1990 }
1991 if (rrd_write(rrd_file,
1992 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
1993 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1994 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1995 return -1;
1996 }
1997 *rra_current += sizeof(rrd_value_t);
1998 }
1999 return 0;
2000 }
2002 /*
2003 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2004 *
2005 * Returns 0 on success, -1 otherwise
2006 */
2007 static int smooth_all_rras(
2008 rrd_t *rrd,
2009 rrd_file_t *rrd_file,
2010 unsigned long rra_begin)
2011 {
2012 unsigned long rra_start = rra_begin;
2013 unsigned long rra_idx;
2015 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2016 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2017 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2018 #ifdef DEBUG
2019 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2020 #endif
2021 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2022 if (rrd_test_error())
2023 return -1;
2024 }
2025 rra_start += rrd->rra_def[rra_idx].row_cnt
2026 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2027 }
2028 return 0;
2029 }
2031 #ifndef HAVE_MMAP
2032 /*
2033 * Flush changes to disk (unless we're using mmap)
2034 *
2035 * Returns 0 on success, -1 otherwise
2036 */
2037 static int write_changes_to_disk(
2038 rrd_t *rrd,
2039 rrd_file_t *rrd_file,
2040 int version)
2041 {
2042 /* we just need to write back the live header portion now */
2043 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2044 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2045 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2046 SEEK_SET) != 0) {
2047 rrd_set_error("seek rrd for live header writeback");
2048 return -1;
2049 }
2050 if (version >= 3) {
2051 if (rrd_write(rrd_file, rrd->live_head,
2052 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2053 rrd_set_error("rrd_write live_head to rrd");
2054 return -1;
2055 }
2056 } else {
2057 if (rrd_write(rrd_file, &rrd->live_head->last_up,
2058 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2059 rrd_set_error("rrd_write live_head to rrd");
2060 return -1;
2061 }
2062 }
2065 if (rrd_write(rrd_file, rrd->pdp_prep,
2066 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2067 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2068 rrd_set_error("rrd_write pdp_prep to rrd");
2069 return -1;
2070 }
2072 if (rrd_write(rrd_file, rrd->cdp_prep,
2073 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2074 rrd->stat_head->ds_cnt)
2075 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2076 rrd->stat_head->ds_cnt)) {
2078 rrd_set_error("rrd_write cdp_prep to rrd");
2079 return -1;
2080 }
2082 if (rrd_write(rrd_file, rrd->rra_ptr,
2083 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2084 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2085 rrd_set_error("rrd_write rra_ptr to rrd");
2086 return -1;
2087 }
2088 return 0;
2089 }
2090 #endif