1 /*****************************************************************************
2 * RRDtool 1.4.3 Copyright by Tobi Oetiker, 1997-2010
3 * Copyright by Florian Forster, 2008
4 *****************************************************************************
5 * rrd_update.c RRD Update Function
6 *****************************************************************************
7 * $Id$
8 *****************************************************************************/
10 #include "rrd_tool.h"
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
18 #include <locale.h>
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
26 #include "rrd_client.h"
#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
/*
 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
 * replacement.
 */
#include <sys/timeb.h>

#ifndef __MINGW32__
struct timeval {
    time_t    tv_sec;   /* seconds */
    long      tv_usec;  /* microseconds */
};
#endif

struct __timezone {
    int       tz_minuteswest;   /* minutes W of Greenwich */
    int       tz_dsttime;   /* type of dst correction */
};

/*
 * Minimal gettimeofday() stand-in built on _ftime().
 *
 * t  - receives the current time; tv_usec is derived from the millisecond
 *      field of struct _timeb, so the resolution is one millisecond.
 * tz - accepted for signature compatibility only; it is never written.
 *
 * Always returns 0.
 */
static int gettimeofday(
    struct timeval *t,
    struct __timezone *tz)
{
    struct _timeb current_time;

    (void) tz;          /* time zone information is not provided */

    _ftime(&current_time);

    t->tv_sec = current_time.time;
    t->tv_usec = current_time.millitm * 1000;

    return 0;
}

#endif
64 /* FUNCTION PROTOTYPES */
66 int rrd_update_r(
67 const char *filename,
68 const char *tmplt,
69 int argc,
70 const char **argv);
71 int _rrd_update(
72 const char *filename,
73 const char *tmplt,
74 int argc,
75 const char **argv,
76 rrd_info_t *);
78 static int allocate_data_structures(
79 rrd_t *rrd,
80 char ***updvals,
81 rrd_value_t **pdp_temp,
82 const char *tmplt,
83 long **tmpl_idx,
84 unsigned long *tmpl_cnt,
85 unsigned long **rra_step_cnt,
86 unsigned long **skip_update,
87 rrd_value_t **pdp_new);
89 static int parse_template(
90 rrd_t *rrd,
91 const char *tmplt,
92 unsigned long *tmpl_cnt,
93 long *tmpl_idx);
95 static int process_arg(
96 char *step_start,
97 rrd_t *rrd,
98 rrd_file_t *rrd_file,
99 unsigned long rra_begin,
100 time_t *current_time,
101 unsigned long *current_time_usec,
102 rrd_value_t *pdp_temp,
103 rrd_value_t *pdp_new,
104 unsigned long *rra_step_cnt,
105 char **updvals,
106 long *tmpl_idx,
107 unsigned long tmpl_cnt,
108 rrd_info_t ** pcdp_summary,
109 int version,
110 unsigned long *skip_update,
111 int *schedule_smooth);
113 static int parse_ds(
114 rrd_t *rrd,
115 char **updvals,
116 long *tmpl_idx,
117 char *input,
118 unsigned long tmpl_cnt,
119 time_t *current_time,
120 unsigned long *current_time_usec,
121 int version);
123 static int get_time_from_reading(
124 rrd_t *rrd,
125 char timesyntax,
126 char **updvals,
127 time_t *current_time,
128 unsigned long *current_time_usec,
129 int version);
131 static int update_pdp_prep(
132 rrd_t *rrd,
133 char **updvals,
134 rrd_value_t *pdp_new,
135 double interval);
137 static int calculate_elapsed_steps(
138 rrd_t *rrd,
139 unsigned long current_time,
140 unsigned long current_time_usec,
141 double interval,
142 double *pre_int,
143 double *post_int,
144 unsigned long *proc_pdp_cnt);
146 static void simple_update(
147 rrd_t *rrd,
148 double interval,
149 rrd_value_t *pdp_new);
151 static int process_all_pdp_st(
152 rrd_t *rrd,
153 double interval,
154 double pre_int,
155 double post_int,
156 unsigned long elapsed_pdp_st,
157 rrd_value_t *pdp_new,
158 rrd_value_t *pdp_temp);
160 static int process_pdp_st(
161 rrd_t *rrd,
162 unsigned long ds_idx,
163 double interval,
164 double pre_int,
165 double post_int,
166 long diff_pdp_st,
167 rrd_value_t *pdp_new,
168 rrd_value_t *pdp_temp);
170 static int update_all_cdp_prep(
171 rrd_t *rrd,
172 unsigned long *rra_step_cnt,
173 unsigned long rra_begin,
174 rrd_file_t *rrd_file,
175 unsigned long elapsed_pdp_st,
176 unsigned long proc_pdp_cnt,
177 rrd_value_t **last_seasonal_coef,
178 rrd_value_t **seasonal_coef,
179 rrd_value_t *pdp_temp,
180 unsigned long *skip_update,
181 int *schedule_smooth);
183 static int do_schedule_smooth(
184 rrd_t *rrd,
185 unsigned long rra_idx,
186 unsigned long elapsed_pdp_st);
188 static int update_cdp_prep(
189 rrd_t *rrd,
190 unsigned long elapsed_pdp_st,
191 unsigned long start_pdp_offset,
192 unsigned long *rra_step_cnt,
193 int rra_idx,
194 rrd_value_t *pdp_temp,
195 rrd_value_t *last_seasonal_coef,
196 rrd_value_t *seasonal_coef,
197 int current_cf);
199 static void update_cdp(
200 unival *scratch,
201 int current_cf,
202 rrd_value_t pdp_temp_val,
203 unsigned long rra_step_cnt,
204 unsigned long elapsed_pdp_st,
205 unsigned long start_pdp_offset,
206 unsigned long pdp_cnt,
207 rrd_value_t xff,
208 int i,
209 int ii);
211 static void initialize_cdp_val(
212 unival *scratch,
213 int current_cf,
214 rrd_value_t pdp_temp_val,
215 unsigned long start_pdp_offset,
216 unsigned long pdp_cnt);
218 static void reset_cdp(
219 rrd_t *rrd,
220 unsigned long elapsed_pdp_st,
221 rrd_value_t *pdp_temp,
222 rrd_value_t *last_seasonal_coef,
223 rrd_value_t *seasonal_coef,
224 int rra_idx,
225 int ds_idx,
226 int cdp_idx,
227 enum cf_en current_cf);
229 static rrd_value_t initialize_carry_over(
230 rrd_value_t pdp_temp_val,
231 int current_cf,
232 unsigned long elapsed_pdp_st,
233 unsigned long start_pdp_offset,
234 unsigned long pdp_cnt);
236 static rrd_value_t calculate_cdp_val(
237 rrd_value_t cdp_val,
238 rrd_value_t pdp_temp_val,
239 unsigned long elapsed_pdp_st,
240 int current_cf,
241 int i,
242 int ii);
244 static int update_aberrant_cdps(
245 rrd_t *rrd,
246 rrd_file_t *rrd_file,
247 unsigned long rra_begin,
248 unsigned long elapsed_pdp_st,
249 rrd_value_t *pdp_temp,
250 rrd_value_t **seasonal_coef);
252 static int write_to_rras(
253 rrd_t *rrd,
254 rrd_file_t *rrd_file,
255 unsigned long *rra_step_cnt,
256 unsigned long rra_begin,
257 time_t current_time,
258 unsigned long *skip_update,
259 rrd_info_t ** pcdp_summary);
261 static int write_RRA_row(
262 rrd_file_t *rrd_file,
263 rrd_t *rrd,
264 unsigned long rra_idx,
265 unsigned short CDP_scratch_idx,
266 rrd_info_t ** pcdp_summary,
267 time_t rra_time);
269 static int smooth_all_rras(
270 rrd_t *rrd,
271 rrd_file_t *rrd_file,
272 unsigned long rra_begin);
274 #ifndef HAVE_MMAP
275 static int write_changes_to_disk(
276 rrd_t *rrd,
277 rrd_file_t *rrd_file,
278 int version);
279 #endif
/*
 * Normalize a time as returned by gettimeofday: when the microsecond
 * field is negative, borrow one second from tv_sec and add one million
 * microseconds to tv_usec. Non-negative tv_usec values are left alone.
 */
static void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec >= 0)
        return;         /* already in canonical form */

    t->tv_sec -= 1;
    t->tv_usec += 1e6L;
}
/*
 * Store the current wall-clock time in *current_time (seconds) and
 * *current_time_usec (microseconds).
 *
 * The microsecond part is only reported for version >= 3; for lower
 * version numbers *current_time_usec is set to 0.
 */
static void initialize_time(
    time_t *current_time,
    unsigned long *current_time_usec,
    int version)
{
    struct timeval now; /* used for time conversion */

    gettimeofday(&now, 0);
    normalize_time(&now);

    *current_time = now.tv_sec;
    *current_time_usec = (version >= 3) ? now.tv_usec : 0;
}
/* Evaluates to X, or to the fallback Y when X is NaN.
 * X is evaluated twice when it is not NaN, so pass side-effect free
 * expressions only. The expansion deliberately carries no trailing
 * semicolon so the macro can be used inside larger expressions. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
317 rrd_info_t *rrd_update_v(
318 int argc,
319 char **argv)
320 {
321 char *tmplt = NULL;
322 rrd_info_t *result = NULL;
323 rrd_infoval_t rc;
324 char *opt_daemon = NULL;
325 struct option long_options[] = {
326 {"template", required_argument, 0, 't'},
327 {0, 0, 0, 0}
328 };
330 rc.u_int = -1;
331 optind = 0;
332 opterr = 0; /* initialize getopt */
334 while (1) {
335 int option_index = 0;
336 int opt;
338 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
340 if (opt == EOF)
341 break;
343 switch (opt) {
344 case 't':
345 tmplt = optarg;
346 break;
348 case '?':
349 rrd_set_error("unknown option '%s'", argv[optind - 1]);
350 goto end_tag;
351 }
352 }
354 opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
355 if (opt_daemon != NULL) {
356 rrd_set_error ("The \"%s\" environment variable is defined, "
357 "but \"%s\" cannot work with rrdcached. Either unset "
358 "the environment variable or use \"update\" instead.",
359 ENV_RRDCACHED_ADDRESS, argv[0]);
360 goto end_tag;
361 }
363 /* need at least 2 arguments: filename, data. */
364 if (argc - optind < 2) {
365 rrd_set_error("Not enough arguments");
366 goto end_tag;
367 }
368 rc.u_int = 0;
369 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
370 rc.u_int = _rrd_update(argv[optind], tmplt,
371 argc - optind - 1,
372 (const char **) (argv + optind + 1), result);
373 result->value.u_int = rc.u_int;
374 end_tag:
375 return result;
376 }
378 int rrd_update(
379 int argc,
380 char **argv)
381 {
382 struct option long_options[] = {
383 {"template", required_argument, 0, 't'},
384 {"daemon", required_argument, 0, 'd'},
385 {0, 0, 0, 0}
386 };
387 int option_index = 0;
388 int opt;
389 char *tmplt = NULL;
390 int rc = -1;
391 char *opt_daemon = NULL;
393 optind = 0;
394 opterr = 0; /* initialize getopt */
396 while (1) {
397 opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
399 if (opt == EOF)
400 break;
402 switch (opt) {
403 case 't':
404 tmplt = strdup(optarg);
405 break;
407 case 'd':
408 if (opt_daemon != NULL)
409 free (opt_daemon);
410 opt_daemon = strdup (optarg);
411 if (opt_daemon == NULL)
412 {
413 rrd_set_error("strdup failed.");
414 goto out;
415 }
416 break;
418 case '?':
419 rrd_set_error("unknown option '%s'", argv[optind - 1]);
420 goto out;
421 }
422 }
424 /* need at least 2 arguments: filename, data. */
425 if (argc - optind < 2) {
426 rrd_set_error("Not enough arguments");
427 goto out;
428 }
430 { /* try to connect to rrdcached */
431 int status = rrdc_connect(opt_daemon);
432 if (status != 0) return status;
433 }
435 if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
436 {
437 rrd_set_error("The caching daemon cannot be used together with "
438 "templates yet.");
439 goto out;
440 }
442 if (! rrdc_is_connected(opt_daemon))
443 {
444 rc = rrd_update_r(argv[optind], tmplt,
445 argc - optind - 1, (const char **) (argv + optind + 1));
446 }
447 else /* we are connected */
448 {
449 rc = rrdc_update (argv[optind], /* file */
450 argc - optind - 1, /* values_num */
451 (const char *const *) (argv + optind + 1)); /* values */
452 if (rc > 0)
453 rrd_set_error("Failed sending the values to rrdcached: %s",
454 rrd_strerror (rc));
455 }
457 out:
458 if (tmplt != NULL)
459 {
460 free(tmplt);
461 tmplt = NULL;
462 }
463 if (opt_daemon != NULL)
464 {
465 free (opt_daemon);
466 opt_daemon = NULL;
467 }
468 return rc;
469 }
471 int rrd_update_r(
472 const char *filename,
473 const char *tmplt,
474 int argc,
475 const char **argv)
476 {
477 return _rrd_update(filename, tmplt, argc, argv, NULL);
478 }
480 int rrd_update_v_r(
481 const char *filename,
482 const char *tmplt,
483 int argc,
484 const char **argv,
485 rrd_info_t * pcdp_summary)
486 {
487 return _rrd_update(filename, tmplt, argc, argv, pcdp_summary);
488 }
490 int _rrd_update(
491 const char *filename,
492 const char *tmplt,
493 int argc,
494 const char **argv,
495 rrd_info_t * pcdp_summary)
496 {
498 int arg_i = 2;
500 unsigned long rra_begin; /* byte pointer to the rra
501 * area in the rrd file. this
502 * pointer never changes value */
503 rrd_value_t *pdp_new; /* prepare the incoming data to be added
504 * to the existing entry */
505 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
506 * to the cdp values */
508 long *tmpl_idx; /* index representing the settings
509 * transported by the tmplt index */
510 unsigned long tmpl_cnt = 2; /* time and data */
511 rrd_t rrd;
512 time_t current_time = 0;
513 unsigned long current_time_usec = 0; /* microseconds part of current time */
514 char **updvals;
515 int schedule_smooth = 0;
517 /* number of elapsed PDP steps since last update */
518 unsigned long *rra_step_cnt = NULL;
520 int version; /* rrd version */
521 rrd_file_t *rrd_file;
522 char *arg_copy; /* for processing the argv */
523 unsigned long *skip_update; /* RRAs to advance but not write */
525 /* need at least 1 arguments: data. */
526 if (argc < 1) {
527 rrd_set_error("Not enough arguments");
528 goto err_out;
529 }
531 rrd_init(&rrd);
532 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
533 goto err_free;
534 }
535 /* We are now at the beginning of the rra's */
536 rra_begin = rrd_file->header_len;
538 version = atoi(rrd.stat_head->version);
540 initialize_time(¤t_time, ¤t_time_usec, version);
542 /* get exclusive lock to whole file.
543 * lock gets removed when we close the file.
544 */
545 if (rrd_lock(rrd_file) != 0) {
546 rrd_set_error("could not lock RRD");
547 goto err_close;
548 }
550 if (allocate_data_structures(&rrd, &updvals,
551 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
552 &rra_step_cnt, &skip_update,
553 &pdp_new) == -1) {
554 goto err_close;
555 }
557 /* loop through the arguments. */
558 for (arg_i = 0; arg_i < argc; arg_i++) {
559 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
560 rrd_set_error("failed duplication argv entry");
561 break;
562 }
563 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
564 ¤t_time, ¤t_time_usec, pdp_temp, pdp_new,
565 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
566 &pcdp_summary, version, skip_update,
567 &schedule_smooth) == -1) {
568 if (rrd_test_error()) { /* Should have error string always here */
569 char *save_error;
571 /* Prepend file name to error message */
572 if ((save_error = strdup(rrd_get_error())) != NULL) {
573 rrd_set_error("%s: %s", filename, save_error);
574 free(save_error);
575 }
576 }
577 free(arg_copy);
578 break;
579 }
580 free(arg_copy);
581 }
583 free(rra_step_cnt);
585 /* if we got here and if there is an error and if the file has not been
586 * written to, then close things up and return. */
587 if (rrd_test_error()) {
588 goto err_free_structures;
589 }
590 #ifndef HAVE_MMAP
591 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
592 goto err_free_structures;
593 }
594 #endif
596 /* calling the smoothing code here guarantees at most one smoothing
597 * operation per rrd_update call. Unfortunately, it is possible with bulk
598 * updates, or a long-delayed update for smoothing to occur off-schedule.
599 * This really isn't critical except during the burn-in cycles. */
600 if (schedule_smooth) {
601 smooth_all_rras(&rrd, rrd_file, rra_begin);
602 }
604 /* rrd_dontneed(rrd_file,&rrd); */
605 rrd_free(&rrd);
606 rrd_close(rrd_file);
608 free(pdp_new);
609 free(tmpl_idx);
610 free(pdp_temp);
611 free(skip_update);
612 free(updvals);
613 return 0;
615 err_free_structures:
616 free(pdp_new);
617 free(tmpl_idx);
618 free(pdp_temp);
619 free(skip_update);
620 free(updvals);
621 err_close:
622 rrd_close(rrd_file);
623 err_free:
624 rrd_free(&rrd);
625 err_out:
626 return -1;
627 }
629 /*
630 * Allocate some important arrays used, and initialize the template.
631 *
632 * When it returns, either all of the structures are allocated
633 * or none of them are.
634 *
635 * Returns 0 on success, -1 on error.
636 */
637 static int allocate_data_structures(
638 rrd_t *rrd,
639 char ***updvals,
640 rrd_value_t **pdp_temp,
641 const char *tmplt,
642 long **tmpl_idx,
643 unsigned long *tmpl_cnt,
644 unsigned long **rra_step_cnt,
645 unsigned long **skip_update,
646 rrd_value_t **pdp_new)
647 {
648 unsigned i, ii;
649 if ((*updvals = (char **) malloc(sizeof(char *)
650 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
651 rrd_set_error("allocating updvals pointer array.");
652 return -1;
653 }
654 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
655 * rrd->stat_head->ds_cnt)) ==
656 NULL) {
657 rrd_set_error("allocating pdp_temp.");
658 goto err_free_updvals;
659 }
660 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
661 *
662 rrd->stat_head->rra_cnt)) ==
663 NULL) {
664 rrd_set_error("allocating skip_update.");
665 goto err_free_pdp_temp;
666 }
667 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
668 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
669 rrd_set_error("allocating tmpl_idx.");
670 goto err_free_skip_update;
671 }
672 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
673 *
674 (rrd->stat_head->
675 rra_cnt))) == NULL) {
676 rrd_set_error("allocating rra_step_cnt.");
677 goto err_free_tmpl_idx;
678 }
680 /* initialize tmplt redirector */
681 /* default config example (assume DS 1 is a CDEF DS)
682 tmpl_idx[0] -> 0; (time)
683 tmpl_idx[1] -> 1; (DS 0)
684 tmpl_idx[2] -> 3; (DS 2)
685 tmpl_idx[3] -> 4; (DS 3) */
686 (*tmpl_idx)[0] = 0; /* time */
687 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
688 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
689 (*tmpl_idx)[ii++] = i;
690 }
691 *tmpl_cnt = ii;
693 if (tmplt != NULL) {
694 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
695 goto err_free_rra_step_cnt;
696 }
697 }
699 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
700 * rrd->stat_head->ds_cnt)) == NULL) {
701 rrd_set_error("allocating pdp_new.");
702 goto err_free_rra_step_cnt;
703 }
705 return 0;
707 err_free_rra_step_cnt:
708 free(*rra_step_cnt);
709 err_free_tmpl_idx:
710 free(*tmpl_idx);
711 err_free_skip_update:
712 free(*skip_update);
713 err_free_pdp_temp:
714 free(*pdp_temp);
715 err_free_updvals:
716 free(*updvals);
717 return -1;
718 }
720 /*
721 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
722 *
723 * Returns 0 on success.
724 */
725 static int parse_template(
726 rrd_t *rrd,
727 const char *tmplt,
728 unsigned long *tmpl_cnt,
729 long *tmpl_idx)
730 {
731 char *dsname, *tmplt_copy;
732 unsigned int tmpl_len, i;
733 int ret = 0;
735 *tmpl_cnt = 1; /* the first entry is the time */
737 /* we should work on a writeable copy here */
738 if ((tmplt_copy = strdup(tmplt)) == NULL) {
739 rrd_set_error("error copying tmplt '%s'", tmplt);
740 ret = -1;
741 goto out;
742 }
744 dsname = tmplt_copy;
745 tmpl_len = strlen(tmplt_copy);
746 for (i = 0; i <= tmpl_len; i++) {
747 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
748 tmplt_copy[i] = '\0';
749 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
750 rrd_set_error("tmplt contains more DS definitions than RRD");
751 ret = -1;
752 goto out_free_tmpl_copy;
753 }
754 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
755 rrd_set_error("unknown DS name '%s'", dsname);
756 ret = -1;
757 goto out_free_tmpl_copy;
758 }
759 /* go to the next entry on the tmplt_copy */
760 if (i < tmpl_len)
761 dsname = &tmplt_copy[i + 1];
762 }
763 }
764 out_free_tmpl_copy:
765 free(tmplt_copy);
766 out:
767 return ret;
768 }
770 /*
771 * Parse an update string, updates the primary data points (PDPs)
772 * and consolidated data points (CDPs), and writes changes to the RRAs.
773 *
774 * Returns 0 on success, -1 on error.
775 */
776 static int process_arg(
777 char *step_start,
778 rrd_t *rrd,
779 rrd_file_t *rrd_file,
780 unsigned long rra_begin,
781 time_t *current_time,
782 unsigned long *current_time_usec,
783 rrd_value_t *pdp_temp,
784 rrd_value_t *pdp_new,
785 unsigned long *rra_step_cnt,
786 char **updvals,
787 long *tmpl_idx,
788 unsigned long tmpl_cnt,
789 rrd_info_t ** pcdp_summary,
790 int version,
791 unsigned long *skip_update,
792 int *schedule_smooth)
793 {
794 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
796 /* a vector of future Holt-Winters seasonal coefs */
797 unsigned long elapsed_pdp_st;
799 double interval, pre_int, post_int; /* interval between this and
800 * the last run */
801 unsigned long proc_pdp_cnt;
803 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
804 current_time, current_time_usec, version) == -1) {
805 return -1;
806 }
808 interval = (double) (*current_time - rrd->live_head->last_up)
809 + (double) ((long) *current_time_usec -
810 (long) rrd->live_head->last_up_usec) / 1e6f;
812 /* process the data sources and update the pdp_prep
813 * area accordingly */
814 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
815 return -1;
816 }
818 elapsed_pdp_st = calculate_elapsed_steps(rrd,
819 *current_time,
820 *current_time_usec, interval,
821 &pre_int, &post_int,
822 &proc_pdp_cnt);
824 /* has a pdp_st moment occurred since the last run ? */
825 if (elapsed_pdp_st == 0) {
826 /* no we have not passed a pdp_st moment. therefore update is simple */
827 simple_update(rrd, interval, pdp_new);
828 } else {
829 /* an pdp_st has occurred. */
830 if (process_all_pdp_st(rrd, interval,
831 pre_int, post_int,
832 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
833 return -1;
834 }
835 if (update_all_cdp_prep(rrd, rra_step_cnt,
836 rra_begin, rrd_file,
837 elapsed_pdp_st,
838 proc_pdp_cnt,
839 &last_seasonal_coef,
840 &seasonal_coef,
841 pdp_temp,
842 skip_update, schedule_smooth) == -1) {
843 goto err_free_coefficients;
844 }
845 if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
846 elapsed_pdp_st, pdp_temp,
847 &seasonal_coef) == -1) {
848 goto err_free_coefficients;
849 }
850 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
851 *current_time, skip_update,
852 pcdp_summary) == -1) {
853 goto err_free_coefficients;
854 }
855 } /* endif a pdp_st has occurred */
856 rrd->live_head->last_up = *current_time;
857 rrd->live_head->last_up_usec = *current_time_usec;
859 if (version < 3) {
860 *rrd->legacy_last_up = rrd->live_head->last_up;
861 }
862 free(seasonal_coef);
863 free(last_seasonal_coef);
864 return 0;
866 err_free_coefficients:
867 free(seasonal_coef);
868 free(last_seasonal_coef);
869 return -1;
870 }
872 /*
873 * Parse a DS string (time + colon-separated values), storing the
874 * results in current_time, current_time_usec, and updvals.
875 *
876 * Returns 0 on success, -1 on error.
877 */
878 static int parse_ds(
879 rrd_t *rrd,
880 char **updvals,
881 long *tmpl_idx,
882 char *input,
883 unsigned long tmpl_cnt,
884 time_t *current_time,
885 unsigned long *current_time_usec,
886 int version)
887 {
888 char *p;
889 unsigned long i;
890 char timesyntax;
892 updvals[0] = input;
893 /* initialize all ds input to unknown except the first one
894 which has always got to be set */
895 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
896 updvals[i] = "U";
898 /* separate all ds elements; first must be examined separately
899 due to alternate time syntax */
900 if ((p = strchr(input, '@')) != NULL) {
901 timesyntax = '@';
902 } else if ((p = strchr(input, ':')) != NULL) {
903 timesyntax = ':';
904 } else {
905 rrd_set_error("expected timestamp not found in data source from %s",
906 input);
907 return -1;
908 }
909 *p = '\0';
910 i = 1;
911 updvals[tmpl_idx[i++]] = p + 1;
912 while (*(++p)) {
913 if (*p == ':') {
914 *p = '\0';
915 if (i < tmpl_cnt) {
916 updvals[tmpl_idx[i++]] = p + 1;
917 }
918 else {
919 rrd_set_error("found extra data on update argument: %s",p+1);
920 return -1;
921 }
922 }
923 }
925 if (i != tmpl_cnt) {
926 rrd_set_error("expected %lu data source readings (got %lu) from %s",
927 tmpl_cnt - 1, i - 1, input);
928 return -1;
929 }
931 if (get_time_from_reading(rrd, timesyntax, updvals,
932 current_time, current_time_usec,
933 version) == -1) {
934 return -1;
935 }
936 return 0;
937 }
939 /*
940 * Parse the time in a DS string, store it in current_time and
941 * current_time_usec and verify that it's later than the last
942 * update for this DS.
943 *
944 * Returns 0 on success, -1 on error.
945 */
946 static int get_time_from_reading(
947 rrd_t *rrd,
948 char timesyntax,
949 char **updvals,
950 time_t *current_time,
951 unsigned long *current_time_usec,
952 int version)
953 {
954 double tmp;
955 char *parsetime_error = NULL;
956 char *old_locale;
957 rrd_time_value_t ds_tv;
958 struct timeval tmp_time; /* used for time conversion */
960 /* get the time from the reading ... handle N */
961 if (timesyntax == '@') { /* at-style */
962 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
963 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
964 return -1;
965 }
966 if (ds_tv.type == RELATIVE_TO_END_TIME ||
967 ds_tv.type == RELATIVE_TO_START_TIME) {
968 rrd_set_error("specifying time relative to the 'start' "
969 "or 'end' makes no sense here: %s", updvals[0]);
970 return -1;
971 }
972 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
973 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
974 } else if (strcmp(updvals[0], "N") == 0) {
975 gettimeofday(&tmp_time, 0);
976 normalize_time(&tmp_time);
977 *current_time = tmp_time.tv_sec;
978 *current_time_usec = tmp_time.tv_usec;
979 } else {
980 old_locale = setlocale(LC_NUMERIC, "C");
981 errno = 0;
982 tmp = strtod(updvals[0], 0);
983 if (errno > 0) {
984 rrd_set_error("converting '%s' to float: %s",
985 updvals[0], rrd_strerror(errno));
986 return -1;
987 };
988 setlocale(LC_NUMERIC, old_locale);
989 if (tmp < 0.0){
990 gettimeofday(&tmp_time, 0);
991 tmp = (double)tmp_time.tv_sec + (double)tmp_time.tv_usec * 1e-6f + tmp;
992 }
994 *current_time = floor(tmp);
995 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
996 }
997 /* dont do any correction for old version RRDs */
998 if (version < 3)
999 *current_time_usec = 0;
1001 if (*current_time < rrd->live_head->last_up ||
1002 (*current_time == rrd->live_head->last_up &&
1003 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1004 rrd_set_error("illegal attempt to update using time %ld when "
1005 "last update time is %ld (minimum one second step)",
1006 *current_time, rrd->live_head->last_up);
1007 return -1;
1008 }
1009 return 0;
1010 }
1012 /*
1013 * Update pdp_new by interpreting the updvals according to the DS type
1014 * (COUNTER, GAUGE, etc.).
1015 *
1016 * Returns 0 on success, -1 on error.
1017 */
1018 static int update_pdp_prep(
1019 rrd_t *rrd,
1020 char **updvals,
1021 rrd_value_t *pdp_new,
1022 double interval)
1023 {
1024 unsigned long ds_idx;
1025 int ii;
1026 char *endptr; /* used in the conversion */
1027 double rate;
1028 char *old_locale;
1029 enum dst_en dst_idx;
1031 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1032 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1034 /* make sure we do not build diffs with old last_ds values */
1035 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1036 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1037 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1038 }
1040 /* NOTE: DST_CDEF should never enter this if block, because
1041 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1042 * accidently specified a value for the DST_CDEF. To handle this case,
1043 * an extra check is required. */
1045 if ((updvals[ds_idx + 1][0] != 'U') &&
1046 (dst_idx != DST_CDEF) &&
1047 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1048 rate = DNAN;
1050 /* pdp_new contains rate * time ... eg the bytes transferred during
1051 * the interval. Doing it this way saves a lot of math operations
1052 */
1053 switch (dst_idx) {
1054 case DST_COUNTER:
1055 case DST_DERIVE:
1056 /* Check if this is a valid integer. `U' is already handled in
1057 * another branch. */
1058 for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) {
1059 if ((ii == 0) && (dst_idx == DST_DERIVE)
1060 && (updvals[ds_idx + 1][ii] == '-'))
1061 continue;
1063 if ((updvals[ds_idx + 1][ii] < '0')
1064 || (updvals[ds_idx + 1][ii] > '9')) {
1065 rrd_set_error("not a simple %s integer: '%s'",
1066 (dst_idx == DST_DERIVE) ? "signed" : "unsigned",
1067 updvals[ds_idx + 1]);
1068 return -1;
1069 }
1070 } /* for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) */
1072 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1073 pdp_new[ds_idx] =
1074 rrd_diff(updvals[ds_idx + 1],
1075 rrd->pdp_prep[ds_idx].last_ds);
1076 if (dst_idx == DST_COUNTER) {
1077 /* simple overflow catcher. This will fail
1078 * terribly for non 32 or 64 bit counters
1079 * ... are there any others in SNMP land?
1080 */
1081 if (pdp_new[ds_idx] < (double) 0.0)
1082 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1083 if (pdp_new[ds_idx] < (double) 0.0)
1084 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1085 }
1086 rate = pdp_new[ds_idx] / interval;
1087 } else {
1088 pdp_new[ds_idx] = DNAN;
1089 }
1090 break;
1091 case DST_ABSOLUTE:
1092 old_locale = setlocale(LC_NUMERIC, "C");
1093 errno = 0;
1094 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1095 if (errno > 0) {
1096 rrd_set_error("converting '%s' to float: %s",
1097 updvals[ds_idx + 1], rrd_strerror(errno));
1098 return -1;
1099 };
1100 setlocale(LC_NUMERIC, old_locale);
1101 if (endptr[0] != '\0') {
1102 rrd_set_error
1103 ("conversion of '%s' to float not complete: tail '%s'",
1104 updvals[ds_idx + 1], endptr);
1105 return -1;
1106 }
1107 rate = pdp_new[ds_idx] / interval;
1108 break;
1109 case DST_GAUGE:
1110 old_locale = setlocale(LC_NUMERIC, "C");
1111 errno = 0;
1112 pdp_new[ds_idx] =
1113 strtod(updvals[ds_idx + 1], &endptr) * interval;
1114 if (errno) {
1115 rrd_set_error("converting '%s' to float: %s",
1116 updvals[ds_idx + 1], rrd_strerror(errno));
1117 return -1;
1118 };
1119 setlocale(LC_NUMERIC, old_locale);
1120 if (endptr[0] != '\0') {
1121 rrd_set_error
1122 ("conversion of '%s' to float not complete: tail '%s'",
1123 updvals[ds_idx + 1], endptr);
1124 return -1;
1125 }
1126 rate = pdp_new[ds_idx] / interval;
1127 break;
1128 default:
1129 rrd_set_error("rrd contains unknown DS type : '%s'",
1130 rrd->ds_def[ds_idx].dst);
1131 return -1;
1132 }
1133 /* break out of this for loop if the error string is set */
1134 if (rrd_test_error()) {
1135 return -1;
1136 }
1137 /* make sure pdp_temp is neither too large or too small
1138 * if any of these occur it becomes unknown ...
1139 * sorry folks ... */
1140 if (!isnan(rate) &&
1141 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1142 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1143 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1144 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1145 pdp_new[ds_idx] = DNAN;
1146 }
1147 } else {
1148 /* no news is news all the same */
1149 pdp_new[ds_idx] = DNAN;
1150 }
1153 /* make a copy of the command line argument for the next run */
1154 #ifdef DEBUG
1155 fprintf(stderr, "prep ds[%lu]\t"
1156 "last_arg '%s'\t"
1157 "this_arg '%s'\t"
1158 "pdp_new %10.2f\n",
1159 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1160 pdp_new[ds_idx]);
1161 #endif
1162 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1163 LAST_DS_LEN - 1);
1164 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1165 }
1166 return 0;
1167 }
1169 /*
1170 * How many PDP steps have elapsed since the last update? Returns the answer,
1171 * and stores the time between the last update and the last PDP in pre_time,
1172 * and the time between the last PDP and the current time in post_int.
1173 */
1174 static int calculate_elapsed_steps(
1175 rrd_t *rrd,
1176 unsigned long current_time,
1177 unsigned long current_time_usec,
1178 double interval,
1179 double *pre_int,
1180 double *post_int,
1181 unsigned long *proc_pdp_cnt)
1182 {
1183 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1184 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1185 * time */
1186 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1187 * when it was last updated */
1188 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1190 /* when was the current pdp started */
1191 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1192 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1194 /* when did the last pdp_st occur */
1195 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1196 occu_pdp_st = current_time - occu_pdp_age;
1198 if (occu_pdp_st > proc_pdp_st) {
1199 /* OK we passed the pdp_st moment */
1200 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1201 * occurred before the latest
1202 * pdp_st moment*/
1203 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1204 *post_int = occu_pdp_age; /* how much after it */
1205 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1206 } else {
1207 *pre_int = interval;
1208 *post_int = 0;
1209 }
1211 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1213 #ifdef DEBUG
1214 printf("proc_pdp_age %lu\t"
1215 "proc_pdp_st %lu\t"
1216 "occu_pfp_age %lu\t"
1217 "occu_pdp_st %lu\t"
1218 "int %lf\t"
1219 "pre_int %lf\t"
1220 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1221 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1222 #endif
1224 /* compute the number of elapsed pdp_st moments */
1225 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1226 }
1228 /*
1229 * Increment the PDP values by the values in pdp_new, or else initialize them.
1230 */
1231 static void simple_update(
1232 rrd_t *rrd,
1233 double interval,
1234 rrd_value_t *pdp_new)
1235 {
1236 int i;
1238 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1239 if (isnan(pdp_new[i])) {
1240 /* this is not really accurate if we use subsecond data arrival time
1241 should have thought of it when going subsecond resolution ...
1242 sorry next format change we will have it! */
1243 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1244 floor(interval);
1245 } else {
1246 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1247 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1248 } else {
1249 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1250 }
1251 }
1252 #ifdef DEBUG
1253 fprintf(stderr,
1254 "NO PDP ds[%i]\t"
1255 "value %10.2f\t"
1256 "unkn_sec %5lu\n",
1257 i,
1258 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1259 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1260 #endif
1261 }
1262 }
1264 /*
1265 * Call process_pdp_st for each DS.
1266 *
1267 * Returns 0 on success, -1 on error.
1268 */
1269 static int process_all_pdp_st(
1270 rrd_t *rrd,
1271 double interval,
1272 double pre_int,
1273 double post_int,
1274 unsigned long elapsed_pdp_st,
1275 rrd_value_t *pdp_new,
1276 rrd_value_t *pdp_temp)
1277 {
1278 unsigned long ds_idx;
1280 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1281 rate*seconds which occurred up to the last run.
1282 pdp_new[] contains rate*seconds from the latest run.
1283 pdp_temp[] will contain the rate for cdp */
1285 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1286 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1287 elapsed_pdp_st * rrd->stat_head->pdp_step,
1288 pdp_new, pdp_temp) == -1) {
1289 return -1;
1290 }
1291 #ifdef DEBUG
1292 fprintf(stderr, "PDP UPD ds[%lu]\t"
1293 "elapsed_pdp_st %lu\t"
1294 "pdp_temp %10.2f\t"
1295 "new_prep %10.2f\t"
1296 "new_unkn_sec %5lu\n",
1297 ds_idx,
1298 elapsed_pdp_st,
1299 pdp_temp[ds_idx],
1300 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1301 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1302 #endif
1303 }
1304 return 0;
1305 }
1307 /*
1308 * Process an update that occurs after one of the PDP moments.
1309 * Increments the PDP value, sets NAN if time greater than the
1310 * heartbeats have elapsed, processes CDEFs.
1311 *
1312 * Returns 0 on success, -1 on error.
1313 */
1314 static int process_pdp_st(
1315 rrd_t *rrd,
1316 unsigned long ds_idx,
1317 double interval,
1318 double pre_int,
1319 double post_int,
1320 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1321 rrd_value_t *pdp_new,
1322 rrd_value_t *pdp_temp)
1323 {
1324 int i;
1326 /* update pdp_prep to the current pdp_st. */
1327 double pre_unknown = 0.0;
1328 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1329 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1331 rpnstack_t rpnstack; /* used for COMPUTE DS */
1333 rpnstack_init(&rpnstack);
1336 if (isnan(pdp_new[ds_idx])) {
1337 /* a final bit of unknown to be added before calculation
1338 we use a temporary variable for this so that we
1339 don't have to turn integer lines before using the value */
1340 pre_unknown = pre_int;
1341 } else {
1342 if (isnan(scratch[PDP_val].u_val)) {
1343 scratch[PDP_val].u_val = 0;
1344 }
1345 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1346 }
1348 /* if too much of the pdp_prep is unknown we dump it */
1349 /* if the interval is larger thatn mrhb we get NAN */
1350 if ((interval > mrhb) ||
1351 (rrd->stat_head->pdp_step / 2.0 <
1352 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1353 pdp_temp[ds_idx] = DNAN;
1354 } else {
1355 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1356 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1357 pre_unknown);
1358 }
1360 /* process CDEF data sources; remember each CDEF DS can
1361 * only reference other DS with a lower index number */
1362 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1363 rpnp_t *rpnp;
1365 rpnp =
1366 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1367 if(rpnp == NULL) {
1368 rpnstack_free(&rpnstack);
1369 return -1;
1370 }
1371 /* substitute data values for OP_VARIABLE nodes */
1372 for (i = 0; rpnp[i].op != OP_END; i++) {
1373 if (rpnp[i].op == OP_VARIABLE) {
1374 rpnp[i].op = OP_NUMBER;
1375 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1376 }
1377 }
1378 /* run the rpn calculator */
1379 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1380 free(rpnp);
1381 rpnstack_free(&rpnstack);
1382 return -1;
1383 }
1384 free(rpnp);
1385 }
1387 /* make pdp_prep ready for the next run */
1388 if (isnan(pdp_new[ds_idx])) {
1389 /* this is not realy accurate if we use subsecond data arival time
1390 should have thought of it when going subsecond resolution ...
1391 sorry next format change we will have it! */
1392 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1393 scratch[PDP_val].u_val = DNAN;
1394 } else {
1395 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1396 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1397 }
1398 rpnstack_free(&rpnstack);
1399 return 0;
1400 }
1402 /*
1403 * Iterate over all the RRAs for a given DS and:
1404 * 1. Decide whether to schedule a smooth later
1405 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1406 * 3. Update the CDP
1407 *
1408 * Returns 0 on success, -1 on error
1409 */
1410 static int update_all_cdp_prep(
1411 rrd_t *rrd,
1412 unsigned long *rra_step_cnt,
1413 unsigned long rra_begin,
1414 rrd_file_t *rrd_file,
1415 unsigned long elapsed_pdp_st,
1416 unsigned long proc_pdp_cnt,
1417 rrd_value_t **last_seasonal_coef,
1418 rrd_value_t **seasonal_coef,
1419 rrd_value_t *pdp_temp,
1420 unsigned long *skip_update,
1421 int *schedule_smooth)
1422 {
1423 unsigned long rra_idx;
1425 /* index into the CDP scratch array */
1426 enum cf_en current_cf;
1427 unsigned long rra_start;
1429 /* number of rows to be updated in an RRA for a data value. */
1430 unsigned long start_pdp_offset;
1432 rra_start = rra_begin;
1433 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1434 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1435 start_pdp_offset =
1436 rrd->rra_def[rra_idx].pdp_cnt -
1437 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1438 skip_update[rra_idx] = 0;
1439 if (start_pdp_offset <= elapsed_pdp_st) {
1440 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1441 rrd->rra_def[rra_idx].pdp_cnt + 1;
1442 } else {
1443 rra_step_cnt[rra_idx] = 0;
1444 }
1446 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1447 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1448 * so that they will be correct for the next observed value; note that for
1449 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1450 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1451 if (rra_step_cnt[rra_idx] > 1) {
1452 skip_update[rra_idx] = 1;
1453 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1454 elapsed_pdp_st, last_seasonal_coef);
1455 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1456 elapsed_pdp_st + 1, seasonal_coef);
1457 }
1458 /* periodically run a smoother for seasonal effects */
1459 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1460 #ifdef DEBUG
1461 fprintf(stderr,
1462 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1463 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1464 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1465 u_cnt);
1466 #endif
1467 *schedule_smooth = 1;
1468 }
1469 }
1470 if (rrd_test_error())
1471 return -1;
1473 if (update_cdp_prep
1474 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1475 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1476 current_cf) == -1) {
1477 return -1;
1478 }
1479 rra_start +=
1480 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1481 sizeof(rrd_value_t);
1482 }
1483 return 0;
1484 }
1486 /*
1487 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1488 */
1489 static int do_schedule_smooth(
1490 rrd_t *rrd,
1491 unsigned long rra_idx,
1492 unsigned long elapsed_pdp_st)
1493 {
1494 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1495 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1496 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1497 unsigned long seasonal_smooth_idx =
1498 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1499 unsigned long *init_seasonal =
1500 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1502 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1503 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1504 * really an RRA level, not a data source within RRA level parameter, but
1505 * the rra_def is read only for rrd_update (not flushed to disk). */
1506 if (*init_seasonal > BURNIN_CYCLES) {
1507 /* someone has no doubt invented a trick to deal with this wrap around,
1508 * but at least this code is clear. */
1509 if (seasonal_smooth_idx > cur_row) {
1510 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1511 * between PDP and CDP */
1512 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1513 }
1514 /* can't rely on negative numbers because we are working with
1515 * unsigned values */
1516 return (cur_row + elapsed_pdp_st >= row_cnt
1517 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1518 }
1519 /* mark off one of the burn-in cycles */
1520 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1521 }
1523 /*
1524 * For a given RRA, iterate over the data sources and call the appropriate
1525 * consolidation function.
1526 *
1527 * Returns 0 on success, -1 on error.
1528 */
1529 static int update_cdp_prep(
1530 rrd_t *rrd,
1531 unsigned long elapsed_pdp_st,
1532 unsigned long start_pdp_offset,
1533 unsigned long *rra_step_cnt,
1534 int rra_idx,
1535 rrd_value_t *pdp_temp,
1536 rrd_value_t *last_seasonal_coef,
1537 rrd_value_t *seasonal_coef,
1538 int current_cf)
1539 {
1540 unsigned long ds_idx, cdp_idx;
1542 /* update CDP_PREP areas */
1543 /* loop over data soures within each RRA */
1544 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1546 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1548 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1549 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1550 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1551 elapsed_pdp_st, start_pdp_offset,
1552 rrd->rra_def[rra_idx].pdp_cnt,
1553 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1554 rra_idx, ds_idx);
1555 } else {
1556 /* Nothing to consolidate if there's one PDP per CDP. However, if
1557 * we've missed some PDPs, let's update null counters etc. */
1558 if (elapsed_pdp_st > 2) {
1559 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1560 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1561 current_cf);
1562 }
1563 }
1565 if (rrd_test_error())
1566 return -1;
1567 } /* endif data sources loop */
1568 return 0;
1569 }
1571 /*
1572 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1573 * primary value, secondary value, and # of unknowns.
1574 */
1575 static void update_cdp(
1576 unival *scratch,
1577 int current_cf,
1578 rrd_value_t pdp_temp_val,
1579 unsigned long rra_step_cnt,
1580 unsigned long elapsed_pdp_st,
1581 unsigned long start_pdp_offset,
1582 unsigned long pdp_cnt,
1583 rrd_value_t xff,
1584 int i,
1585 int ii)
1586 {
1587 /* shorthand variables */
1588 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1589 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1590 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1591 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1593 if (rra_step_cnt) {
1594 /* If we are in this block, as least 1 CDP value will be written to
1595 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1596 * to be written, then the "fill in" value is the CDP_secondary_val
1597 * entry. */
1598 if (isnan(pdp_temp_val)) {
1599 *cdp_unkn_pdp_cnt += start_pdp_offset;
1600 *cdp_secondary_val = DNAN;
1601 } else {
1602 /* CDP_secondary value is the RRA "fill in" value for intermediary
1603 * CDP data entries. No matter the CF, the value is the same because
1604 * the average, max, min, and last of a list of identical values is
1605 * the same, namely, the value itself. */
1606 *cdp_secondary_val = pdp_temp_val;
1607 }
1609 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1610 *cdp_primary_val = DNAN;
1611 } else {
1612 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1613 start_pdp_offset, pdp_cnt);
1614 }
1615 *cdp_val =
1616 initialize_carry_over(pdp_temp_val,current_cf,
1617 elapsed_pdp_st,
1618 start_pdp_offset, pdp_cnt);
1619 /* endif meets xff value requirement for a valid value */
1620 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1621 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1622 if (isnan(pdp_temp_val))
1623 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1624 else
1625 *cdp_unkn_pdp_cnt = 0;
1626 } else { /* rra_step_cnt[i] == 0 */
1628 #ifdef DEBUG
1629 if (isnan(*cdp_val)) {
1630 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1631 i, ii);
1632 } else {
1633 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1634 i, ii, *cdp_val);
1635 }
1636 #endif
1637 if (isnan(pdp_temp_val)) {
1638 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1639 } else {
1640 *cdp_val =
1641 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1642 current_cf, i, ii);
1643 }
1644 }
1645 }
1647 /*
1648 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1649 * on the type of consolidation function.
1650 */
1651 static void initialize_cdp_val(
1652 unival *scratch,
1653 int current_cf,
1654 rrd_value_t pdp_temp_val,
1655 unsigned long start_pdp_offset,
1656 unsigned long pdp_cnt)
1657 {
1658 rrd_value_t cum_val, cur_val;
1660 switch (current_cf) {
1661 case CF_AVERAGE:
1662 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1663 cur_val = IFDNAN(pdp_temp_val, 0.0);
1664 scratch[CDP_primary_val].u_val =
1665 (cum_val + cur_val * start_pdp_offset) /
1666 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1667 break;
1668 case CF_MAXIMUM:
1669 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1670 cur_val = IFDNAN(pdp_temp_val, -DINF);
1672 #if 0
1673 #ifdef DEBUG
1674 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1675 fprintf(stderr,
1676 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1677 i, ii);
1678 exit(-1);
1679 }
1680 #endif
1681 #endif
1682 if (cur_val > cum_val)
1683 scratch[CDP_primary_val].u_val = cur_val;
1684 else
1685 scratch[CDP_primary_val].u_val = cum_val;
1686 break;
1687 case CF_MINIMUM:
1688 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1689 cur_val = IFDNAN(pdp_temp_val, DINF);
1690 #if 0
1691 #ifdef DEBUG
1692 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1693 fprintf(stderr,
1694 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1695 ii);
1696 exit(-1);
1697 }
1698 #endif
1699 #endif
1700 if (cur_val < cum_val)
1701 scratch[CDP_primary_val].u_val = cur_val;
1702 else
1703 scratch[CDP_primary_val].u_val = cum_val;
1704 break;
1705 case CF_LAST:
1706 default:
1707 scratch[CDP_primary_val].u_val = pdp_temp_val;
1708 break;
1709 }
1710 }
1712 /*
1713 * Update the consolidation function for Holt-Winters functions as
1714 * well as other functions that don't actually consolidate multiple
1715 * PDPs.
1716 */
1717 static void reset_cdp(
1718 rrd_t *rrd,
1719 unsigned long elapsed_pdp_st,
1720 rrd_value_t *pdp_temp,
1721 rrd_value_t *last_seasonal_coef,
1722 rrd_value_t *seasonal_coef,
1723 int rra_idx,
1724 int ds_idx,
1725 int cdp_idx,
1726 enum cf_en current_cf)
1727 {
1728 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1730 switch (current_cf) {
1731 case CF_AVERAGE:
1732 default:
1733 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1734 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1735 break;
1736 case CF_SEASONAL:
1737 case CF_DEVSEASONAL:
1738 /* need to update cached seasonal values, so they are consistent
1739 * with the bulk update */
1740 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1741 * CDP_last_deviation are the same. */
1742 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1743 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1744 break;
1745 case CF_HWPREDICT:
1746 case CF_MHWPREDICT:
1747 /* need to update the null_count and last_null_count.
1748 * even do this for non-DNAN pdp_temp because the
1749 * algorithm is not learning from batch updates. */
1750 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1751 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1752 /* fall through */
1753 case CF_DEVPREDICT:
1754 scratch[CDP_primary_val].u_val = DNAN;
1755 scratch[CDP_secondary_val].u_val = DNAN;
1756 break;
1757 case CF_FAILURES:
1758 /* do not count missed bulk values as failures */
1759 scratch[CDP_primary_val].u_val = 0;
1760 scratch[CDP_secondary_val].u_val = 0;
1761 /* need to reset violations buffer.
1762 * could do this more carefully, but for now, just
1763 * assume a bulk update wipes away all violations. */
1764 erase_violations(rrd, cdp_idx, rra_idx);
1765 break;
1766 }
1767 }
1769 static rrd_value_t initialize_carry_over(
1770 rrd_value_t pdp_temp_val,
1771 int current_cf,
1772 unsigned long elapsed_pdp_st,
1773 unsigned long start_pdp_offset,
1774 unsigned long pdp_cnt)
1775 {
1776 unsigned long pdp_into_cdp_cnt = ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1777 if ( pdp_into_cdp_cnt == 0 || isnan(pdp_temp_val)){
1778 switch (current_cf) {
1779 case CF_MAXIMUM:
1780 return -DINF;
1781 case CF_MINIMUM:
1782 return DINF;
1783 case CF_AVERAGE:
1784 return 0;
1785 default:
1786 return DNAN;
1787 }
1788 }
1789 else {
1790 switch (current_cf) {
1791 case CF_AVERAGE:
1792 return pdp_temp_val * pdp_into_cdp_cnt ;
1793 default:
1794 return pdp_temp_val;
1795 }
1796 }
1797 }
1799 /*
1800 * Update or initialize a CDP value based on the consolidation
1801 * function.
1802 *
1803 * Returns the new value.
1804 */
1805 static rrd_value_t calculate_cdp_val(
1806 rrd_value_t cdp_val,
1807 rrd_value_t pdp_temp_val,
1808 unsigned long elapsed_pdp_st,
1809 int current_cf,
1810 #ifdef DEBUG
1811 int i,
1812 int ii
1813 #else
1814 int UNUSED(i),
1815 int UNUSED(ii)
1816 #endif
1817 )
1818 {
1819 if (isnan(cdp_val)) {
1820 if (current_cf == CF_AVERAGE) {
1821 pdp_temp_val *= elapsed_pdp_st;
1822 }
1823 #ifdef DEBUG
1824 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1825 i, ii, pdp_temp_val);
1826 #endif
1827 return pdp_temp_val;
1828 }
1829 if (current_cf == CF_AVERAGE)
1830 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1831 if (current_cf == CF_MINIMUM)
1832 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1833 if (current_cf == CF_MAXIMUM)
1834 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1836 return pdp_temp_val;
1837 }
1839 /*
1840 * For each RRA, update the seasonal values and then call update_aberrant_CF
1841 * for each data source.
1842 *
1843 * Return 0 on success, -1 on error.
1844 */
1845 static int update_aberrant_cdps(
1846 rrd_t *rrd,
1847 rrd_file_t *rrd_file,
1848 unsigned long rra_begin,
1849 unsigned long elapsed_pdp_st,
1850 rrd_value_t *pdp_temp,
1851 rrd_value_t **seasonal_coef)
1852 {
1853 unsigned long rra_idx, ds_idx, j;
1855 /* number of PDP steps since the last update that
1856 * are assigned to the first CDP to be generated
1857 * since the last update. */
1858 unsigned short scratch_idx;
1859 unsigned long rra_start;
1860 enum cf_en current_cf;
1862 /* this loop is only entered if elapsed_pdp_st < 3 */
1863 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1864 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1865 rra_start = rra_begin;
1866 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1867 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1868 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1869 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1870 if (scratch_idx == CDP_primary_val) {
1871 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1872 elapsed_pdp_st + 1, seasonal_coef);
1873 } else {
1874 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1875 elapsed_pdp_st + 2, seasonal_coef);
1876 }
1877 }
1878 if (rrd_test_error())
1879 return -1;
1880 /* loop over data soures within each RRA */
1881 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1882 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1883 rra_idx * (rrd->stat_head->ds_cnt) +
1884 ds_idx, rra_idx, ds_idx, scratch_idx,
1885 *seasonal_coef);
1886 }
1887 }
1888 rra_start += rrd->rra_def[rra_idx].row_cnt
1889 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1890 }
1891 }
1892 return 0;
1893 }
1895 /*
1896 * Move sequentially through the file, writing one RRA at a time. Note this
1897 * architecture divorces the computation of CDP with flushing updated RRA
1898 * entries to disk.
1899 *
1900 * Return 0 on success, -1 on error.
1901 */
1902 static int write_to_rras(
1903 rrd_t *rrd,
1904 rrd_file_t *rrd_file,
1905 unsigned long *rra_step_cnt,
1906 unsigned long rra_begin,
1907 time_t current_time,
1908 unsigned long *skip_update,
1909 rrd_info_t ** pcdp_summary)
1910 {
1911 unsigned long rra_idx;
1912 unsigned long rra_start;
1913 time_t rra_time = 0; /* time of update for a RRA */
1915 unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1917 /* Ready to write to disk */
1918 rra_start = rra_begin;
1920 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1921 rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1922 rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1924 /* for cdp_prep */
1925 unsigned short scratch_idx;
1926 unsigned long step_subtract;
1928 for (scratch_idx = CDP_primary_val,
1929 step_subtract = 1;
1930 rra_step_cnt[rra_idx] > 0;
1931 rra_step_cnt[rra_idx]--,
1932 scratch_idx = CDP_secondary_val,
1933 step_subtract = 2) {
1935 size_t rra_pos_new;
1936 #ifdef DEBUG
1937 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1938 #endif
1939 /* increment, with wrap-around */
1940 if (++rra_ptr->cur_row >= rra_def->row_cnt)
1941 rra_ptr->cur_row = 0;
1943 /* we know what our position should be */
1944 rra_pos_new = rra_start
1945 + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1947 /* re-seek if the position is wrong or we wrapped around */
1948 if ((size_t)rra_pos_new != rrd_file->pos) {
1949 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1950 rrd_set_error("seek error in rrd");
1951 return -1;
1952 }
1953 }
1954 #ifdef DEBUG
1955 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1956 #endif
1958 if (skip_update[rra_idx])
1959 continue;
1961 if (*pcdp_summary != NULL) {
1962 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1964 rra_time = (current_time - current_time % step_time)
1965 - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1966 }
1968 if (write_RRA_row
1969 (rrd_file, rrd, rra_idx, scratch_idx,
1970 pcdp_summary, rra_time) == -1)
1971 return -1;
1973 rrd_notify_row(rrd_file, rra_idx, rra_pos_new, rra_time);
1974 }
1976 rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1977 } /* RRA LOOP */
1979 return 0;
1980 }
1982 /*
1983 * Write out one row of values (one value per DS) to the archive.
1984 *
1985 * Returns 0 on success, -1 on error.
1986 */
1987 static int write_RRA_row(
1988 rrd_file_t *rrd_file,
1989 rrd_t *rrd,
1990 unsigned long rra_idx,
1991 unsigned short CDP_scratch_idx,
1992 rrd_info_t ** pcdp_summary,
1993 time_t rra_time)
1994 {
1995 unsigned long ds_idx, cdp_idx;
1996 rrd_infoval_t iv;
1998 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1999 /* compute the cdp index */
2000 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2001 #ifdef DEBUG
2002 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2003 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2004 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2005 #endif
2006 if (*pcdp_summary != NULL) {
2007 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2008 /* append info to the return hash */
2009 *pcdp_summary = rrd_info_push(*pcdp_summary,
2010 sprintf_alloc
2011 ("[%lli]RRA[%s][%lu]DS[%s]",
2012 (long long)rra_time,
2013 rrd->rra_def[rra_idx].cf_nam,
2014 rrd->rra_def[rra_idx].pdp_cnt,
2015 rrd->ds_def[ds_idx].ds_nam),
2016 RD_I_VAL, iv);
2017 }
2018 errno = 0;
2019 if (rrd_write(rrd_file,
2020 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2021 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2022 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2023 return -1;
2024 }
2025 }
2026 return 0;
2027 }
2029 /*
2030 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2031 *
2032 * Returns 0 on success, -1 otherwise
2033 */
2034 static int smooth_all_rras(
2035 rrd_t *rrd,
2036 rrd_file_t *rrd_file,
2037 unsigned long rra_begin)
2038 {
2039 unsigned long rra_start = rra_begin;
2040 unsigned long rra_idx;
2042 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2043 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2044 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2045 #ifdef DEBUG
2046 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2047 #endif
2048 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2049 if (rrd_test_error())
2050 return -1;
2051 }
2052 rra_start += rrd->rra_def[rra_idx].row_cnt
2053 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2054 }
2055 return 0;
2056 }
2058 #ifndef HAVE_MMAP
2059 /*
2060 * Flush changes to disk (unless we're using mmap)
2061 *
2062 * Returns 0 on success, -1 otherwise
2063 */
2064 static int write_changes_to_disk(
2065 rrd_t *rrd,
2066 rrd_file_t *rrd_file,
2067 int version)
2068 {
2069 /* we just need to write back the live header portion now */
2070 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2071 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2072 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2073 SEEK_SET) != 0) {
2074 rrd_set_error("seek rrd for live header writeback");
2075 return -1;
2076 }
2077 if (version >= 3) {
2078 if (rrd_write(rrd_file, rrd->live_head,
2079 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2080 rrd_set_error("rrd_write live_head to rrd");
2081 return -1;
2082 }
2083 } else {
2084 if (rrd_write(rrd_file, rrd->legacy_last_up,
2085 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2086 rrd_set_error("rrd_write live_head to rrd");
2087 return -1;
2088 }
2089 }
2092 if (rrd_write(rrd_file, rrd->pdp_prep,
2093 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2094 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2095 rrd_set_error("rrd_write pdp_prep to rrd");
2096 return -1;
2097 }
2099 if (rrd_write(rrd_file, rrd->cdp_prep,
2100 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2101 rrd->stat_head->ds_cnt)
2102 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2103 rrd->stat_head->ds_cnt)) {
2105 rrd_set_error("rrd_write cdp_prep to rrd");
2106 return -1;
2107 }
2109 if (rrd_write(rrd_file, rrd->rra_ptr,
2110 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2111 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2112 rrd_set_error("rrd_write rra_ptr to rrd");
2113 return -1;
2114 }
2115 return 0;
2116 }
2117 #endif