1 /*****************************************************************************
2 * RRDtool 1.2.99907080300 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 *****************************************************************************/
9 #include "rrd_tool.h"
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
13 #include <sys/stat.h>
14 #include <io.h>
15 #endif
17 #include <locale.h>
19 #include "rrd_hw.h"
20 #include "rrd_rpncalc.h"
22 #include "rrd_is_thread_safe.h"
23 #include "unused.h"
25 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
26 /*
27 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
28 * replacement.
29 */
30 #include <sys/timeb.h>
32 #ifndef __MINGW32__
33 struct timeval {
34 time_t tv_sec; /* seconds */
35 long tv_usec; /* microseconds */
36 };
37 #endif
39 struct __timezone {
40 int tz_minuteswest; /* minutes W of Greenwich */
41 int tz_dsttime; /* type of dst correction */
42 };
44 static int gettimeofday(
45 struct timeval *t,
46 struct __timezone *tz)
47 {
49 struct _timeb current_time;
51 _ftime(¤t_time);
53 t->tv_sec = current_time.time;
54 t->tv_usec = current_time.millitm * 1000;
56 return 0;
57 }
59 #endif
/* FUNCTION PROTOTYPES */

/* Update an RRD from pre-split arguments; thin wrapper around
 * _rrd_update() that passes no summary list. */
int       rrd_update_r(
    const char *filename,
    const char *tmplt,
    int argc,
    const char **argv);
/* Shared worker behind rrd_update_r() and rrd_update_v(); the last
 * argument is the info list handed on to process_arg() (may be NULL). */
int       _rrd_update(
    const char *filename,
    const char *tmplt,
    int argc,
    const char **argv,
    info_t *);

/* Allocates the per-update working arrays and builds the template index. */
static int allocate_data_structures(
    rrd_t *rrd,
    char ***updvals,
    rrd_value_t **pdp_temp,
    const char *tmplt,
    long **tmpl_idx,
    unsigned long *tmpl_cnt,
    unsigned long **rra_step_cnt,
    unsigned long **skip_update,
    rrd_value_t **pdp_new);

/* Translates a colon-separated list of DS names into tmpl_idx. */
static int parse_template(
    rrd_t *rrd,
    const char *tmplt,
    unsigned long *tmpl_cnt,
    long *tmpl_idx);

/* Handles one update string: parse, update PDPs/CDPs, write RRA rows. */
static int process_arg(
    char *step_start,
    rrd_t *rrd,
    rrd_file_t *rrd_file,
    unsigned long rra_begin,
    unsigned long *rra_current,
    time_t *current_time,
    unsigned long *current_time_usec,
    rrd_value_t *pdp_temp,
    rrd_value_t *pdp_new,
    unsigned long *rra_step_cnt,
    char **updvals,
    long *tmpl_idx,
    unsigned long tmpl_cnt,
    info_t **pcdp_summary,
    int version,
    unsigned long *skip_update,
    int *schedule_smooth);

/* Splits one update string (modified in place) into time and values. */
static int parse_ds(
    rrd_t *rrd,
    char **updvals,
    long *tmpl_idx,
    char *input,
    unsigned long tmpl_cnt,
    time_t *current_time,
    unsigned long *current_time_usec,
    int version);

/* Converts updvals[0] into a timestamp and checks it against last_up. */
static int get_time_from_reading(
    rrd_t *rrd,
    char timesyntax,
    char **updvals,
    time_t *current_time,
    unsigned long *current_time_usec,
    int version);

/* Converts the textual readings into pdp_new according to each DS type. */
static int update_pdp_prep(
    rrd_t *rrd,
    char **updvals,
    rrd_value_t *pdp_new,
    double interval);

/* Returns the number of PDP step boundaries crossed since the last update. */
static int calculate_elapsed_steps(
    rrd_t *rrd,
    unsigned long current_time,
    unsigned long current_time_usec,
    double interval,
    double *pre_int,
    double *post_int,
    unsigned long *proc_pdp_cnt);

/* Accumulates pdp_new into the PDP scratch area when no step boundary
 * was crossed. */
static void simple_update(
    rrd_t *rrd,
    double interval,
    rrd_value_t *pdp_new);

/* Runs process_pdp_st() for every data source. */
static int process_all_pdp_st(
    rrd_t *rrd,
    double interval,
    double pre_int,
    double post_int,
    unsigned long elapsed_pdp_st,
    rrd_value_t *pdp_new,
    rrd_value_t *pdp_temp);

/* Per-DS processing of an update that crossed a PDP step boundary. */
static int process_pdp_st(
    rrd_t *rrd,
    unsigned long ds_idx,
    double interval,
    double pre_int,
    double post_int,
    long diff_pdp_st,
    rrd_value_t *pdp_new,
    rrd_value_t *pdp_temp);

/* Called from process_arg() after process_all_pdp_st() succeeded; see the
 * definition further down in this file for details. */
static int update_all_cdp_prep(
    rrd_t *rrd,
    unsigned long *rra_step_cnt,
    unsigned long rra_begin,
    rrd_file_t *rrd_file,
    unsigned long elapsed_pdp_st,
    unsigned long proc_pdp_cnt,
    rrd_value_t **last_seasonal_coef,
    rrd_value_t **seasonal_coef,
    rrd_value_t *pdp_temp,
    unsigned long *rra_current,
    unsigned long *skip_update,
    int *schedule_smooth);

/* CDP-level helpers; their definitions are elsewhere in this file. */
static int do_schedule_smooth(
    rrd_t *rrd,
    unsigned long rra_idx,
    unsigned long elapsed_pdp_st);

static int update_cdp_prep(
    rrd_t *rrd,
    unsigned long elapsed_pdp_st,
    unsigned long start_pdp_offset,
    unsigned long *rra_step_cnt,
    int rra_idx,
    rrd_value_t *pdp_temp,
    rrd_value_t *last_seasonal_coef,
    rrd_value_t *seasonal_coef,
    int current_cf);

static void update_cdp(
    unival *scratch,
    int current_cf,
    rrd_value_t pdp_temp_val,
    unsigned long rra_step_cnt,
    unsigned long elapsed_pdp_st,
    unsigned long start_pdp_offset,
    unsigned long pdp_cnt,
    rrd_value_t xff,
    int i,
    int ii);

static void initialize_cdp_val(
    unival *scratch,
    int current_cf,
    rrd_value_t pdp_temp_val,
    unsigned long elapsed_pdp_st,
    unsigned long start_pdp_offset,
    unsigned long pdp_cnt);

static void reset_cdp(
    rrd_t *rrd,
    unsigned long elapsed_pdp_st,
    rrd_value_t *pdp_temp,
    rrd_value_t *last_seasonal_coef,
    rrd_value_t *seasonal_coef,
    int rra_idx,
    int ds_idx,
    int cdp_idx,
    enum cf_en current_cf);

static rrd_value_t initialize_average_carry_over(
    rrd_value_t pdp_temp_val,
    unsigned long elapsed_pdp_st,
    unsigned long start_pdp_offset,
    unsigned long pdp_cnt);

static rrd_value_t calculate_cdp_val(
    rrd_value_t cdp_val,
    rrd_value_t pdp_temp_val,
    unsigned long elapsed_pdp_st,
    int current_cf,
    int i,
    int ii);

/* Called from process_arg() after update_all_cdp_prep(). */
static int update_aberrant_cdps(
    rrd_t *rrd,
    rrd_file_t *rrd_file,
    unsigned long rra_begin,
    unsigned long *rra_current,
    unsigned long elapsed_pdp_st,
    rrd_value_t *pdp_temp,
    rrd_value_t **seasonal_coef);

/* Called from process_arg() as the last step of a boundary-crossing update. */
static int write_to_rras(
    rrd_t *rrd,
    rrd_file_t *rrd_file,
    unsigned long *rra_step_cnt,
    unsigned long rra_begin,
    unsigned long *rra_current,
    time_t current_time,
    unsigned long *skip_update,
    info_t **pcdp_summary);

static int write_RRA_row(
    rrd_file_t *rrd_file,
    rrd_t *rrd,
    unsigned long rra_idx,
    unsigned long *rra_current,
    unsigned short CDP_scratch_idx,
    info_t **pcdp_summary,
    time_t rra_time);

/* Invoked by _rrd_update() when an update scheduled smoothing. */
static int smooth_all_rras(
    rrd_t *rrd,
    rrd_file_t *rrd_file,
    unsigned long rra_begin);

#ifndef HAVE_MMAP
/* Only needed without mmap: _rrd_update() calls it after all arguments
 * have been processed without error. */
static int write_changes_to_disk(
    rrd_t *rrd,
    rrd_file_t *rrd_file,
    int version);
#endif
/*
 * Bring a time value as returned by gettimeofday into canonical form:
 * if the microsecond part is negative, borrow one second from tv_sec so
 * that tv_usec ends up >= 0 (a single borrow is applied).
 */
static inline void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec >= 0)
        return;         /* already normalized */

    t->tv_sec -= 1;
    t->tv_usec += 1000000L; /* one second expressed in microseconds */
}
/*
 * Read the wall clock and store the seconds in *current_time and the
 * microseconds in *current_time_usec.  For RRD versions below 3 the
 * microsecond part is forced to 0.
 */
static inline void initialize_time(
    time_t *current_time,
    unsigned long *current_time_usec,
    int version)
{
    struct timeval now;

    gettimeofday(&now, 0);
    normalize_time(&now);

    *current_time = now.tv_sec;
    *current_time_usec = (version >= 3) ? now.tv_usec : 0;
}
/* Evaluates to Y when X is NaN and to X otherwise (X is evaluated twice).
 * Deliberately has no trailing semicolon so it can be used as an ordinary
 * expression; statements using it supply their own ';'. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
319 info_t *rrd_update_v(
320 int argc,
321 char **argv)
322 {
323 char *tmplt = NULL;
324 info_t *result = NULL;
325 infoval rc;
326 struct option long_options[] = {
327 {"template", required_argument, 0, 't'},
328 {0, 0, 0, 0}
329 };
331 rc.u_int = -1;
332 optind = 0;
333 opterr = 0; /* initialize getopt */
335 while (1) {
336 int option_index = 0;
337 int opt;
339 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
341 if (opt == EOF)
342 break;
344 switch (opt) {
345 case 't':
346 tmplt = optarg;
347 break;
349 case '?':
350 rrd_set_error("unknown option '%s'", argv[optind - 1]);
351 goto end_tag;
352 }
353 }
355 /* need at least 2 arguments: filename, data. */
356 if (argc - optind < 2) {
357 rrd_set_error("Not enough arguments");
358 goto end_tag;
359 }
360 rc.u_int = 0;
361 result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
362 rc.u_int = _rrd_update(argv[optind], tmplt,
363 argc - optind - 1,
364 (const char **) (argv + optind + 1), result);
365 result->value.u_int = rc.u_int;
366 end_tag:
367 return result;
368 }
370 int rrd_update(
371 int argc,
372 char **argv)
373 {
374 struct option long_options[] = {
375 {"template", required_argument, 0, 't'},
376 {0, 0, 0, 0}
377 };
378 int option_index = 0;
379 int opt;
380 char *tmplt = NULL;
381 int rc = -1;
383 optind = 0;
384 opterr = 0; /* initialize getopt */
386 while (1) {
387 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
389 if (opt == EOF)
390 break;
392 switch (opt) {
393 case 't':
394 tmplt = strdup(optarg);
395 break;
397 case '?':
398 rrd_set_error("unknown option '%s'", argv[optind - 1]);
399 goto out;
400 }
401 }
403 /* need at least 2 arguments: filename, data. */
404 if (argc - optind < 2) {
405 rrd_set_error("Not enough arguments");
406 goto out;
407 }
409 rc = rrd_update_r(argv[optind], tmplt,
410 argc - optind - 1, (const char **) (argv + optind + 1));
411 out:
412 free(tmplt);
413 return rc;
414 }
/*
 * Update an RRD from already separated arguments.
 *
 * filename  RRD file to update
 * tmplt     optional DS template, passed through unchanged
 * argc/argv the update strings
 *
 * Forwards to _rrd_update() without a summary list and returns its result.
 */
int rrd_update_r(
    const char *filename,
    const char *tmplt,
    int argc,
    const char **argv)
{
    int       status;

    status = _rrd_update(filename, tmplt, argc, argv, NULL);
    return status;
}
425 int _rrd_update(
426 const char *filename,
427 const char *tmplt,
428 int argc,
429 const char **argv,
430 info_t *pcdp_summary)
431 {
433 int arg_i = 2;
435 unsigned long rra_begin; /* byte pointer to the rra
436 * area in the rrd file. this
437 * pointer never changes value */
438 unsigned long rra_current; /* byte pointer to the current write
439 * spot in the rrd file. */
440 rrd_value_t *pdp_new; /* prepare the incoming data to be added
441 * to the existing entry */
442 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
443 * to the cdp values */
445 long *tmpl_idx; /* index representing the settings
446 * transported by the tmplt index */
447 unsigned long tmpl_cnt = 2; /* time and data */
448 rrd_t rrd;
449 time_t current_time = 0;
450 unsigned long current_time_usec = 0; /* microseconds part of current time */
451 char **updvals;
452 int schedule_smooth = 0;
454 /* number of elapsed PDP steps since last update */
455 unsigned long *rra_step_cnt = NULL;
457 int version; /* rrd version */
458 rrd_file_t *rrd_file;
459 char *arg_copy; /* for processing the argv */
460 unsigned long *skip_update; /* RRAs to advance but not write */
462 /* need at least 1 arguments: data. */
463 if (argc < 1) {
464 rrd_set_error("Not enough arguments");
465 goto err_out;
466 }
468 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
469 goto err_free;
470 }
471 /* We are now at the beginning of the rra's */
472 rra_current = rra_begin = rrd_file->header_len;
474 version = atoi(rrd.stat_head->version);
476 initialize_time(¤t_time, ¤t_time_usec, version);
478 /* get exclusive lock to whole file.
479 * lock gets removed when we close the file.
480 */
481 if (LockRRD(rrd_file->fd) != 0) {
482 rrd_set_error("could not lock RRD");
483 goto err_close;
484 }
486 if (allocate_data_structures(&rrd, &updvals,
487 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
488 &rra_step_cnt, &skip_update,
489 &pdp_new) == -1) {
490 goto err_close;
491 }
493 /* loop through the arguments. */
494 for (arg_i = 0; arg_i < argc; arg_i++) {
495 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
496 rrd_set_error("failed duplication argv entry");
497 break;
498 }
499 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
500 ¤t_time, ¤t_time_usec, pdp_temp, pdp_new,
501 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
502 &pcdp_summary, version, skip_update,
503 &schedule_smooth) == -1) {
504 free(arg_copy);
505 break;
506 }
507 free(arg_copy);
508 }
510 free(rra_step_cnt);
512 /* if we got here and if there is an error and if the file has not been
513 * written to, then close things up and return. */
514 if (rrd_test_error()) {
515 goto err_free_structures;
516 }
517 #ifndef HAVE_MMAP
518 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
519 goto err_free_structures;
520 }
521 #endif
523 /* calling the smoothing code here guarantees at most one smoothing
524 * operation per rrd_update call. Unfortunately, it is possible with bulk
525 * updates, or a long-delayed update for smoothing to occur off-schedule.
526 * This really isn't critical except during the burn-in cycles. */
527 if (schedule_smooth) {
528 smooth_all_rras(&rrd, rrd_file, rra_begin);
529 }
531 /* rrd_dontneed(rrd_file,&rrd); */
532 rrd_free(&rrd);
533 rrd_close(rrd_file);
535 free(pdp_new);
536 free(tmpl_idx);
537 free(pdp_temp);
538 free(skip_update);
539 free(updvals);
540 return 0;
542 err_free_structures:
543 free(pdp_new);
544 free(tmpl_idx);
545 free(pdp_temp);
546 free(skip_update);
547 free(updvals);
548 err_close:
549 rrd_close(rrd_file);
550 err_free:
551 rrd_free(&rrd);
552 err_out:
553 return -1;
554 }
/*
 * Try to take an exclusive lock on the whole file, without waiting if
 * the lock is already held elsewhere.
 * The lock gets removed when the file is closed.
 *
 * Returns 0 on success, non-zero on failure.
 */
int LockRRD(
    int in_file)
{
#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat st;

    /* the lock length is the current file size, so that must be known */
    if (_fstat(in_file, &st) != 0)
        return -1;

    return _locking(in_file, _LK_NBLCK, st.st_size);
#else
    struct flock lock;

    lock.l_whence = SEEK_SET;   /* l_start is relative to the start of file */
    lock.l_start = 0;   /* begin at the first byte ... */
    lock.l_len = 0;     /* ... and cover the whole file */
    lock.l_type = F_WRLCK;  /* exclusive write lock */

    return fcntl(in_file, F_SETLK, &lock);
#endif
}
591 /*
592 * Allocate some important arrays used, and initialize the template.
593 *
594 * When it returns, either all of the structures are allocated
595 * or none of them are.
596 *
597 * Returns 0 on success, -1 on error.
598 */
599 static int allocate_data_structures(
600 rrd_t *rrd,
601 char ***updvals,
602 rrd_value_t **pdp_temp,
603 const char *tmplt,
604 long **tmpl_idx,
605 unsigned long *tmpl_cnt,
606 unsigned long **rra_step_cnt,
607 unsigned long **skip_update,
608 rrd_value_t **pdp_new)
609 {
610 unsigned i, ii;
611 if ((*updvals = (char **) malloc(sizeof(char *)
612 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
613 rrd_set_error("allocating updvals pointer array.");
614 return -1;
615 }
616 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
617 * rrd->stat_head->ds_cnt)) ==
618 NULL) {
619 rrd_set_error("allocating pdp_temp.");
620 goto err_free_updvals;
621 }
622 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
623 *
624 rrd->stat_head->rra_cnt)) ==
625 NULL) {
626 rrd_set_error("allocating skip_update.");
627 goto err_free_pdp_temp;
628 }
629 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
630 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
631 rrd_set_error("allocating tmpl_idx.");
632 goto err_free_skip_update;
633 }
634 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
635 *
636 (rrd->stat_head->
637 rra_cnt))) == NULL) {
638 rrd_set_error("allocating rra_step_cnt.");
639 goto err_free_tmpl_idx;
640 }
642 /* initialize tmplt redirector */
643 /* default config example (assume DS 1 is a CDEF DS)
644 tmpl_idx[0] -> 0; (time)
645 tmpl_idx[1] -> 1; (DS 0)
646 tmpl_idx[2] -> 3; (DS 2)
647 tmpl_idx[3] -> 4; (DS 3) */
648 (*tmpl_idx)[0] = 0; /* time */
649 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
650 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
651 (*tmpl_idx)[ii++] = i;
652 }
653 *tmpl_cnt = ii;
655 if (tmplt != NULL) {
656 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
657 goto err_free_rra_step_cnt;
658 }
659 }
661 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
662 * rrd->stat_head->ds_cnt)) == NULL) {
663 rrd_set_error("allocating pdp_new.");
664 goto err_free_rra_step_cnt;
665 }
667 return 0;
669 err_free_rra_step_cnt:
670 free(*rra_step_cnt);
671 err_free_tmpl_idx:
672 free(*tmpl_idx);
673 err_free_skip_update:
674 free(*skip_update);
675 err_free_pdp_temp:
676 free(*pdp_temp);
677 err_free_updvals:
678 free(*updvals);
679 return -1;
680 }
682 /*
683 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
684 *
685 * Returns 0 on success.
686 */
687 static int parse_template(
688 rrd_t *rrd,
689 const char *tmplt,
690 unsigned long *tmpl_cnt,
691 long *tmpl_idx)
692 {
693 char *dsname, *tmplt_copy;
694 unsigned int tmpl_len, i;
695 int ret = 0;
697 *tmpl_cnt = 1; /* the first entry is the time */
699 /* we should work on a writeable copy here */
700 if ((tmplt_copy = strdup(tmplt)) == NULL) {
701 rrd_set_error("error copying tmplt '%s'", tmplt);
702 ret = -1;
703 goto out;
704 }
706 dsname = tmplt_copy;
707 tmpl_len = strlen(tmplt_copy);
708 for (i = 0; i <= tmpl_len; i++) {
709 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
710 tmplt_copy[i] = '\0';
711 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
712 rrd_set_error("tmplt contains more DS definitions than RRD");
713 ret = -1;
714 goto out_free_tmpl_copy;
715 }
716 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
717 rrd_set_error("unknown DS name '%s'", dsname);
718 ret = -1;
719 goto out_free_tmpl_copy;
720 }
721 /* go to the next entry on the tmplt_copy */
722 if (i < tmpl_len)
723 dsname = &tmplt_copy[i + 1];
724 }
725 }
726 out_free_tmpl_copy:
727 free(tmplt_copy);
728 out:
729 return ret;
730 }
732 /*
733 * Parse an update string, updates the primary data points (PDPs)
734 * and consolidated data points (CDPs), and writes changes to the RRAs.
735 *
736 * Returns 0 on success, -1 on error.
737 */
738 static int process_arg(
739 char *step_start,
740 rrd_t *rrd,
741 rrd_file_t *rrd_file,
742 unsigned long rra_begin,
743 unsigned long *rra_current,
744 time_t *current_time,
745 unsigned long *current_time_usec,
746 rrd_value_t *pdp_temp,
747 rrd_value_t *pdp_new,
748 unsigned long *rra_step_cnt,
749 char **updvals,
750 long *tmpl_idx,
751 unsigned long tmpl_cnt,
752 info_t **pcdp_summary,
753 int version,
754 unsigned long *skip_update,
755 int *schedule_smooth)
756 {
757 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
759 /* a vector of future Holt-Winters seasonal coefs */
760 unsigned long elapsed_pdp_st;
762 double interval, pre_int, post_int; /* interval between this and
763 * the last run */
764 unsigned long proc_pdp_cnt;
765 unsigned long rra_start;
767 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
768 current_time, current_time_usec, version) == -1) {
769 return -1;
770 }
771 /* seek to the beginning of the rra's */
772 if (*rra_current != rra_begin) {
773 #ifndef HAVE_MMAP
774 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
775 rrd_set_error("seek error in rrd");
776 return -1;
777 }
778 #endif
779 *rra_current = rra_begin;
780 }
781 rra_start = rra_begin;
783 interval = (double) (*current_time - rrd->live_head->last_up)
784 + (double) ((long) *current_time_usec -
785 (long) rrd->live_head->last_up_usec) / 1e6f;
787 /* process the data sources and update the pdp_prep
788 * area accordingly */
789 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
790 return -1;
791 }
793 elapsed_pdp_st = calculate_elapsed_steps(rrd,
794 *current_time,
795 *current_time_usec, interval,
796 &pre_int, &post_int,
797 &proc_pdp_cnt);
799 /* has a pdp_st moment occurred since the last run ? */
800 if (elapsed_pdp_st == 0) {
801 /* no we have not passed a pdp_st moment. therefore update is simple */
802 simple_update(rrd, interval, pdp_new);
803 } else {
804 /* an pdp_st has occurred. */
805 if (process_all_pdp_st(rrd, interval,
806 pre_int, post_int,
807 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
808 return -1;
809 }
810 if (update_all_cdp_prep(rrd, rra_step_cnt,
811 rra_begin, rrd_file,
812 elapsed_pdp_st,
813 proc_pdp_cnt,
814 &last_seasonal_coef,
815 &seasonal_coef,
816 pdp_temp, rra_current,
817 skip_update, schedule_smooth) == -1) {
818 goto err_free_coefficients;
819 }
820 if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
821 elapsed_pdp_st, pdp_temp,
822 &seasonal_coef) == -1) {
823 goto err_free_coefficients;
824 }
825 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
826 rra_current, *current_time, skip_update,
827 pcdp_summary) == -1) {
828 goto err_free_coefficients;
829 }
830 } /* endif a pdp_st has occurred */
831 rrd->live_head->last_up = *current_time;
832 rrd->live_head->last_up_usec = *current_time_usec;
834 free(seasonal_coef);
835 free(last_seasonal_coef);
836 return 0;
838 err_free_coefficients:
839 free(seasonal_coef);
840 free(last_seasonal_coef);
841 return -1;
842 }
844 /*
845 * Parse a DS string (time + colon-separated values), storing the
846 * results in current_time, current_time_usec, and updvals.
847 *
848 * Returns 0 on success, -1 on error.
849 */
850 static int parse_ds(
851 rrd_t *rrd,
852 char **updvals,
853 long *tmpl_idx,
854 char *input,
855 unsigned long tmpl_cnt,
856 time_t *current_time,
857 unsigned long *current_time_usec,
858 int version)
859 {
860 char *p;
861 unsigned long i;
862 char timesyntax;
864 updvals[0] = input;
865 /* initialize all ds input to unknown except the first one
866 which has always got to be set */
867 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
868 updvals[i] = "U";
870 /* separate all ds elements; first must be examined separately
871 due to alternate time syntax */
872 if ((p = strchr(input, '@')) != NULL) {
873 timesyntax = '@';
874 } else if ((p = strchr(input, ':')) != NULL) {
875 timesyntax = ':';
876 } else {
877 rrd_set_error("expected timestamp not found in data source from %s",
878 input);
879 return -1;
880 }
881 *p = '\0';
882 i = 1;
883 updvals[tmpl_idx[i++]] = p + 1;
884 while (*(++p)) {
885 if (*p == ':') {
886 *p = '\0';
887 if (i < tmpl_cnt) {
888 updvals[tmpl_idx[i++]] = p + 1;
889 }
890 }
891 }
893 if (i != tmpl_cnt) {
894 rrd_set_error("expected %lu data source readings (got %lu) from %s",
895 tmpl_cnt - 1, i, input);
896 return -1;
897 }
899 if (get_time_from_reading(rrd, timesyntax, updvals,
900 current_time, current_time_usec,
901 version) == -1) {
902 return -1;
903 }
904 return 0;
905 }
907 /*
908 * Parse the time in a DS string, store it in current_time and
909 * current_time_usec and verify that it's later than the last
910 * update for this DS.
911 *
912 * Returns 0 on success, -1 on error.
913 */
914 static int get_time_from_reading(
915 rrd_t *rrd,
916 char timesyntax,
917 char **updvals,
918 time_t *current_time,
919 unsigned long *current_time_usec,
920 int version)
921 {
922 double tmp;
923 char *parsetime_error = NULL;
924 char *old_locale;
925 struct rrd_time_value ds_tv;
926 struct timeval tmp_time; /* used for time conversion */
928 /* get the time from the reading ... handle N */
929 if (timesyntax == '@') { /* at-style */
930 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
931 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
932 return -1;
933 }
934 if (ds_tv.type == RELATIVE_TO_END_TIME ||
935 ds_tv.type == RELATIVE_TO_START_TIME) {
936 rrd_set_error("specifying time relative to the 'start' "
937 "or 'end' makes no sense here: %s", updvals[0]);
938 return -1;
939 }
940 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
941 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
942 } else if (strcmp(updvals[0], "N") == 0) {
943 gettimeofday(&tmp_time, 0);
944 normalize_time(&tmp_time);
945 *current_time = tmp_time.tv_sec;
946 *current_time_usec = tmp_time.tv_usec;
947 } else {
948 old_locale = setlocale(LC_NUMERIC, "C");
949 tmp = strtod(updvals[0], 0);
950 setlocale(LC_NUMERIC, old_locale);
951 *current_time = floor(tmp);
952 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
953 }
954 /* dont do any correction for old version RRDs */
955 if (version < 3)
956 *current_time_usec = 0;
958 if (*current_time < rrd->live_head->last_up ||
959 (*current_time == rrd->live_head->last_up &&
960 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
961 rrd_set_error("illegal attempt to update using time %ld when "
962 "last update time is %ld (minimum one second step)",
963 *current_time, rrd->live_head->last_up);
964 return -1;
965 }
966 return 0;
967 }
969 /*
970 * Update pdp_new by interpreting the updvals according to the DS type
971 * (COUNTER, GAUGE, etc.).
972 *
973 * Returns 0 on success, -1 on error.
974 */
975 static int update_pdp_prep(
976 rrd_t *rrd,
977 char **updvals,
978 rrd_value_t *pdp_new,
979 double interval)
980 {
981 unsigned long ds_idx;
982 int ii;
983 char *endptr; /* used in the conversion */
984 double rate;
985 char *old_locale;
986 enum dst_en dst_idx;
988 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
989 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
991 /* make sure we do not build diffs with old last_ds values */
992 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
993 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
994 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
995 }
997 /* NOTE: DST_CDEF should never enter this if block, because
998 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
999 * accidently specified a value for the DST_CDEF. To handle this case,
1000 * an extra check is required. */
1002 if ((updvals[ds_idx + 1][0] != 'U') &&
1003 (dst_idx != DST_CDEF) &&
1004 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1005 rate = DNAN;
1007 /* pdp_new contains rate * time ... eg the bytes transferred during
1008 * the interval. Doing it this way saves a lot of math operations
1009 */
1010 switch (dst_idx) {
1011 case DST_COUNTER:
1012 case DST_DERIVE:
1013 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1014 if ((updvals[ds_idx + 1][ii] < '0'
1015 || updvals[ds_idx + 1][ii] > '9')
1016 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1017 rrd_set_error("not a simple integer: '%s'",
1018 updvals[ds_idx + 1]);
1019 return -1;
1020 }
1021 }
1022 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1023 pdp_new[ds_idx] =
1024 rrd_diff(updvals[ds_idx + 1],
1025 rrd->pdp_prep[ds_idx].last_ds);
1026 if (dst_idx == DST_COUNTER) {
1027 /* simple overflow catcher. This will fail
1028 * terribly for non 32 or 64 bit counters
1029 * ... are there any others in SNMP land?
1030 */
1031 if (pdp_new[ds_idx] < (double) 0.0)
1032 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1033 if (pdp_new[ds_idx] < (double) 0.0)
1034 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1035 }
1036 rate = pdp_new[ds_idx] / interval;
1037 } else {
1038 pdp_new[ds_idx] = DNAN;
1039 }
1040 break;
1041 case DST_ABSOLUTE:
1042 old_locale = setlocale(LC_NUMERIC, "C");
1043 errno = 0;
1044 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1045 setlocale(LC_NUMERIC, old_locale);
1046 if (errno > 0) {
1047 rrd_set_error("converting '%s' to float: %s",
1048 updvals[ds_idx + 1], rrd_strerror(errno));
1049 return -1;
1050 };
1051 if (endptr[0] != '\0') {
1052 rrd_set_error
1053 ("conversion of '%s' to float not complete: tail '%s'",
1054 updvals[ds_idx + 1], endptr);
1055 return -1;
1056 }
1057 rate = pdp_new[ds_idx] / interval;
1058 break;
1059 case DST_GAUGE:
1060 errno = 0;
1061 old_locale = setlocale(LC_NUMERIC, "C");
1062 pdp_new[ds_idx] =
1063 strtod(updvals[ds_idx + 1], &endptr) * interval;
1064 setlocale(LC_NUMERIC, old_locale);
1065 if (errno) {
1066 rrd_set_error("converting '%s' to float: %s",
1067 updvals[ds_idx + 1], rrd_strerror(errno));
1068 return -1;
1069 };
1070 if (endptr[0] != '\0') {
1071 rrd_set_error
1072 ("conversion of '%s' to float not complete: tail '%s'",
1073 updvals[ds_idx + 1], endptr);
1074 return -1;
1075 }
1076 rate = pdp_new[ds_idx] / interval;
1077 break;
1078 default:
1079 rrd_set_error("rrd contains unknown DS type : '%s'",
1080 rrd->ds_def[ds_idx].dst);
1081 return -1;
1082 }
1083 /* break out of this for loop if the error string is set */
1084 if (rrd_test_error()) {
1085 return -1;
1086 }
1087 /* make sure pdp_temp is neither too large or too small
1088 * if any of these occur it becomes unknown ...
1089 * sorry folks ... */
1090 if (!isnan(rate) &&
1091 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1092 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1093 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1094 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1095 pdp_new[ds_idx] = DNAN;
1096 }
1097 } else {
1098 /* no news is news all the same */
1099 pdp_new[ds_idx] = DNAN;
1100 }
1103 /* make a copy of the command line argument for the next run */
1104 #ifdef DEBUG
1105 fprintf(stderr, "prep ds[%lu]\t"
1106 "last_arg '%s'\t"
1107 "this_arg '%s'\t"
1108 "pdp_new %10.2f\n",
1109 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1110 pdp_new[ds_idx]);
1111 #endif
1112 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1113 LAST_DS_LEN - 1);
1114 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1115 }
1116 return 0;
1117 }
1119 /*
1120 * How many PDP steps have elapsed since the last update? Returns the answer,
1121 * and stores the time between the last update and the last PDP in pre_time,
1122 * and the time between the last PDP and the current time in post_int.
1123 */
1124 static int calculate_elapsed_steps(
1125 rrd_t *rrd,
1126 unsigned long current_time,
1127 unsigned long current_time_usec,
1128 double interval,
1129 double *pre_int,
1130 double *post_int,
1131 unsigned long *proc_pdp_cnt)
1132 {
1133 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1134 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1135 * time */
1136 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1137 * when it was last updated */
1138 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1140 /* when was the current pdp started */
1141 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1142 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1144 /* when did the last pdp_st occur */
1145 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1146 occu_pdp_st = current_time - occu_pdp_age;
1148 if (occu_pdp_st > proc_pdp_st) {
1149 /* OK we passed the pdp_st moment */
1150 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1151 * occurred before the latest
1152 * pdp_st moment*/
1153 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1154 *post_int = occu_pdp_age; /* how much after it */
1155 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1156 } else {
1157 *pre_int = interval;
1158 *post_int = 0;
1159 }
1161 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1163 #ifdef DEBUG
1164 printf("proc_pdp_age %lu\t"
1165 "proc_pdp_st %lu\t"
1166 "occu_pfp_age %lu\t"
1167 "occu_pdp_st %lu\t"
1168 "int %lf\t"
1169 "pre_int %lf\t"
1170 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1171 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1172 #endif
1174 /* compute the number of elapsed pdp_st moments */
1175 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1176 }
1178 /*
1179 * Increment the PDP values by the values in pdp_new, or else initialize them.
1180 */
1181 static void simple_update(
1182 rrd_t *rrd,
1183 double interval,
1184 rrd_value_t *pdp_new)
1185 {
1186 int i;
1188 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1189 if (isnan(pdp_new[i])) {
1190 /* this is not really accurate if we use subsecond data arrival time
1191 should have thought of it when going subsecond resolution ...
1192 sorry next format change we will have it! */
1193 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1194 floor(interval);
1195 } else {
1196 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1197 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1198 } else {
1199 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1200 }
1201 }
1202 #ifdef DEBUG
1203 fprintf(stderr,
1204 "NO PDP ds[%i]\t"
1205 "value %10.2f\t"
1206 "unkn_sec %5lu\n",
1207 i,
1208 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1209 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1210 #endif
1211 }
1212 }
1214 /*
1215 * Call process_pdp_st for each DS.
1216 *
1217 * Returns 0 on success, -1 on error.
1218 */
1219 static int process_all_pdp_st(
1220 rrd_t *rrd,
1221 double interval,
1222 double pre_int,
1223 double post_int,
1224 unsigned long elapsed_pdp_st,
1225 rrd_value_t *pdp_new,
1226 rrd_value_t *pdp_temp)
1227 {
1228 unsigned long ds_idx;
1230 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1231 rate*seconds which occurred up to the last run.
1232 pdp_new[] contains rate*seconds from the latest run.
1233 pdp_temp[] will contain the rate for cdp */
1235 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1236 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1237 elapsed_pdp_st * rrd->stat_head->pdp_step,
1238 pdp_new, pdp_temp) == -1) {
1239 return -1;
1240 }
1241 #ifdef DEBUG
1242 fprintf(stderr, "PDP UPD ds[%lu]\t"
1243 "pdp_temp %10.2f\t"
1244 "new_prep %10.2f\t"
1245 "new_unkn_sec %5lu\n",
1246 ds_idx, pdp_temp[ds_idx],
1247 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1248 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1249 #endif
1250 }
1251 return 0;
1252 }
1254 /*
1255 * Process an update that occurs after one of the PDP moments.
1256 * Increments the PDP value, sets NAN if time greater than the
1257 * heartbeats have elapsed, processes CDEFs.
1258 *
1259 * Returns 0 on success, -1 on error.
1260 */
1261 static int process_pdp_st(
1262 rrd_t *rrd,
1263 unsigned long ds_idx,
1264 double interval,
1265 double pre_int,
1266 double post_int,
1267 long diff_pdp_st,
1268 rrd_value_t *pdp_new,
1269 rrd_value_t *pdp_temp)
1270 {
1271 int i;
1273 /* update pdp_prep to the current pdp_st. */
1274 double pre_unknown = 0.0;
1275 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1276 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1278 rpnstack_t rpnstack; /* used for COMPUTE DS */
1280 rpnstack_init(&rpnstack);
1283 if (isnan(pdp_new[ds_idx])) {
1284 /* a final bit of unknown to be added bevore calculation
1285 we use a temporary variable for this so that we
1286 don't have to turn integer lines before using the value */
1287 pre_unknown = pre_int;
1288 } else {
1289 if (isnan(scratch[PDP_val].u_val)) {
1290 scratch[PDP_val].u_val = 0;
1291 }
1292 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1293 }
1295 /* if too much of the pdp_prep is unknown we dump it */
1296 /* if the interval is larger thatn mrhb we get NAN */
1297 if ((interval > mrhb) ||
1298 (diff_pdp_st <= (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1299 pdp_temp[ds_idx] = DNAN;
1300 } else {
1301 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1302 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1303 pre_unknown);
1304 }
1306 /* process CDEF data sources; remember each CDEF DS can
1307 * only reference other DS with a lower index number */
1308 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1309 rpnp_t *rpnp;
1311 rpnp =
1312 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1313 /* substitute data values for OP_VARIABLE nodes */
1314 for (i = 0; rpnp[i].op != OP_END; i++) {
1315 if (rpnp[i].op == OP_VARIABLE) {
1316 rpnp[i].op = OP_NUMBER;
1317 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1318 }
1319 }
1320 /* run the rpn calculator */
1321 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1322 free(rpnp);
1323 rpnstack_free(&rpnstack);
1324 return -1;
1325 }
1326 }
1328 /* make pdp_prep ready for the next run */
1329 if (isnan(pdp_new[ds_idx])) {
1330 /* this is not realy accurate if we use subsecond data arival time
1331 should have thought of it when going subsecond resolution ...
1332 sorry next format change we will have it! */
1333 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1334 scratch[PDP_val].u_val = DNAN;
1335 } else {
1336 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1337 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1338 }
1339 rpnstack_free(&rpnstack);
1340 return 0;
1341 }
1343 /*
1344 * Iterate over all the RRAs for a given DS and:
1345 * 1. Decide whether to schedule a smooth later
1346 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1347 * 3. Update the CDP
1348 *
1349 * Returns 0 on success, -1 on error
1350 */
1351 static int update_all_cdp_prep(
1352 rrd_t *rrd,
1353 unsigned long *rra_step_cnt,
1354 unsigned long rra_begin,
1355 rrd_file_t *rrd_file,
1356 unsigned long elapsed_pdp_st,
1357 unsigned long proc_pdp_cnt,
1358 rrd_value_t **last_seasonal_coef,
1359 rrd_value_t **seasonal_coef,
1360 rrd_value_t *pdp_temp,
1361 unsigned long *rra_current,
1362 unsigned long *skip_update,
1363 int *schedule_smooth)
1364 {
1365 unsigned long rra_idx;
1367 /* index into the CDP scratch array */
1368 enum cf_en current_cf;
1369 unsigned long rra_start;
1371 /* number of rows to be updated in an RRA for a data value. */
1372 unsigned long start_pdp_offset;
1374 rra_start = rra_begin;
1375 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1376 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1377 start_pdp_offset =
1378 rrd->rra_def[rra_idx].pdp_cnt -
1379 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1380 skip_update[rra_idx] = 0;
1381 if (start_pdp_offset <= elapsed_pdp_st) {
1382 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1383 rrd->rra_def[rra_idx].pdp_cnt + 1;
1384 } else {
1385 rra_step_cnt[rra_idx] = 0;
1386 }
1388 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1389 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1390 * so that they will be correct for the next observed value; note that for
1391 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1392 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1393 if (rra_step_cnt[rra_idx] > 1) {
1394 skip_update[rra_idx] = 1;
1395 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1396 elapsed_pdp_st, last_seasonal_coef);
1397 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1398 elapsed_pdp_st + 1, seasonal_coef);
1399 }
1400 /* periodically run a smoother for seasonal effects */
1401 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1402 #ifdef DEBUG
1403 fprintf(stderr,
1404 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1405 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1406 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1407 u_cnt);
1408 #endif
1409 *schedule_smooth = 1;
1410 }
1411 *rra_current = rrd_tell(rrd_file);
1412 }
1413 if (rrd_test_error())
1414 return -1;
1416 if (update_cdp_prep
1417 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1418 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1419 current_cf) == -1) {
1420 return -1;
1421 }
1422 rra_start +=
1423 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1424 sizeof(rrd_value_t);
1425 }
1426 return 0;
1427 }
1429 /*
1430 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1431 */
1432 static int do_schedule_smooth(
1433 rrd_t *rrd,
1434 unsigned long rra_idx,
1435 unsigned long elapsed_pdp_st)
1436 {
1437 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1438 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1439 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1440 unsigned long seasonal_smooth_idx =
1441 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1442 unsigned long *init_seasonal =
1443 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1445 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1446 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1447 * really an RRA level, not a data source within RRA level parameter, but
1448 * the rra_def is read only for rrd_update (not flushed to disk). */
1449 if (*init_seasonal > BURNIN_CYCLES) {
1450 /* someone has no doubt invented a trick to deal with this wrap around,
1451 * but at least this code is clear. */
1452 if (seasonal_smooth_idx > cur_row) {
1453 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1454 * between PDP and CDP */
1455 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1456 }
1457 /* can't rely on negative numbers because we are working with
1458 * unsigned values */
1459 return (cur_row + elapsed_pdp_st >= row_cnt
1460 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1461 }
1462 /* mark off one of the burn-in cycles */
1463 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1464 }
1466 /*
1467 * For a given RRA, iterate over the data sources and call the appropriate
1468 * consolidation function.
1469 *
1470 * Returns 0 on success, -1 on error.
1471 */
1472 static int update_cdp_prep(
1473 rrd_t *rrd,
1474 unsigned long elapsed_pdp_st,
1475 unsigned long start_pdp_offset,
1476 unsigned long *rra_step_cnt,
1477 int rra_idx,
1478 rrd_value_t *pdp_temp,
1479 rrd_value_t *last_seasonal_coef,
1480 rrd_value_t *seasonal_coef,
1481 int current_cf)
1482 {
1483 unsigned long ds_idx, cdp_idx;
1485 /* update CDP_PREP areas */
1486 /* loop over data soures within each RRA */
1487 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1489 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1491 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1492 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1493 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1494 elapsed_pdp_st, start_pdp_offset,
1495 rrd->rra_def[rra_idx].pdp_cnt,
1496 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1497 rra_idx, ds_idx);
1498 } else {
1499 /* Nothing to consolidate if there's one PDP per CDP. However, if
1500 * we've missed some PDPs, let's update null counters etc. */
1501 if (elapsed_pdp_st > 2) {
1502 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1503 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1504 current_cf);
1505 }
1506 }
1508 if (rrd_test_error())
1509 return -1;
1510 } /* endif data sources loop */
1511 return 0;
1512 }
1514 /*
1515 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1516 * primary value, secondary value, and # of unknowns.
1517 */
1518 static void update_cdp(
1519 unival *scratch,
1520 int current_cf,
1521 rrd_value_t pdp_temp_val,
1522 unsigned long rra_step_cnt,
1523 unsigned long elapsed_pdp_st,
1524 unsigned long start_pdp_offset,
1525 unsigned long pdp_cnt,
1526 rrd_value_t xff,
1527 int i,
1528 int ii)
1529 {
1530 /* shorthand variables */
1531 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1532 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1533 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1534 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1536 if (rra_step_cnt) {
1537 /* If we are in this block, as least 1 CDP value will be written to
1538 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1539 * to be written, then the "fill in" value is the CDP_secondary_val
1540 * entry. */
1541 if (isnan(pdp_temp_val)) {
1542 *cdp_unkn_pdp_cnt += start_pdp_offset;
1543 *cdp_secondary_val = DNAN;
1544 } else {
1545 /* CDP_secondary value is the RRA "fill in" value for intermediary
1546 * CDP data entries. No matter the CF, the value is the same because
1547 * the average, max, min, and last of a list of identical values is
1548 * the same, namely, the value itself. */
1549 *cdp_secondary_val = pdp_temp_val;
1550 }
1552 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1553 *cdp_primary_val = DNAN;
1554 if (current_cf == CF_AVERAGE) {
1555 *cdp_val =
1556 initialize_average_carry_over(pdp_temp_val,
1557 elapsed_pdp_st,
1558 start_pdp_offset, pdp_cnt);
1559 } else {
1560 *cdp_val = pdp_temp_val;
1561 }
1562 } else {
1563 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1564 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1565 } /* endif meets xff value requirement for a valid value */
1566 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1567 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1568 if (isnan(pdp_temp_val))
1569 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1570 else
1571 *cdp_unkn_pdp_cnt = 0;
1572 } else { /* rra_step_cnt[i] == 0 */
1574 #ifdef DEBUG
1575 if (isnan(*cdp_val)) {
1576 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1577 i, ii);
1578 } else {
1579 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1580 i, ii, *cdp_val);
1581 }
1582 #endif
1583 if (isnan(pdp_temp_val)) {
1584 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1585 } else {
1586 *cdp_val =
1587 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1588 current_cf, i, ii);
1589 }
1590 }
1591 }
1593 /*
1594 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1595 * on the type of consolidation function.
1596 */
1597 static void initialize_cdp_val(
1598 unival *scratch,
1599 int current_cf,
1600 rrd_value_t pdp_temp_val,
1601 unsigned long elapsed_pdp_st,
1602 unsigned long start_pdp_offset,
1603 unsigned long pdp_cnt)
1604 {
1605 rrd_value_t cum_val, cur_val;
1607 switch (current_cf) {
1608 case CF_AVERAGE:
1609 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1610 cur_val = IFDNAN(pdp_temp_val, 0.0);
1611 scratch[CDP_primary_val].u_val =
1612 (cum_val + cur_val * start_pdp_offset) /
1613 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1614 scratch[CDP_val].u_val =
1615 initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1616 start_pdp_offset, pdp_cnt);
1617 break;
1618 case CF_MAXIMUM:
1619 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1620 cur_val = IFDNAN(pdp_temp_val, -DINF);
1621 #if 0
1622 #ifdef DEBUG
1623 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1624 fprintf(stderr,
1625 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1626 i, ii);
1627 exit(-1);
1628 }
1629 #endif
1630 #endif
1631 if (cur_val > cum_val)
1632 scratch[CDP_primary_val].u_val = cur_val;
1633 else
1634 scratch[CDP_primary_val].u_val = cum_val;
1635 /* initialize carry over value */
1636 scratch[CDP_val].u_val = pdp_temp_val;
1637 break;
1638 case CF_MINIMUM:
1639 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1640 cur_val = IFDNAN(pdp_temp_val, DINF);
1641 #if 0
1642 #ifdef DEBUG
1643 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1644 fprintf(stderr,
1645 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1646 ii);
1647 exit(-1);
1648 }
1649 #endif
1650 #endif
1651 if (cur_val < cum_val)
1652 scratch[CDP_primary_val].u_val = cur_val;
1653 else
1654 scratch[CDP_primary_val].u_val = cum_val;
1655 /* initialize carry over value */
1656 scratch[CDP_val].u_val = pdp_temp_val;
1657 break;
1658 case CF_LAST:
1659 default:
1660 scratch[CDP_primary_val].u_val = pdp_temp_val;
1661 /* initialize carry over value */
1662 scratch[CDP_val].u_val = pdp_temp_val;
1663 break;
1664 }
1665 }
1667 /*
1668 * Update the consolidation function for Holt-Winters functions as
1669 * well as other functions that don't actually consolidate multiple
1670 * PDPs.
1671 */
1672 static void reset_cdp(
1673 rrd_t *rrd,
1674 unsigned long elapsed_pdp_st,
1675 rrd_value_t *pdp_temp,
1676 rrd_value_t *last_seasonal_coef,
1677 rrd_value_t *seasonal_coef,
1678 int rra_idx,
1679 int ds_idx,
1680 int cdp_idx,
1681 enum cf_en current_cf)
1682 {
1683 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1685 switch (current_cf) {
1686 case CF_AVERAGE:
1687 default:
1688 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1689 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1690 break;
1691 case CF_SEASONAL:
1692 case CF_DEVSEASONAL:
1693 /* need to update cached seasonal values, so they are consistent
1694 * with the bulk update */
1695 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1696 * CDP_last_deviation are the same. */
1697 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1698 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1699 break;
1700 case CF_HWPREDICT:
1701 case CF_MHWPREDICT:
1702 /* need to update the null_count and last_null_count.
1703 * even do this for non-DNAN pdp_temp because the
1704 * algorithm is not learning from batch updates. */
1705 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1706 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1707 /* fall through */
1708 case CF_DEVPREDICT:
1709 scratch[CDP_primary_val].u_val = DNAN;
1710 scratch[CDP_secondary_val].u_val = DNAN;
1711 break;
1712 case CF_FAILURES:
1713 /* do not count missed bulk values as failures */
1714 scratch[CDP_primary_val].u_val = 0;
1715 scratch[CDP_secondary_val].u_val = 0;
1716 /* need to reset violations buffer.
1717 * could do this more carefully, but for now, just
1718 * assume a bulk update wipes away all violations. */
1719 erase_violations(rrd, cdp_idx, rra_idx);
1720 break;
1721 }
1722 }
1724 static rrd_value_t initialize_average_carry_over(
1725 rrd_value_t pdp_temp_val,
1726 unsigned long elapsed_pdp_st,
1727 unsigned long start_pdp_offset,
1728 unsigned long pdp_cnt)
1729 {
1730 /* initialize carry over value */
1731 if (isnan(pdp_temp_val)) {
1732 return DNAN;
1733 }
1734 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1735 }
1737 /*
1738 * Update or initialize a CDP value based on the consolidation
1739 * function.
1740 *
1741 * Returns the new value.
1742 */
1743 static rrd_value_t calculate_cdp_val(
1744 rrd_value_t cdp_val,
1745 rrd_value_t pdp_temp_val,
1746 unsigned long elapsed_pdp_st,
1747 int current_cf,
1748 int i,
1749 int ii)
1750 {
1751 if (isnan(cdp_val)) {
1752 if (current_cf == CF_AVERAGE) {
1753 pdp_temp_val *= elapsed_pdp_st;
1754 }
1755 #ifdef DEBUG
1756 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1757 i, ii, pdp_temp_val);
1758 #endif
1759 return pdp_temp_val;
1760 }
1761 if (current_cf == CF_AVERAGE)
1762 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1763 if (current_cf == CF_MINIMUM)
1764 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1765 if (current_cf == CF_MAXIMUM)
1766 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1768 return pdp_temp_val;
1769 }
1771 /*
1772 * For each RRA, update the seasonal values and then call update_aberrant_CF
1773 * for each data source.
1774 *
1775 * Return 0 on success, -1 on error.
1776 */
1777 static int update_aberrant_cdps(
1778 rrd_t *rrd,
1779 rrd_file_t *rrd_file,
1780 unsigned long rra_begin,
1781 unsigned long *rra_current,
1782 unsigned long elapsed_pdp_st,
1783 rrd_value_t *pdp_temp,
1784 rrd_value_t **seasonal_coef)
1785 {
1786 unsigned long rra_idx, ds_idx, j;
1788 /* number of PDP steps since the last update that
1789 * are assigned to the first CDP to be generated
1790 * since the last update. */
1791 unsigned short scratch_idx;
1792 unsigned long rra_start;
1793 enum cf_en current_cf;
1795 /* this loop is only entered if elapsed_pdp_st < 3 */
1796 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1797 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1798 rra_start = rra_begin;
1799 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1800 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1801 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1802 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1803 if (scratch_idx == CDP_primary_val) {
1804 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1805 elapsed_pdp_st + 1, seasonal_coef);
1806 } else {
1807 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1808 elapsed_pdp_st + 2, seasonal_coef);
1809 }
1810 *rra_current = rrd_tell(rrd_file);
1811 }
1812 if (rrd_test_error())
1813 return -1;
1814 /* loop over data soures within each RRA */
1815 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1816 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1817 rra_idx * (rrd->stat_head->ds_cnt) +
1818 ds_idx, rra_idx, ds_idx, scratch_idx,
1819 *seasonal_coef);
1820 }
1821 }
1822 rra_start += rrd->rra_def[rra_idx].row_cnt
1823 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1824 }
1825 }
1826 return 0;
1827 }
1829 /*
1830 * Move sequentially through the file, writing one RRA at a time. Note this
1831 * architecture divorces the computation of CDP with flushing updated RRA
1832 * entries to disk.
1833 *
1834 * Return 0 on success, -1 on error.
1835 */
1836 static int write_to_rras(
1837 rrd_t *rrd,
1838 rrd_file_t *rrd_file,
1839 unsigned long *rra_step_cnt,
1840 unsigned long rra_begin,
1841 unsigned long *rra_current,
1842 time_t current_time,
1843 unsigned long *skip_update,
1844 info_t **pcdp_summary)
1845 {
1846 unsigned long rra_idx;
1847 unsigned long rra_start;
1848 unsigned long rra_pos_tmp; /* temporary byte pointer. */
1849 time_t rra_time = 0; /* time of update for a RRA */
1851 /* Ready to write to disk */
1852 rra_start = rra_begin;
1853 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1854 /* skip unless there's something to write */
1855 if (rra_step_cnt[rra_idx]) {
1856 /* write the first row */
1857 #ifdef DEBUG
1858 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1859 #endif
1860 rrd->rra_ptr[rra_idx].cur_row++;
1861 if (rrd->rra_ptr[rra_idx].cur_row >=
1862 rrd->rra_def[rra_idx].row_cnt)
1863 rrd->rra_ptr[rra_idx].cur_row = 0; /* wrap around */
1864 /* position on the first row */
1865 rra_pos_tmp = rra_start +
1866 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1867 sizeof(rrd_value_t);
1868 if (rra_pos_tmp != *rra_current) {
1869 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1870 rrd_set_error("seek error in rrd");
1871 return -1;
1872 }
1873 *rra_current = rra_pos_tmp;
1874 }
1875 #ifdef DEBUG
1876 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1877 #endif
1878 if (!skip_update[rra_idx]) {
1879 if (*pcdp_summary != NULL) {
1880 rra_time = (current_time - current_time
1881 % (rrd->rra_def[rra_idx].pdp_cnt *
1882 rrd->stat_head->pdp_step))
1883 -
1884 ((rra_step_cnt[rra_idx] -
1885 1) * rrd->rra_def[rra_idx].pdp_cnt *
1886 rrd->stat_head->pdp_step);
1887 }
1888 if (write_RRA_row
1889 (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1890 pcdp_summary, rra_time) == -1)
1891 return -1;
1892 }
1894 /* write other rows of the bulk update, if any */
1895 for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1896 if (++rrd->rra_ptr[rra_idx].cur_row ==
1897 rrd->rra_def[rra_idx].row_cnt) {
1898 #ifdef DEBUG
1899 fprintf(stderr,
1900 "Wraparound for RRA %s, %lu updates left\n",
1901 rrd->rra_def[rra_idx].cf_nam,
1902 rra_step_cnt[rra_idx] - 1);
1903 #endif
1904 /* wrap */
1905 rrd->rra_ptr[rra_idx].cur_row = 0;
1906 /* seek back to beginning of current rra */
1907 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1908 rrd_set_error("seek error in rrd");
1909 return -1;
1910 }
1911 #ifdef DEBUG
1912 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1913 rrd_file->pos);
1914 #endif
1915 *rra_current = rra_start;
1916 }
1917 if (!skip_update[rra_idx]) {
1918 if (*pcdp_summary != NULL) {
1919 rra_time = (current_time - current_time
1920 % (rrd->rra_def[rra_idx].pdp_cnt *
1921 rrd->stat_head->pdp_step))
1922 -
1923 ((rra_step_cnt[rra_idx] -
1924 2) * rrd->rra_def[rra_idx].pdp_cnt *
1925 rrd->stat_head->pdp_step);
1926 }
1927 if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
1928 CDP_secondary_val, pcdp_summary,
1929 rra_time) == -1)
1930 return -1;
1931 }
1932 }
1933 }
1934 rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1935 sizeof(rrd_value_t);
1936 } /* RRA LOOP */
1938 return 0;
1939 }
1941 /*
1942 * Write out one row of values (one value per DS) to the archive.
1943 *
1944 * Returns 0 on success, -1 on error.
1945 */
1946 static int write_RRA_row(
1947 rrd_file_t *rrd_file,
1948 rrd_t *rrd,
1949 unsigned long rra_idx,
1950 unsigned long *rra_current,
1951 unsigned short CDP_scratch_idx,
1952 info_t **pcdp_summary,
1953 time_t rra_time)
1954 {
1955 unsigned long ds_idx, cdp_idx;
1956 infoval iv;
1958 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1959 /* compute the cdp index */
1960 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1961 #ifdef DEBUG
1962 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1963 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1964 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1965 #endif
1966 if (*pcdp_summary != NULL) {
1967 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1968 /* append info to the return hash */
1969 *pcdp_summary = info_push(*pcdp_summary,
1970 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1971 rra_time,
1972 rrd->rra_def[rra_idx].
1973 cf_nam,
1974 rrd->rra_def[rra_idx].
1975 pdp_cnt,
1976 rrd->ds_def[ds_idx].
1977 ds_nam), RD_I_VAL, iv);
1978 }
1979 if (rrd_write(rrd_file,
1980 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
1981 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1982 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1983 return -1;
1984 }
1985 *rra_current += sizeof(rrd_value_t);
1986 }
1987 return 0;
1988 }
1990 /*
1991 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
1992 *
1993 * Returns 0 on success, -1 otherwise
1994 */
1995 static int smooth_all_rras(
1996 rrd_t *rrd,
1997 rrd_file_t *rrd_file,
1998 unsigned long rra_begin)
1999 {
2000 unsigned long rra_start = rra_begin;
2001 unsigned long rra_idx;
2003 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2004 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2005 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2006 #ifdef DEBUG
2007 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2008 #endif
2009 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2010 if (rrd_test_error())
2011 return -1;
2012 }
2013 rra_start += rrd->rra_def[rra_idx].row_cnt
2014 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2015 }
2016 return 0;
2017 }
2019 #ifndef HAVE_MMAP
2020 /*
2021 * Flush changes to disk (unless we're using mmap)
2022 *
2023 * Returns 0 on success, -1 otherwise
2024 */
2025 static int write_changes_to_disk(
2026 rrd_t *rrd,
2027 rrd_file_t *rrd_file,
2028 int version)
2029 {
2030 /* we just need to write back the live header portion now */
2031 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2032 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2033 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2034 SEEK_SET) != 0) {
2035 rrd_set_error("seek rrd for live header writeback");
2036 return -1;
2037 }
2038 if (version >= 3) {
2039 if (rrd_write(rrd_file, rrd->live_head,
2040 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2041 rrd_set_error("rrd_write live_head to rrd");
2042 return -1;
2043 }
2044 } else {
2045 if (rrd_write(rrd_file, &rrd->live_head->last_up,
2046 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2047 rrd_set_error("rrd_write live_head to rrd");
2048 return -1;
2049 }
2050 }
2053 if (rrd_write(rrd_file, rrd->pdp_prep,
2054 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2055 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2056 rrd_set_error("rrd_write pdp_prep to rrd");
2057 return -1;
2058 }
2060 if (rrd_write(rrd_file, rrd->cdp_prep,
2061 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2062 rrd->stat_head->ds_cnt)
2063 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2064 rrd->stat_head->ds_cnt)) {
2066 rrd_set_error("rrd_write cdp_prep to rrd");
2067 return -1;
2068 }
2070 if (rrd_write(rrd_file, rrd->rra_ptr,
2071 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2072 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2073 rrd_set_error("rrd_write rra_ptr to rrd");
2074 return -1;
2075 }
2076 return 0;
2077 }
2078 #endif