2 /*****************************************************************************
3 * RRDtool 1.3.2 Copyright by Tobi Oetiker, 1997-2008
4 * Copyright by Florian Forster, 2008
5 *****************************************************************************
6 * rrd_update.c RRD Update Function
7 *****************************************************************************
8 * $Id$
9 *****************************************************************************/
11 #include "rrd_tool.h"
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
15 #include <sys/stat.h>
16 #include <io.h>
17 #endif
19 #include <locale.h>
21 #include "rrd_hw.h"
22 #include "rrd_rpncalc.h"
24 #include "rrd_is_thread_safe.h"
25 #include "unused.h"
27 #include "rrd_client.h"
#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
/*
 * WIN32 does not have gettimeofday and struct timeval. This is a quick and
 * dirty replacement built on _ftime().
 */
#include <sys/timeb.h>

/* Skipped under MinGW -- presumably its headers already declare
 * struct timeval (TODO confirm). */
#ifndef __MINGW32__
struct timeval {
    time_t    tv_sec;   /* seconds */
    long      tv_usec;  /* microseconds */
};
#endif

struct __timezone {
    int       tz_minuteswest;   /* minutes W of Greenwich */
    int       tz_dsttime;   /* type of dst correction */
};

/*
 * Fill *t with the current wall-clock time.
 *
 * The resolution is limited to the milliseconds reported by _ftime();
 * tv_usec is therefore always a multiple of 1000.  The tz argument is
 * accepted for signature compatibility only and is never written.
 *
 * Always returns 0.
 */
static int gettimeofday(
    struct timeval *t,
    struct __timezone *tz)
{
    struct _timeb current_time;

    (void) tz;          /* intentionally unused */

    _ftime(&current_time);

    t->tv_sec = current_time.time;
    t->tv_usec = current_time.millitm * 1000;

    return 0;
}

#endif
65 /* FUNCTION PROTOTYPES */
67 int rrd_update_r(
68 const char *filename,
69 const char *tmplt,
70 int argc,
71 const char **argv);
72 int _rrd_update(
73 const char *filename,
74 const char *tmplt,
75 int argc,
76 const char **argv,
77 rrd_info_t *);
79 static int allocate_data_structures(
80 rrd_t *rrd,
81 char ***updvals,
82 rrd_value_t **pdp_temp,
83 const char *tmplt,
84 long **tmpl_idx,
85 unsigned long *tmpl_cnt,
86 unsigned long **rra_step_cnt,
87 unsigned long **skip_update,
88 rrd_value_t **pdp_new);
90 static int parse_template(
91 rrd_t *rrd,
92 const char *tmplt,
93 unsigned long *tmpl_cnt,
94 long *tmpl_idx);
96 static int process_arg(
97 char *step_start,
98 rrd_t *rrd,
99 rrd_file_t *rrd_file,
100 unsigned long rra_begin,
101 time_t *current_time,
102 unsigned long *current_time_usec,
103 rrd_value_t *pdp_temp,
104 rrd_value_t *pdp_new,
105 unsigned long *rra_step_cnt,
106 char **updvals,
107 long *tmpl_idx,
108 unsigned long tmpl_cnt,
109 rrd_info_t ** pcdp_summary,
110 int version,
111 unsigned long *skip_update,
112 int *schedule_smooth);
114 static int parse_ds(
115 rrd_t *rrd,
116 char **updvals,
117 long *tmpl_idx,
118 char *input,
119 unsigned long tmpl_cnt,
120 time_t *current_time,
121 unsigned long *current_time_usec,
122 int version);
124 static int get_time_from_reading(
125 rrd_t *rrd,
126 char timesyntax,
127 char **updvals,
128 time_t *current_time,
129 unsigned long *current_time_usec,
130 int version);
132 static int update_pdp_prep(
133 rrd_t *rrd,
134 char **updvals,
135 rrd_value_t *pdp_new,
136 double interval);
138 static int calculate_elapsed_steps(
139 rrd_t *rrd,
140 unsigned long current_time,
141 unsigned long current_time_usec,
142 double interval,
143 double *pre_int,
144 double *post_int,
145 unsigned long *proc_pdp_cnt);
147 static void simple_update(
148 rrd_t *rrd,
149 double interval,
150 rrd_value_t *pdp_new);
152 static int process_all_pdp_st(
153 rrd_t *rrd,
154 double interval,
155 double pre_int,
156 double post_int,
157 unsigned long elapsed_pdp_st,
158 rrd_value_t *pdp_new,
159 rrd_value_t *pdp_temp);
161 static int process_pdp_st(
162 rrd_t *rrd,
163 unsigned long ds_idx,
164 double interval,
165 double pre_int,
166 double post_int,
167 long diff_pdp_st,
168 rrd_value_t *pdp_new,
169 rrd_value_t *pdp_temp);
171 static int update_all_cdp_prep(
172 rrd_t *rrd,
173 unsigned long *rra_step_cnt,
174 unsigned long rra_begin,
175 rrd_file_t *rrd_file,
176 unsigned long elapsed_pdp_st,
177 unsigned long proc_pdp_cnt,
178 rrd_value_t **last_seasonal_coef,
179 rrd_value_t **seasonal_coef,
180 rrd_value_t *pdp_temp,
181 unsigned long *skip_update,
182 int *schedule_smooth);
184 static int do_schedule_smooth(
185 rrd_t *rrd,
186 unsigned long rra_idx,
187 unsigned long elapsed_pdp_st);
189 static int update_cdp_prep(
190 rrd_t *rrd,
191 unsigned long elapsed_pdp_st,
192 unsigned long start_pdp_offset,
193 unsigned long *rra_step_cnt,
194 int rra_idx,
195 rrd_value_t *pdp_temp,
196 rrd_value_t *last_seasonal_coef,
197 rrd_value_t *seasonal_coef,
198 int current_cf);
200 static void update_cdp(
201 unival *scratch,
202 int current_cf,
203 rrd_value_t pdp_temp_val,
204 unsigned long rra_step_cnt,
205 unsigned long elapsed_pdp_st,
206 unsigned long start_pdp_offset,
207 unsigned long pdp_cnt,
208 rrd_value_t xff,
209 int i,
210 int ii);
212 static void initialize_cdp_val(
213 unival *scratch,
214 int current_cf,
215 rrd_value_t pdp_temp_val,
216 unsigned long elapsed_pdp_st,
217 unsigned long start_pdp_offset,
218 unsigned long pdp_cnt);
220 static void reset_cdp(
221 rrd_t *rrd,
222 unsigned long elapsed_pdp_st,
223 rrd_value_t *pdp_temp,
224 rrd_value_t *last_seasonal_coef,
225 rrd_value_t *seasonal_coef,
226 int rra_idx,
227 int ds_idx,
228 int cdp_idx,
229 enum cf_en current_cf);
231 static rrd_value_t initialize_average_carry_over(
232 rrd_value_t pdp_temp_val,
233 unsigned long elapsed_pdp_st,
234 unsigned long start_pdp_offset,
235 unsigned long pdp_cnt);
237 static rrd_value_t calculate_cdp_val(
238 rrd_value_t cdp_val,
239 rrd_value_t pdp_temp_val,
240 unsigned long elapsed_pdp_st,
241 int current_cf,
242 int i,
243 int ii);
245 static int update_aberrant_cdps(
246 rrd_t *rrd,
247 rrd_file_t *rrd_file,
248 unsigned long rra_begin,
249 unsigned long elapsed_pdp_st,
250 rrd_value_t *pdp_temp,
251 rrd_value_t **seasonal_coef);
253 static int write_to_rras(
254 rrd_t *rrd,
255 rrd_file_t *rrd_file,
256 unsigned long *rra_step_cnt,
257 unsigned long rra_begin,
258 time_t current_time,
259 unsigned long *skip_update,
260 rrd_info_t ** pcdp_summary);
262 static int write_RRA_row(
263 rrd_file_t *rrd_file,
264 rrd_t *rrd,
265 unsigned long rra_idx,
266 unsigned short CDP_scratch_idx,
267 rrd_info_t ** pcdp_summary,
268 time_t rra_time);
270 static int smooth_all_rras(
271 rrd_t *rrd,
272 rrd_file_t *rrd_file,
273 unsigned long rra_begin);
275 #ifndef HAVE_MMAP
276 static int write_changes_to_disk(
277 rrd_t *rrd,
278 rrd_file_t *rrd_file,
279 int version);
280 #endif
/*
 * Normalize a time as returned by gettimeofday: the usec part must
 * always be >= 0.  A negative tv_usec borrows one second from tv_sec.
 * Only a single second is borrowed per call.
 */
static inline void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec >= 0)
        return;         /* already normalized */

    t->tv_sec -= 1;
    t->tv_usec += 1000000;  /* one second expressed in microseconds */
}
/*
 * Store the current wall-clock time in *current_time (seconds) and
 * *current_time_usec (microseconds).
 *
 * For versions below 3 the microsecond part is forced to 0.
 */
static inline void initialize_time(
    time_t *current_time,
    unsigned long *current_time_usec,
    int version)
{
    struct timeval now; /* raw reading, normalized before use */

    gettimeofday(&now, 0);
    normalize_time(&now);

    *current_time = now.tv_sec;
    *current_time_usec = (version >= 3) ? now.tv_usec : 0;
}
/*
 * Evaluates to Y when X is NaN, otherwise to X.
 *
 * X is evaluated twice when it is not NaN, so do not pass expressions
 * with side effects.  The expansion deliberately carries no trailing
 * semicolon so the macro can be used inside larger expressions.
 */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
318 rrd_info_t *rrd_update_v(
319 int argc,
320 char **argv)
321 {
322 char *tmplt = NULL;
323 rrd_info_t *result = NULL;
324 rrd_infoval_t rc;
325 char *opt_daemon = NULL;
326 struct option long_options[] = {
327 {"template", required_argument, 0, 't'},
328 {0, 0, 0, 0}
329 };
331 rc.u_int = -1;
332 optind = 0;
333 opterr = 0; /* initialize getopt */
335 while (1) {
336 int option_index = 0;
337 int opt;
339 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
341 if (opt == EOF)
342 break;
344 switch (opt) {
345 case 't':
346 tmplt = optarg;
347 break;
349 case '?':
350 rrd_set_error("unknown option '%s'", argv[optind - 1]);
351 goto end_tag;
352 }
353 }
355 opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
356 if (opt_daemon != NULL) {
357 rrd_set_error ("The \"%s\" environment variable is defined, "
358 "but \"%s\" cannot work with rrdcached. Either unset "
359 "the environment variable or use \"update\" instead.",
360 ENV_RRDCACHED_ADDRESS, argv[0]);
361 goto end_tag;
362 }
364 /* need at least 2 arguments: filename, data. */
365 if (argc - optind < 2) {
366 rrd_set_error("Not enough arguments");
367 goto end_tag;
368 }
369 rc.u_int = 0;
370 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
371 rc.u_int = _rrd_update(argv[optind], tmplt,
372 argc - optind - 1,
373 (const char **) (argv + optind + 1), result);
374 result->value.u_int = rc.u_int;
375 end_tag:
376 return result;
377 }
379 int rrd_update(
380 int argc,
381 char **argv)
382 {
383 struct option long_options[] = {
384 {"template", required_argument, 0, 't'},
385 {"daemon", required_argument, 0, 'd'},
386 {0, 0, 0, 0}
387 };
388 int option_index = 0;
389 int opt;
390 char *tmplt = NULL;
391 int rc = -1;
392 char *opt_daemon = NULL;
394 optind = 0;
395 opterr = 0; /* initialize getopt */
397 while (1) {
398 opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
400 if (opt == EOF)
401 break;
403 switch (opt) {
404 case 't':
405 tmplt = strdup(optarg);
406 break;
408 case 'd':
409 if (opt_daemon != NULL)
410 free (opt_daemon);
411 opt_daemon = strdup (optarg);
412 if (opt_daemon == NULL)
413 {
414 rrd_set_error("strdup failed.");
415 goto out;
416 }
417 break;
419 case '?':
420 rrd_set_error("unknown option '%s'", argv[optind - 1]);
421 goto out;
422 }
423 }
425 /* need at least 2 arguments: filename, data. */
426 if (argc - optind < 2) {
427 rrd_set_error("Not enough arguments");
428 goto out;
429 }
431 { /* try to connect to rrdcached */
432 int status = rrdc_connect(opt_daemon);
433 if (status != 0) return status;
434 }
436 if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
437 {
438 rrd_set_error("The caching daemon cannot be used together with "
439 "templates yet.");
440 goto out;
441 }
443 if (! rrdc_is_connected(opt_daemon))
444 {
445 rc = rrd_update_r(argv[optind], tmplt,
446 argc - optind - 1, (const char **) (argv + optind + 1));
447 }
448 else /* we are connected */
449 {
450 rc = rrdc_update (argv[optind], /* file */
451 argc - optind - 1, /* values_num */
452 (void *) (argv + optind + 1)); /* values */
453 if (rc > 0)
454 rrd_set_error("Failed sending the values to rrdcached: %s",
455 rrd_strerror (rc));
456 }
458 out:
459 if (tmplt != NULL)
460 {
461 free(tmplt);
462 tmplt = NULL;
463 }
464 if (opt_daemon != NULL)
465 {
466 free (opt_daemon);
467 opt_daemon = NULL;
468 }
469 return rc;
470 }
472 int rrd_update_r(
473 const char *filename,
474 const char *tmplt,
475 int argc,
476 const char **argv)
477 {
478 return _rrd_update(filename, tmplt, argc, argv, NULL);
479 }
481 int _rrd_update(
482 const char *filename,
483 const char *tmplt,
484 int argc,
485 const char **argv,
486 rrd_info_t * pcdp_summary)
487 {
489 int arg_i = 2;
491 unsigned long rra_begin; /* byte pointer to the rra
492 * area in the rrd file. this
493 * pointer never changes value */
494 rrd_value_t *pdp_new; /* prepare the incoming data to be added
495 * to the existing entry */
496 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
497 * to the cdp values */
499 long *tmpl_idx; /* index representing the settings
500 * transported by the tmplt index */
501 unsigned long tmpl_cnt = 2; /* time and data */
502 rrd_t rrd;
503 time_t current_time = 0;
504 unsigned long current_time_usec = 0; /* microseconds part of current time */
505 char **updvals;
506 int schedule_smooth = 0;
508 /* number of elapsed PDP steps since last update */
509 unsigned long *rra_step_cnt = NULL;
511 int version; /* rrd version */
512 rrd_file_t *rrd_file;
513 char *arg_copy; /* for processing the argv */
514 unsigned long *skip_update; /* RRAs to advance but not write */
516 /* need at least 1 arguments: data. */
517 if (argc < 1) {
518 rrd_set_error("Not enough arguments");
519 goto err_out;
520 }
522 rrd_init(&rrd);
523 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
524 goto err_free;
525 }
526 /* We are now at the beginning of the rra's */
527 rra_begin = rrd_file->header_len;
529 version = atoi(rrd.stat_head->version);
531 initialize_time(¤t_time, ¤t_time_usec, version);
533 /* get exclusive lock to whole file.
534 * lock gets removed when we close the file.
535 */
536 if (rrd_lock(rrd_file) != 0) {
537 rrd_set_error("could not lock RRD");
538 goto err_close;
539 }
541 if (allocate_data_structures(&rrd, &updvals,
542 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
543 &rra_step_cnt, &skip_update,
544 &pdp_new) == -1) {
545 goto err_close;
546 }
548 /* loop through the arguments. */
549 for (arg_i = 0; arg_i < argc; arg_i++) {
550 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
551 rrd_set_error("failed duplication argv entry");
552 break;
553 }
554 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
555 ¤t_time, ¤t_time_usec, pdp_temp, pdp_new,
556 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
557 &pcdp_summary, version, skip_update,
558 &schedule_smooth) == -1) {
559 if (rrd_test_error()) { /* Should have error string always here */
560 char *save_error;
562 /* Prepend file name to error message */
563 if ((save_error = strdup(rrd_get_error())) != NULL) {
564 rrd_set_error("%s: %s", filename, save_error);
565 free(save_error);
566 }
567 }
568 free(arg_copy);
569 break;
570 }
571 free(arg_copy);
572 }
574 free(rra_step_cnt);
576 /* if we got here and if there is an error and if the file has not been
577 * written to, then close things up and return. */
578 if (rrd_test_error()) {
579 goto err_free_structures;
580 }
581 #ifndef HAVE_MMAP
582 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
583 goto err_free_structures;
584 }
585 #endif
587 /* calling the smoothing code here guarantees at most one smoothing
588 * operation per rrd_update call. Unfortunately, it is possible with bulk
589 * updates, or a long-delayed update for smoothing to occur off-schedule.
590 * This really isn't critical except during the burn-in cycles. */
591 if (schedule_smooth) {
592 smooth_all_rras(&rrd, rrd_file, rra_begin);
593 }
595 /* rrd_dontneed(rrd_file,&rrd); */
596 rrd_free(&rrd);
597 rrd_close(rrd_file);
599 free(pdp_new);
600 free(tmpl_idx);
601 free(pdp_temp);
602 free(skip_update);
603 free(updvals);
604 return 0;
606 err_free_structures:
607 free(pdp_new);
608 free(tmpl_idx);
609 free(pdp_temp);
610 free(skip_update);
611 free(updvals);
612 err_close:
613 rrd_close(rrd_file);
614 err_free:
615 rrd_free(&rrd);
616 err_out:
617 return -1;
618 }
620 /*
621 * Allocate some important arrays used, and initialize the template.
622 *
623 * When it returns, either all of the structures are allocated
624 * or none of them are.
625 *
626 * Returns 0 on success, -1 on error.
627 */
628 static int allocate_data_structures(
629 rrd_t *rrd,
630 char ***updvals,
631 rrd_value_t **pdp_temp,
632 const char *tmplt,
633 long **tmpl_idx,
634 unsigned long *tmpl_cnt,
635 unsigned long **rra_step_cnt,
636 unsigned long **skip_update,
637 rrd_value_t **pdp_new)
638 {
639 unsigned i, ii;
640 if ((*updvals = (char **) malloc(sizeof(char *)
641 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
642 rrd_set_error("allocating updvals pointer array.");
643 return -1;
644 }
645 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
646 * rrd->stat_head->ds_cnt)) ==
647 NULL) {
648 rrd_set_error("allocating pdp_temp.");
649 goto err_free_updvals;
650 }
651 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
652 *
653 rrd->stat_head->rra_cnt)) ==
654 NULL) {
655 rrd_set_error("allocating skip_update.");
656 goto err_free_pdp_temp;
657 }
658 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
659 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
660 rrd_set_error("allocating tmpl_idx.");
661 goto err_free_skip_update;
662 }
663 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
664 *
665 (rrd->stat_head->
666 rra_cnt))) == NULL) {
667 rrd_set_error("allocating rra_step_cnt.");
668 goto err_free_tmpl_idx;
669 }
671 /* initialize tmplt redirector */
672 /* default config example (assume DS 1 is a CDEF DS)
673 tmpl_idx[0] -> 0; (time)
674 tmpl_idx[1] -> 1; (DS 0)
675 tmpl_idx[2] -> 3; (DS 2)
676 tmpl_idx[3] -> 4; (DS 3) */
677 (*tmpl_idx)[0] = 0; /* time */
678 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
679 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
680 (*tmpl_idx)[ii++] = i;
681 }
682 *tmpl_cnt = ii;
684 if (tmplt != NULL) {
685 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
686 goto err_free_rra_step_cnt;
687 }
688 }
690 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
691 * rrd->stat_head->ds_cnt)) == NULL) {
692 rrd_set_error("allocating pdp_new.");
693 goto err_free_rra_step_cnt;
694 }
696 return 0;
698 err_free_rra_step_cnt:
699 free(*rra_step_cnt);
700 err_free_tmpl_idx:
701 free(*tmpl_idx);
702 err_free_skip_update:
703 free(*skip_update);
704 err_free_pdp_temp:
705 free(*pdp_temp);
706 err_free_updvals:
707 free(*updvals);
708 return -1;
709 }
711 /*
712 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
713 *
714 * Returns 0 on success.
715 */
716 static int parse_template(
717 rrd_t *rrd,
718 const char *tmplt,
719 unsigned long *tmpl_cnt,
720 long *tmpl_idx)
721 {
722 char *dsname, *tmplt_copy;
723 unsigned int tmpl_len, i;
724 int ret = 0;
726 *tmpl_cnt = 1; /* the first entry is the time */
728 /* we should work on a writeable copy here */
729 if ((tmplt_copy = strdup(tmplt)) == NULL) {
730 rrd_set_error("error copying tmplt '%s'", tmplt);
731 ret = -1;
732 goto out;
733 }
735 dsname = tmplt_copy;
736 tmpl_len = strlen(tmplt_copy);
737 for (i = 0; i <= tmpl_len; i++) {
738 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
739 tmplt_copy[i] = '\0';
740 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
741 rrd_set_error("tmplt contains more DS definitions than RRD");
742 ret = -1;
743 goto out_free_tmpl_copy;
744 }
745 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
746 rrd_set_error("unknown DS name '%s'", dsname);
747 ret = -1;
748 goto out_free_tmpl_copy;
749 }
750 /* go to the next entry on the tmplt_copy */
751 if (i < tmpl_len)
752 dsname = &tmplt_copy[i + 1];
753 }
754 }
755 out_free_tmpl_copy:
756 free(tmplt_copy);
757 out:
758 return ret;
759 }
761 /*
762 * Parse an update string, updates the primary data points (PDPs)
763 * and consolidated data points (CDPs), and writes changes to the RRAs.
764 *
765 * Returns 0 on success, -1 on error.
766 */
767 static int process_arg(
768 char *step_start,
769 rrd_t *rrd,
770 rrd_file_t *rrd_file,
771 unsigned long rra_begin,
772 time_t *current_time,
773 unsigned long *current_time_usec,
774 rrd_value_t *pdp_temp,
775 rrd_value_t *pdp_new,
776 unsigned long *rra_step_cnt,
777 char **updvals,
778 long *tmpl_idx,
779 unsigned long tmpl_cnt,
780 rrd_info_t ** pcdp_summary,
781 int version,
782 unsigned long *skip_update,
783 int *schedule_smooth)
784 {
785 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
787 /* a vector of future Holt-Winters seasonal coefs */
788 unsigned long elapsed_pdp_st;
790 double interval, pre_int, post_int; /* interval between this and
791 * the last run */
792 unsigned long proc_pdp_cnt;
794 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
795 current_time, current_time_usec, version) == -1) {
796 return -1;
797 }
799 interval = (double) (*current_time - rrd->live_head->last_up)
800 + (double) ((long) *current_time_usec -
801 (long) rrd->live_head->last_up_usec) / 1e6f;
803 /* process the data sources and update the pdp_prep
804 * area accordingly */
805 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
806 return -1;
807 }
809 elapsed_pdp_st = calculate_elapsed_steps(rrd,
810 *current_time,
811 *current_time_usec, interval,
812 &pre_int, &post_int,
813 &proc_pdp_cnt);
815 /* has a pdp_st moment occurred since the last run ? */
816 if (elapsed_pdp_st == 0) {
817 /* no we have not passed a pdp_st moment. therefore update is simple */
818 simple_update(rrd, interval, pdp_new);
819 } else {
820 /* an pdp_st has occurred. */
821 if (process_all_pdp_st(rrd, interval,
822 pre_int, post_int,
823 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
824 return -1;
825 }
826 if (update_all_cdp_prep(rrd, rra_step_cnt,
827 rra_begin, rrd_file,
828 elapsed_pdp_st,
829 proc_pdp_cnt,
830 &last_seasonal_coef,
831 &seasonal_coef,
832 pdp_temp,
833 skip_update, schedule_smooth) == -1) {
834 goto err_free_coefficients;
835 }
836 if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
837 elapsed_pdp_st, pdp_temp,
838 &seasonal_coef) == -1) {
839 goto err_free_coefficients;
840 }
841 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
842 *current_time, skip_update,
843 pcdp_summary) == -1) {
844 goto err_free_coefficients;
845 }
846 } /* endif a pdp_st has occurred */
847 rrd->live_head->last_up = *current_time;
848 rrd->live_head->last_up_usec = *current_time_usec;
850 if (version < 3) {
851 *rrd->legacy_last_up = rrd->live_head->last_up;
852 }
853 free(seasonal_coef);
854 free(last_seasonal_coef);
855 return 0;
857 err_free_coefficients:
858 free(seasonal_coef);
859 free(last_seasonal_coef);
860 return -1;
861 }
863 /*
864 * Parse a DS string (time + colon-separated values), storing the
865 * results in current_time, current_time_usec, and updvals.
866 *
867 * Returns 0 on success, -1 on error.
868 */
869 static int parse_ds(
870 rrd_t *rrd,
871 char **updvals,
872 long *tmpl_idx,
873 char *input,
874 unsigned long tmpl_cnt,
875 time_t *current_time,
876 unsigned long *current_time_usec,
877 int version)
878 {
879 char *p;
880 unsigned long i;
881 char timesyntax;
883 updvals[0] = input;
884 /* initialize all ds input to unknown except the first one
885 which has always got to be set */
886 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
887 updvals[i] = "U";
889 /* separate all ds elements; first must be examined separately
890 due to alternate time syntax */
891 if ((p = strchr(input, '@')) != NULL) {
892 timesyntax = '@';
893 } else if ((p = strchr(input, ':')) != NULL) {
894 timesyntax = ':';
895 } else {
896 rrd_set_error("expected timestamp not found in data source from %s",
897 input);
898 return -1;
899 }
900 *p = '\0';
901 i = 1;
902 updvals[tmpl_idx[i++]] = p + 1;
903 while (*(++p)) {
904 if (*p == ':') {
905 *p = '\0';
906 if (i < tmpl_cnt) {
907 updvals[tmpl_idx[i++]] = p + 1;
908 }
909 }
910 }
912 if (i != tmpl_cnt) {
913 rrd_set_error("expected %lu data source readings (got %lu) from %s",
914 tmpl_cnt - 1, i, input);
915 return -1;
916 }
918 if (get_time_from_reading(rrd, timesyntax, updvals,
919 current_time, current_time_usec,
920 version) == -1) {
921 return -1;
922 }
923 return 0;
924 }
926 /*
927 * Parse the time in a DS string, store it in current_time and
928 * current_time_usec and verify that it's later than the last
929 * update for this DS.
930 *
931 * Returns 0 on success, -1 on error.
932 */
933 static int get_time_from_reading(
934 rrd_t *rrd,
935 char timesyntax,
936 char **updvals,
937 time_t *current_time,
938 unsigned long *current_time_usec,
939 int version)
940 {
941 double tmp;
942 char *parsetime_error = NULL;
943 char *old_locale;
944 rrd_time_value_t ds_tv;
945 struct timeval tmp_time; /* used for time conversion */
947 /* get the time from the reading ... handle N */
948 if (timesyntax == '@') { /* at-style */
949 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
950 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
951 return -1;
952 }
953 if (ds_tv.type == RELATIVE_TO_END_TIME ||
954 ds_tv.type == RELATIVE_TO_START_TIME) {
955 rrd_set_error("specifying time relative to the 'start' "
956 "or 'end' makes no sense here: %s", updvals[0]);
957 return -1;
958 }
959 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
960 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
961 } else if (strcmp(updvals[0], "N") == 0) {
962 gettimeofday(&tmp_time, 0);
963 normalize_time(&tmp_time);
964 *current_time = tmp_time.tv_sec;
965 *current_time_usec = tmp_time.tv_usec;
966 } else {
967 old_locale = setlocale(LC_NUMERIC, "C");
968 tmp = strtod(updvals[0], 0);
969 setlocale(LC_NUMERIC, old_locale);
970 *current_time = floor(tmp);
971 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
972 }
973 /* dont do any correction for old version RRDs */
974 if (version < 3)
975 *current_time_usec = 0;
977 if (*current_time < rrd->live_head->last_up ||
978 (*current_time == rrd->live_head->last_up &&
979 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
980 rrd_set_error("illegal attempt to update using time %ld when "
981 "last update time is %ld (minimum one second step)",
982 *current_time, rrd->live_head->last_up);
983 return -1;
984 }
985 return 0;
986 }
988 /*
989 * Update pdp_new by interpreting the updvals according to the DS type
990 * (COUNTER, GAUGE, etc.).
991 *
992 * Returns 0 on success, -1 on error.
993 */
994 static int update_pdp_prep(
995 rrd_t *rrd,
996 char **updvals,
997 rrd_value_t *pdp_new,
998 double interval)
999 {
1000 unsigned long ds_idx;
1001 int ii;
1002 char *endptr; /* used in the conversion */
1003 double rate;
1004 char *old_locale;
1005 enum dst_en dst_idx;
1007 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1008 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1010 /* make sure we do not build diffs with old last_ds values */
1011 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1012 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1013 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1014 }
1016 /* NOTE: DST_CDEF should never enter this if block, because
1017 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1018 * accidently specified a value for the DST_CDEF. To handle this case,
1019 * an extra check is required. */
1021 if ((updvals[ds_idx + 1][0] != 'U') &&
1022 (dst_idx != DST_CDEF) &&
1023 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1024 rate = DNAN;
1026 /* pdp_new contains rate * time ... eg the bytes transferred during
1027 * the interval. Doing it this way saves a lot of math operations
1028 */
1029 switch (dst_idx) {
1030 case DST_COUNTER:
1031 case DST_DERIVE:
1032 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1033 if ((updvals[ds_idx + 1][ii] < '0'
1034 || updvals[ds_idx + 1][ii] > '9')
1035 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1036 rrd_set_error("not a simple integer: '%s'",
1037 updvals[ds_idx + 1]);
1038 return -1;
1039 }
1040 }
1041 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1042 pdp_new[ds_idx] =
1043 rrd_diff(updvals[ds_idx + 1],
1044 rrd->pdp_prep[ds_idx].last_ds);
1045 if (dst_idx == DST_COUNTER) {
1046 /* simple overflow catcher. This will fail
1047 * terribly for non 32 or 64 bit counters
1048 * ... are there any others in SNMP land?
1049 */
1050 if (pdp_new[ds_idx] < (double) 0.0)
1051 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1052 if (pdp_new[ds_idx] < (double) 0.0)
1053 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1054 }
1055 rate = pdp_new[ds_idx] / interval;
1056 } else {
1057 pdp_new[ds_idx] = DNAN;
1058 }
1059 break;
1060 case DST_ABSOLUTE:
1061 old_locale = setlocale(LC_NUMERIC, "C");
1062 errno = 0;
1063 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1064 setlocale(LC_NUMERIC, old_locale);
1065 if (errno > 0) {
1066 rrd_set_error("converting '%s' to float: %s",
1067 updvals[ds_idx + 1], rrd_strerror(errno));
1068 return -1;
1069 };
1070 if (endptr[0] != '\0') {
1071 rrd_set_error
1072 ("conversion of '%s' to float not complete: tail '%s'",
1073 updvals[ds_idx + 1], endptr);
1074 return -1;
1075 }
1076 rate = pdp_new[ds_idx] / interval;
1077 break;
1078 case DST_GAUGE:
1079 errno = 0;
1080 old_locale = setlocale(LC_NUMERIC, "C");
1081 pdp_new[ds_idx] =
1082 strtod(updvals[ds_idx + 1], &endptr) * interval;
1083 setlocale(LC_NUMERIC, old_locale);
1084 if (errno) {
1085 rrd_set_error("converting '%s' to float: %s",
1086 updvals[ds_idx + 1], rrd_strerror(errno));
1087 return -1;
1088 };
1089 if (endptr[0] != '\0') {
1090 rrd_set_error
1091 ("conversion of '%s' to float not complete: tail '%s'",
1092 updvals[ds_idx + 1], endptr);
1093 return -1;
1094 }
1095 rate = pdp_new[ds_idx] / interval;
1096 break;
1097 default:
1098 rrd_set_error("rrd contains unknown DS type : '%s'",
1099 rrd->ds_def[ds_idx].dst);
1100 return -1;
1101 }
1102 /* break out of this for loop if the error string is set */
1103 if (rrd_test_error()) {
1104 return -1;
1105 }
1106 /* make sure pdp_temp is neither too large or too small
1107 * if any of these occur it becomes unknown ...
1108 * sorry folks ... */
1109 if (!isnan(rate) &&
1110 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1111 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1112 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1113 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1114 pdp_new[ds_idx] = DNAN;
1115 }
1116 } else {
1117 /* no news is news all the same */
1118 pdp_new[ds_idx] = DNAN;
1119 }
1122 /* make a copy of the command line argument for the next run */
1123 #ifdef DEBUG
1124 fprintf(stderr, "prep ds[%lu]\t"
1125 "last_arg '%s'\t"
1126 "this_arg '%s'\t"
1127 "pdp_new %10.2f\n",
1128 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1129 pdp_new[ds_idx]);
1130 #endif
1131 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1132 LAST_DS_LEN - 1);
1133 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1134 }
1135 return 0;
1136 }
1138 /*
1139 * How many PDP steps have elapsed since the last update? Returns the answer,
1140 * and stores the time between the last update and the last PDP in pre_time,
1141 * and the time between the last PDP and the current time in post_int.
1142 */
1143 static int calculate_elapsed_steps(
1144 rrd_t *rrd,
1145 unsigned long current_time,
1146 unsigned long current_time_usec,
1147 double interval,
1148 double *pre_int,
1149 double *post_int,
1150 unsigned long *proc_pdp_cnt)
1151 {
1152 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1153 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1154 * time */
1155 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1156 * when it was last updated */
1157 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1159 /* when was the current pdp started */
1160 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1161 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1163 /* when did the last pdp_st occur */
1164 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1165 occu_pdp_st = current_time - occu_pdp_age;
1167 if (occu_pdp_st > proc_pdp_st) {
1168 /* OK we passed the pdp_st moment */
1169 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1170 * occurred before the latest
1171 * pdp_st moment*/
1172 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1173 *post_int = occu_pdp_age; /* how much after it */
1174 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1175 } else {
1176 *pre_int = interval;
1177 *post_int = 0;
1178 }
1180 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1182 #ifdef DEBUG
1183 printf("proc_pdp_age %lu\t"
1184 "proc_pdp_st %lu\t"
1185 "occu_pfp_age %lu\t"
1186 "occu_pdp_st %lu\t"
1187 "int %lf\t"
1188 "pre_int %lf\t"
1189 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1190 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1191 #endif
1193 /* compute the number of elapsed pdp_st moments */
1194 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1195 }
1197 /*
1198 * Increment the PDP values by the values in pdp_new, or else initialize them.
1199 */
1200 static void simple_update(
1201 rrd_t *rrd,
1202 double interval,
1203 rrd_value_t *pdp_new)
1204 {
1205 int i;
1207 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1208 if (isnan(pdp_new[i])) {
1209 /* this is not really accurate if we use subsecond data arrival time
1210 should have thought of it when going subsecond resolution ...
1211 sorry next format change we will have it! */
1212 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1213 floor(interval);
1214 } else {
1215 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1216 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1217 } else {
1218 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1219 }
1220 }
1221 #ifdef DEBUG
1222 fprintf(stderr,
1223 "NO PDP ds[%i]\t"
1224 "value %10.2f\t"
1225 "unkn_sec %5lu\n",
1226 i,
1227 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1228 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1229 #endif
1230 }
1231 }
1233 /*
1234 * Call process_pdp_st for each DS.
1235 *
1236 * Returns 0 on success, -1 on error.
1237 */
1238 static int process_all_pdp_st(
1239 rrd_t *rrd,
1240 double interval,
1241 double pre_int,
1242 double post_int,
1243 unsigned long elapsed_pdp_st,
1244 rrd_value_t *pdp_new,
1245 rrd_value_t *pdp_temp)
1246 {
1247 unsigned long ds_idx;
1249 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1250 rate*seconds which occurred up to the last run.
1251 pdp_new[] contains rate*seconds from the latest run.
1252 pdp_temp[] will contain the rate for cdp */
1254 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1255 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1256 elapsed_pdp_st * rrd->stat_head->pdp_step,
1257 pdp_new, pdp_temp) == -1) {
1258 return -1;
1259 }
1260 #ifdef DEBUG
1261 fprintf(stderr, "PDP UPD ds[%lu]\t"
1262 "elapsed_pdp_st %lu\t"
1263 "pdp_temp %10.2f\t"
1264 "new_prep %10.2f\t"
1265 "new_unkn_sec %5lu\n",
1266 ds_idx,
1267 elapsed_pdp_st,
1268 pdp_temp[ds_idx],
1269 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1270 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1271 #endif
1272 }
1273 return 0;
1274 }
1276 /*
1277 * Process an update that occurs after one of the PDP moments.
1278 * Increments the PDP value, sets NAN if time greater than the
1279 * heartbeats have elapsed, processes CDEFs.
1280 *
1281 * Returns 0 on success, -1 on error.
1282 */
1283 static int process_pdp_st(
1284 rrd_t *rrd,
1285 unsigned long ds_idx,
1286 double interval,
1287 double pre_int,
1288 double post_int,
1289 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1290 rrd_value_t *pdp_new,
1291 rrd_value_t *pdp_temp)
1292 {
1293 int i;
1295 /* update pdp_prep to the current pdp_st. */
1296 double pre_unknown = 0.0;
1297 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1298 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1300 rpnstack_t rpnstack; /* used for COMPUTE DS */
1302 rpnstack_init(&rpnstack);
1305 if (isnan(pdp_new[ds_idx])) {
1306 /* a final bit of unknown to be added before calculation
1307 we use a temporary variable for this so that we
1308 don't have to turn integer lines before using the value */
1309 pre_unknown = pre_int;
1310 } else {
1311 if (isnan(scratch[PDP_val].u_val)) {
1312 scratch[PDP_val].u_val = 0;
1313 }
1314 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1315 }
1317 /* if too much of the pdp_prep is unknown we dump it */
1318 /* if the interval is larger thatn mrhb we get NAN */
1319 if ((interval > mrhb) ||
1320 (rrd->stat_head->pdp_step / 2.0 <
1321 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1322 pdp_temp[ds_idx] = DNAN;
1323 } else {
1324 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1325 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1326 pre_unknown);
1327 }
1329 /* process CDEF data sources; remember each CDEF DS can
1330 * only reference other DS with a lower index number */
1331 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1332 rpnp_t *rpnp;
1334 rpnp =
1335 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1336 /* substitute data values for OP_VARIABLE nodes */
1337 for (i = 0; rpnp[i].op != OP_END; i++) {
1338 if (rpnp[i].op == OP_VARIABLE) {
1339 rpnp[i].op = OP_NUMBER;
1340 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1341 }
1342 }
1343 /* run the rpn calculator */
1344 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1345 free(rpnp);
1346 rpnstack_free(&rpnstack);
1347 return -1;
1348 }
1349 }
1351 /* make pdp_prep ready for the next run */
1352 if (isnan(pdp_new[ds_idx])) {
1353 /* this is not realy accurate if we use subsecond data arival time
1354 should have thought of it when going subsecond resolution ...
1355 sorry next format change we will have it! */
1356 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1357 scratch[PDP_val].u_val = DNAN;
1358 } else {
1359 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1360 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1361 }
1362 rpnstack_free(&rpnstack);
1363 return 0;
1364 }
1366 /*
1367 * Iterate over all the RRAs for a given DS and:
1368 * 1. Decide whether to schedule a smooth later
1369 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1370 * 3. Update the CDP
1371 *
1372 * Returns 0 on success, -1 on error
1373 */
1374 static int update_all_cdp_prep(
1375 rrd_t *rrd,
1376 unsigned long *rra_step_cnt,
1377 unsigned long rra_begin,
1378 rrd_file_t *rrd_file,
1379 unsigned long elapsed_pdp_st,
1380 unsigned long proc_pdp_cnt,
1381 rrd_value_t **last_seasonal_coef,
1382 rrd_value_t **seasonal_coef,
1383 rrd_value_t *pdp_temp,
1384 unsigned long *skip_update,
1385 int *schedule_smooth)
1386 {
1387 unsigned long rra_idx;
1389 /* index into the CDP scratch array */
1390 enum cf_en current_cf;
1391 unsigned long rra_start;
1393 /* number of rows to be updated in an RRA for a data value. */
1394 unsigned long start_pdp_offset;
1396 rra_start = rra_begin;
1397 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1398 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1399 start_pdp_offset =
1400 rrd->rra_def[rra_idx].pdp_cnt -
1401 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1402 skip_update[rra_idx] = 0;
1403 if (start_pdp_offset <= elapsed_pdp_st) {
1404 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1405 rrd->rra_def[rra_idx].pdp_cnt + 1;
1406 } else {
1407 rra_step_cnt[rra_idx] = 0;
1408 }
1410 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1411 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1412 * so that they will be correct for the next observed value; note that for
1413 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1414 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1415 if (rra_step_cnt[rra_idx] > 1) {
1416 skip_update[rra_idx] = 1;
1417 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1418 elapsed_pdp_st, last_seasonal_coef);
1419 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1420 elapsed_pdp_st + 1, seasonal_coef);
1421 }
1422 /* periodically run a smoother for seasonal effects */
1423 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1424 #ifdef DEBUG
1425 fprintf(stderr,
1426 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1427 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1428 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1429 u_cnt);
1430 #endif
1431 *schedule_smooth = 1;
1432 }
1433 }
1434 if (rrd_test_error())
1435 return -1;
1437 if (update_cdp_prep
1438 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1439 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1440 current_cf) == -1) {
1441 return -1;
1442 }
1443 rra_start +=
1444 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1445 sizeof(rrd_value_t);
1446 }
1447 return 0;
1448 }
1450 /*
1451 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1452 */
1453 static int do_schedule_smooth(
1454 rrd_t *rrd,
1455 unsigned long rra_idx,
1456 unsigned long elapsed_pdp_st)
1457 {
1458 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1459 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1460 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1461 unsigned long seasonal_smooth_idx =
1462 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1463 unsigned long *init_seasonal =
1464 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1466 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1467 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1468 * really an RRA level, not a data source within RRA level parameter, but
1469 * the rra_def is read only for rrd_update (not flushed to disk). */
1470 if (*init_seasonal > BURNIN_CYCLES) {
1471 /* someone has no doubt invented a trick to deal with this wrap around,
1472 * but at least this code is clear. */
1473 if (seasonal_smooth_idx > cur_row) {
1474 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1475 * between PDP and CDP */
1476 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1477 }
1478 /* can't rely on negative numbers because we are working with
1479 * unsigned values */
1480 return (cur_row + elapsed_pdp_st >= row_cnt
1481 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1482 }
1483 /* mark off one of the burn-in cycles */
1484 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1485 }
1487 /*
1488 * For a given RRA, iterate over the data sources and call the appropriate
1489 * consolidation function.
1490 *
1491 * Returns 0 on success, -1 on error.
1492 */
1493 static int update_cdp_prep(
1494 rrd_t *rrd,
1495 unsigned long elapsed_pdp_st,
1496 unsigned long start_pdp_offset,
1497 unsigned long *rra_step_cnt,
1498 int rra_idx,
1499 rrd_value_t *pdp_temp,
1500 rrd_value_t *last_seasonal_coef,
1501 rrd_value_t *seasonal_coef,
1502 int current_cf)
1503 {
1504 unsigned long ds_idx, cdp_idx;
1506 /* update CDP_PREP areas */
1507 /* loop over data soures within each RRA */
1508 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1510 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1512 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1513 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1514 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1515 elapsed_pdp_st, start_pdp_offset,
1516 rrd->rra_def[rra_idx].pdp_cnt,
1517 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1518 rra_idx, ds_idx);
1519 } else {
1520 /* Nothing to consolidate if there's one PDP per CDP. However, if
1521 * we've missed some PDPs, let's update null counters etc. */
1522 if (elapsed_pdp_st > 2) {
1523 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1524 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1525 current_cf);
1526 }
1527 }
1529 if (rrd_test_error())
1530 return -1;
1531 } /* endif data sources loop */
1532 return 0;
1533 }
1535 /*
1536 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1537 * primary value, secondary value, and # of unknowns.
1538 */
1539 static void update_cdp(
1540 unival *scratch,
1541 int current_cf,
1542 rrd_value_t pdp_temp_val,
1543 unsigned long rra_step_cnt,
1544 unsigned long elapsed_pdp_st,
1545 unsigned long start_pdp_offset,
1546 unsigned long pdp_cnt,
1547 rrd_value_t xff,
1548 int i,
1549 int ii)
1550 {
1551 /* shorthand variables */
1552 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1553 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1554 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1555 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1557 if (rra_step_cnt) {
1558 /* If we are in this block, as least 1 CDP value will be written to
1559 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1560 * to be written, then the "fill in" value is the CDP_secondary_val
1561 * entry. */
1562 if (isnan(pdp_temp_val)) {
1563 *cdp_unkn_pdp_cnt += start_pdp_offset;
1564 *cdp_secondary_val = DNAN;
1565 } else {
1566 /* CDP_secondary value is the RRA "fill in" value for intermediary
1567 * CDP data entries. No matter the CF, the value is the same because
1568 * the average, max, min, and last of a list of identical values is
1569 * the same, namely, the value itself. */
1570 *cdp_secondary_val = pdp_temp_val;
1571 }
1573 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1574 *cdp_primary_val = DNAN;
1575 if (current_cf == CF_AVERAGE) {
1576 *cdp_val =
1577 initialize_average_carry_over(pdp_temp_val,
1578 elapsed_pdp_st,
1579 start_pdp_offset, pdp_cnt);
1580 } else {
1581 *cdp_val = pdp_temp_val;
1582 }
1583 } else {
1584 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1585 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1586 } /* endif meets xff value requirement for a valid value */
1587 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1588 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1589 if (isnan(pdp_temp_val))
1590 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1591 else
1592 *cdp_unkn_pdp_cnt = 0;
1593 } else { /* rra_step_cnt[i] == 0 */
1595 #ifdef DEBUG
1596 if (isnan(*cdp_val)) {
1597 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1598 i, ii);
1599 } else {
1600 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1601 i, ii, *cdp_val);
1602 }
1603 #endif
1604 if (isnan(pdp_temp_val)) {
1605 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1606 } else {
1607 *cdp_val =
1608 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1609 current_cf, i, ii);
1610 }
1611 }
1612 }
1614 /*
1615 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1616 * on the type of consolidation function.
1617 */
1618 static void initialize_cdp_val(
1619 unival *scratch,
1620 int current_cf,
1621 rrd_value_t pdp_temp_val,
1622 unsigned long elapsed_pdp_st,
1623 unsigned long start_pdp_offset,
1624 unsigned long pdp_cnt)
1625 {
1626 rrd_value_t cum_val, cur_val;
1628 switch (current_cf) {
1629 case CF_AVERAGE:
1630 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1631 cur_val = IFDNAN(pdp_temp_val, 0.0);
1632 scratch[CDP_primary_val].u_val =
1633 (cum_val + cur_val * start_pdp_offset) /
1634 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1635 scratch[CDP_val].u_val =
1636 initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1637 start_pdp_offset, pdp_cnt);
1638 break;
1639 case CF_MAXIMUM:
1640 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1641 cur_val = IFDNAN(pdp_temp_val, -DINF);
1642 #if 0
1643 #ifdef DEBUG
1644 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1645 fprintf(stderr,
1646 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1647 i, ii);
1648 exit(-1);
1649 }
1650 #endif
1651 #endif
1652 if (cur_val > cum_val)
1653 scratch[CDP_primary_val].u_val = cur_val;
1654 else
1655 scratch[CDP_primary_val].u_val = cum_val;
1656 /* initialize carry over value */
1657 scratch[CDP_val].u_val = pdp_temp_val;
1658 break;
1659 case CF_MINIMUM:
1660 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1661 cur_val = IFDNAN(pdp_temp_val, DINF);
1662 #if 0
1663 #ifdef DEBUG
1664 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1665 fprintf(stderr,
1666 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1667 ii);
1668 exit(-1);
1669 }
1670 #endif
1671 #endif
1672 if (cur_val < cum_val)
1673 scratch[CDP_primary_val].u_val = cur_val;
1674 else
1675 scratch[CDP_primary_val].u_val = cum_val;
1676 /* initialize carry over value */
1677 scratch[CDP_val].u_val = pdp_temp_val;
1678 break;
1679 case CF_LAST:
1680 default:
1681 scratch[CDP_primary_val].u_val = pdp_temp_val;
1682 /* initialize carry over value */
1683 scratch[CDP_val].u_val = pdp_temp_val;
1684 break;
1685 }
1686 }
1688 /*
1689 * Update the consolidation function for Holt-Winters functions as
1690 * well as other functions that don't actually consolidate multiple
1691 * PDPs.
1692 */
1693 static void reset_cdp(
1694 rrd_t *rrd,
1695 unsigned long elapsed_pdp_st,
1696 rrd_value_t *pdp_temp,
1697 rrd_value_t *last_seasonal_coef,
1698 rrd_value_t *seasonal_coef,
1699 int rra_idx,
1700 int ds_idx,
1701 int cdp_idx,
1702 enum cf_en current_cf)
1703 {
1704 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1706 switch (current_cf) {
1707 case CF_AVERAGE:
1708 default:
1709 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1710 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1711 break;
1712 case CF_SEASONAL:
1713 case CF_DEVSEASONAL:
1714 /* need to update cached seasonal values, so they are consistent
1715 * with the bulk update */
1716 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1717 * CDP_last_deviation are the same. */
1718 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1719 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1720 break;
1721 case CF_HWPREDICT:
1722 case CF_MHWPREDICT:
1723 /* need to update the null_count and last_null_count.
1724 * even do this for non-DNAN pdp_temp because the
1725 * algorithm is not learning from batch updates. */
1726 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1727 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1728 /* fall through */
1729 case CF_DEVPREDICT:
1730 scratch[CDP_primary_val].u_val = DNAN;
1731 scratch[CDP_secondary_val].u_val = DNAN;
1732 break;
1733 case CF_FAILURES:
1734 /* do not count missed bulk values as failures */
1735 scratch[CDP_primary_val].u_val = 0;
1736 scratch[CDP_secondary_val].u_val = 0;
1737 /* need to reset violations buffer.
1738 * could do this more carefully, but for now, just
1739 * assume a bulk update wipes away all violations. */
1740 erase_violations(rrd, cdp_idx, rra_idx);
1741 break;
1742 }
1743 }
1745 static rrd_value_t initialize_average_carry_over(
1746 rrd_value_t pdp_temp_val,
1747 unsigned long elapsed_pdp_st,
1748 unsigned long start_pdp_offset,
1749 unsigned long pdp_cnt)
1750 {
1751 /* initialize carry over value */
1752 if (isnan(pdp_temp_val)) {
1753 return DNAN;
1754 }
1755 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1756 }
1758 /*
1759 * Update or initialize a CDP value based on the consolidation
1760 * function.
1761 *
1762 * Returns the new value.
1763 */
1764 static rrd_value_t calculate_cdp_val(
1765 rrd_value_t cdp_val,
1766 rrd_value_t pdp_temp_val,
1767 unsigned long elapsed_pdp_st,
1768 int current_cf,
1769 #ifdef DEBUG
1770 int i,
1771 int ii
1772 #else
1773 int UNUSED(i),
1774 int UNUSED(ii)
1775 #endif
1776 )
1777 {
1778 if (isnan(cdp_val)) {
1779 if (current_cf == CF_AVERAGE) {
1780 pdp_temp_val *= elapsed_pdp_st;
1781 }
1782 #ifdef DEBUG
1783 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1784 i, ii, pdp_temp_val);
1785 #endif
1786 return pdp_temp_val;
1787 }
1788 if (current_cf == CF_AVERAGE)
1789 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1790 if (current_cf == CF_MINIMUM)
1791 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1792 if (current_cf == CF_MAXIMUM)
1793 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1795 return pdp_temp_val;
1796 }
1798 /*
1799 * For each RRA, update the seasonal values and then call update_aberrant_CF
1800 * for each data source.
1801 *
1802 * Return 0 on success, -1 on error.
1803 */
1804 static int update_aberrant_cdps(
1805 rrd_t *rrd,
1806 rrd_file_t *rrd_file,
1807 unsigned long rra_begin,
1808 unsigned long elapsed_pdp_st,
1809 rrd_value_t *pdp_temp,
1810 rrd_value_t **seasonal_coef)
1811 {
1812 unsigned long rra_idx, ds_idx, j;
1814 /* number of PDP steps since the last update that
1815 * are assigned to the first CDP to be generated
1816 * since the last update. */
1817 unsigned short scratch_idx;
1818 unsigned long rra_start;
1819 enum cf_en current_cf;
1821 /* this loop is only entered if elapsed_pdp_st < 3 */
1822 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1823 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1824 rra_start = rra_begin;
1825 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1826 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1827 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1828 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1829 if (scratch_idx == CDP_primary_val) {
1830 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1831 elapsed_pdp_st + 1, seasonal_coef);
1832 } else {
1833 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1834 elapsed_pdp_st + 2, seasonal_coef);
1835 }
1836 }
1837 if (rrd_test_error())
1838 return -1;
1839 /* loop over data soures within each RRA */
1840 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1841 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1842 rra_idx * (rrd->stat_head->ds_cnt) +
1843 ds_idx, rra_idx, ds_idx, scratch_idx,
1844 *seasonal_coef);
1845 }
1846 }
1847 rra_start += rrd->rra_def[rra_idx].row_cnt
1848 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1849 }
1850 }
1851 return 0;
1852 }
1854 /*
1855 * Move sequentially through the file, writing one RRA at a time. Note this
1856 * architecture divorces the computation of CDP with flushing updated RRA
1857 * entries to disk.
1858 *
1859 * Return 0 on success, -1 on error.
1860 */
1861 static int write_to_rras(
1862 rrd_t *rrd,
1863 rrd_file_t *rrd_file,
1864 unsigned long *rra_step_cnt,
1865 unsigned long rra_begin,
1866 time_t current_time,
1867 unsigned long *skip_update,
1868 rrd_info_t ** pcdp_summary)
1869 {
1870 unsigned long rra_idx;
1871 unsigned long rra_start;
1872 time_t rra_time = 0; /* time of update for a RRA */
1874 unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1876 /* Ready to write to disk */
1877 rra_start = rra_begin;
1879 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1880 rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1881 rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1883 /* for cdp_prep */
1884 unsigned short scratch_idx;
1885 unsigned long step_subtract;
1887 for (scratch_idx = CDP_primary_val,
1888 step_subtract = 1;
1889 rra_step_cnt[rra_idx] > 0;
1890 rra_step_cnt[rra_idx]--,
1891 scratch_idx = CDP_secondary_val,
1892 step_subtract = 2) {
1894 off_t rra_pos_new;
1895 #ifdef DEBUG
1896 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1897 #endif
1898 /* increment, with wrap-around */
1899 if (++rra_ptr->cur_row >= rra_def->row_cnt)
1900 rra_ptr->cur_row = 0;
1902 /* we know what our position should be */
1903 rra_pos_new = rra_start
1904 + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1906 /* re-seek if the position is wrong or we wrapped around */
1907 if (rra_pos_new != rrd_file->pos) {
1908 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1909 rrd_set_error("seek error in rrd");
1910 return -1;
1911 }
1912 }
1913 #ifdef DEBUG
1914 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1915 #endif
1917 if (skip_update[rra_idx])
1918 continue;
1920 if (*pcdp_summary != NULL) {
1921 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1923 rra_time = (current_time - current_time % step_time)
1924 - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1925 }
1927 if (write_RRA_row
1928 (rrd_file, rrd, rra_idx, scratch_idx,
1929 pcdp_summary, rra_time) == -1)
1930 return -1;
1932 rrd_notify_row(rrd_file, rra_idx, rra_pos_new, rra_time);
1933 }
1935 rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1936 } /* RRA LOOP */
1938 return 0;
1939 }
1941 /*
1942 * Write out one row of values (one value per DS) to the archive.
1943 *
1944 * Returns 0 on success, -1 on error.
1945 */
1946 static int write_RRA_row(
1947 rrd_file_t *rrd_file,
1948 rrd_t *rrd,
1949 unsigned long rra_idx,
1950 unsigned short CDP_scratch_idx,
1951 rrd_info_t ** pcdp_summary,
1952 time_t rra_time)
1953 {
1954 unsigned long ds_idx, cdp_idx;
1955 rrd_infoval_t iv;
1957 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1958 /* compute the cdp index */
1959 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1960 #ifdef DEBUG
1961 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1962 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1963 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1964 #endif
1965 if (*pcdp_summary != NULL) {
1966 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1967 /* append info to the return hash */
1968 *pcdp_summary = rrd_info_push(*pcdp_summary,
1969 sprintf_alloc
1970 ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
1971 rrd->rra_def[rra_idx].cf_nam,
1972 rrd->rra_def[rra_idx].pdp_cnt,
1973 rrd->ds_def[ds_idx].ds_nam),
1974 RD_I_VAL, iv);
1975 }
1976 if (rrd_write(rrd_file,
1977 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
1978 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1979 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1980 return -1;
1981 }
1982 }
1983 return 0;
1984 }
1986 /*
1987 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
1988 *
1989 * Returns 0 on success, -1 otherwise
1990 */
1991 static int smooth_all_rras(
1992 rrd_t *rrd,
1993 rrd_file_t *rrd_file,
1994 unsigned long rra_begin)
1995 {
1996 unsigned long rra_start = rra_begin;
1997 unsigned long rra_idx;
1999 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2000 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2001 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2002 #ifdef DEBUG
2003 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2004 #endif
2005 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2006 if (rrd_test_error())
2007 return -1;
2008 }
2009 rra_start += rrd->rra_def[rra_idx].row_cnt
2010 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2011 }
2012 return 0;
2013 }
2015 #ifndef HAVE_MMAP
2016 /*
2017 * Flush changes to disk (unless we're using mmap)
2018 *
2019 * Returns 0 on success, -1 otherwise
2020 */
2021 static int write_changes_to_disk(
2022 rrd_t *rrd,
2023 rrd_file_t *rrd_file,
2024 int version)
2025 {
2026 /* we just need to write back the live header portion now */
2027 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2028 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2029 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2030 SEEK_SET) != 0) {
2031 rrd_set_error("seek rrd for live header writeback");
2032 return -1;
2033 }
2034 if (version >= 3) {
2035 if (rrd_write(rrd_file, rrd->live_head,
2036 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2037 rrd_set_error("rrd_write live_head to rrd");
2038 return -1;
2039 }
2040 } else {
2041 if (rrd_write(rrd_file, rrd->legacy_last_up,
2042 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2043 rrd_set_error("rrd_write live_head to rrd");
2044 return -1;
2045 }
2046 }
2049 if (rrd_write(rrd_file, rrd->pdp_prep,
2050 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2051 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2052 rrd_set_error("rrd_write pdp_prep to rrd");
2053 return -1;
2054 }
2056 if (rrd_write(rrd_file, rrd->cdp_prep,
2057 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2058 rrd->stat_head->ds_cnt)
2059 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2060 rrd->stat_head->ds_cnt)) {
2062 rrd_set_error("rrd_write cdp_prep to rrd");
2063 return -1;
2064 }
2066 if (rrd_write(rrd_file, rrd->rra_ptr,
2067 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2068 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2069 rrd_set_error("rrd_write rra_ptr to rrd");
2070 return -1;
2071 }
2072 return 0;
2073 }
2074 #endif