dbf74df6629611472e5c90af4f530ce36583f338
1 /*****************************************************************************
2 * RRDtool 1.4.3 Copyright by Tobi Oetiker, 1997-2010
3 * Copyright by Florian Forster, 2008
4 *****************************************************************************
5 * rrd_update.c RRD Update Function
6 *****************************************************************************
7 * $Id$
8 *****************************************************************************/
10 #include "rrd_tool.h"
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
18 #include <locale.h>
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
26 #include "rrd_client.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
31 * replacement.
32 */
33 #include <sys/timeb.h>
35 #ifndef __MINGW32__
36 struct timeval {
37 time_t tv_sec; /* seconds */
38 long tv_usec; /* microseconds */
39 };
40 #endif
42 struct __timezone {
43 int tz_minuteswest; /* minutes W of Greenwich */
44 int tz_dsttime; /* type of dst correction */
45 };
47 static int gettimeofday(
48 struct timeval *t,
49 struct __timezone *tz)
50 {
52 struct _timeb current_time;
54 _ftime(¤t_time);
56 t->tv_sec = current_time.time;
57 t->tv_usec = current_time.millitm * 1000;
59 return 0;
60 }
62 #endif
64 /* FUNCTION PROTOTYPES */
66 int rrd_update_r(
67 const char *filename,
68 const char *tmplt,
69 int argc,
70 const char **argv);
71 int _rrd_update(
72 const char *filename,
73 const char *tmplt,
74 int argc,
75 const char **argv,
76 rrd_info_t *);
78 static int allocate_data_structures(
79 rrd_t *rrd,
80 char ***updvals,
81 rrd_value_t **pdp_temp,
82 const char *tmplt,
83 long **tmpl_idx,
84 unsigned long *tmpl_cnt,
85 unsigned long **rra_step_cnt,
86 unsigned long **skip_update,
87 rrd_value_t **pdp_new);
89 static int parse_template(
90 rrd_t *rrd,
91 const char *tmplt,
92 unsigned long *tmpl_cnt,
93 long *tmpl_idx);
95 static int process_arg(
96 char *step_start,
97 rrd_t *rrd,
98 rrd_file_t *rrd_file,
99 unsigned long rra_begin,
100 time_t *current_time,
101 unsigned long *current_time_usec,
102 rrd_value_t *pdp_temp,
103 rrd_value_t *pdp_new,
104 unsigned long *rra_step_cnt,
105 char **updvals,
106 long *tmpl_idx,
107 unsigned long tmpl_cnt,
108 rrd_info_t ** pcdp_summary,
109 int version,
110 unsigned long *skip_update,
111 int *schedule_smooth);
113 static int parse_ds(
114 rrd_t *rrd,
115 char **updvals,
116 long *tmpl_idx,
117 char *input,
118 unsigned long tmpl_cnt,
119 time_t *current_time,
120 unsigned long *current_time_usec,
121 int version);
123 static int get_time_from_reading(
124 rrd_t *rrd,
125 char timesyntax,
126 char **updvals,
127 time_t *current_time,
128 unsigned long *current_time_usec,
129 int version);
131 static int update_pdp_prep(
132 rrd_t *rrd,
133 char **updvals,
134 rrd_value_t *pdp_new,
135 double interval);
137 static int calculate_elapsed_steps(
138 rrd_t *rrd,
139 unsigned long current_time,
140 unsigned long current_time_usec,
141 double interval,
142 double *pre_int,
143 double *post_int,
144 unsigned long *proc_pdp_cnt);
146 static void simple_update(
147 rrd_t *rrd,
148 double interval,
149 rrd_value_t *pdp_new);
151 static int process_all_pdp_st(
152 rrd_t *rrd,
153 double interval,
154 double pre_int,
155 double post_int,
156 unsigned long elapsed_pdp_st,
157 rrd_value_t *pdp_new,
158 rrd_value_t *pdp_temp);
160 static int process_pdp_st(
161 rrd_t *rrd,
162 unsigned long ds_idx,
163 double interval,
164 double pre_int,
165 double post_int,
166 long diff_pdp_st,
167 rrd_value_t *pdp_new,
168 rrd_value_t *pdp_temp);
170 static int update_all_cdp_prep(
171 rrd_t *rrd,
172 unsigned long *rra_step_cnt,
173 unsigned long rra_begin,
174 rrd_file_t *rrd_file,
175 unsigned long elapsed_pdp_st,
176 unsigned long proc_pdp_cnt,
177 rrd_value_t **last_seasonal_coef,
178 rrd_value_t **seasonal_coef,
179 rrd_value_t *pdp_temp,
180 unsigned long *skip_update,
181 int *schedule_smooth);
183 static int do_schedule_smooth(
184 rrd_t *rrd,
185 unsigned long rra_idx,
186 unsigned long elapsed_pdp_st);
188 static int update_cdp_prep(
189 rrd_t *rrd,
190 unsigned long elapsed_pdp_st,
191 unsigned long start_pdp_offset,
192 unsigned long *rra_step_cnt,
193 int rra_idx,
194 rrd_value_t *pdp_temp,
195 rrd_value_t *last_seasonal_coef,
196 rrd_value_t *seasonal_coef,
197 int current_cf);
199 static void update_cdp(
200 unival *scratch,
201 int current_cf,
202 rrd_value_t pdp_temp_val,
203 unsigned long rra_step_cnt,
204 unsigned long elapsed_pdp_st,
205 unsigned long start_pdp_offset,
206 unsigned long pdp_cnt,
207 rrd_value_t xff,
208 int i,
209 int ii);
211 static void initialize_cdp_val(
212 unival *scratch,
213 int current_cf,
214 rrd_value_t pdp_temp_val,
215 unsigned long start_pdp_offset,
216 unsigned long pdp_cnt);
218 static void reset_cdp(
219 rrd_t *rrd,
220 unsigned long elapsed_pdp_st,
221 rrd_value_t *pdp_temp,
222 rrd_value_t *last_seasonal_coef,
223 rrd_value_t *seasonal_coef,
224 int rra_idx,
225 int ds_idx,
226 int cdp_idx,
227 enum cf_en current_cf);
229 static rrd_value_t initialize_carry_over(
230 rrd_value_t pdp_temp_val,
231 int current_cf,
232 unsigned long elapsed_pdp_st,
233 unsigned long start_pdp_offset,
234 unsigned long pdp_cnt);
236 static rrd_value_t calculate_cdp_val(
237 rrd_value_t cdp_val,
238 rrd_value_t pdp_temp_val,
239 unsigned long elapsed_pdp_st,
240 int current_cf,
241 int i,
242 int ii);
244 static int update_aberrant_cdps(
245 rrd_t *rrd,
246 rrd_file_t *rrd_file,
247 unsigned long rra_begin,
248 unsigned long elapsed_pdp_st,
249 rrd_value_t *pdp_temp,
250 rrd_value_t **seasonal_coef);
252 static int write_to_rras(
253 rrd_t *rrd,
254 rrd_file_t *rrd_file,
255 unsigned long *rra_step_cnt,
256 unsigned long rra_begin,
257 time_t current_time,
258 unsigned long *skip_update,
259 rrd_info_t ** pcdp_summary);
261 static int write_RRA_row(
262 rrd_file_t *rrd_file,
263 rrd_t *rrd,
264 unsigned long rra_idx,
265 unsigned short CDP_scratch_idx,
266 rrd_info_t ** pcdp_summary,
267 time_t rra_time);
269 static int smooth_all_rras(
270 rrd_t *rrd,
271 rrd_file_t *rrd_file,
272 unsigned long rra_begin);
274 #ifndef HAVE_MMAP
275 static int write_changes_to_disk(
276 rrd_t *rrd,
277 rrd_file_t *rrd_file,
278 int version);
279 #endif
281 /*
282 * normalize time as returned by gettimeofday. usec part must
283 * be always >= 0
284 */
285 static void normalize_time(
286 struct timeval *t)
287 {
288 if (t->tv_usec < 0) {
289 t->tv_sec--;
290 t->tv_usec += 1e6L;
291 }
292 }
294 /*
295 * Sets current_time and current_time_usec based on the current time.
296 * current_time_usec is set to 0 if the version number is 1 or 2.
297 */
298 static void initialize_time(
299 time_t *current_time,
300 unsigned long *current_time_usec,
301 int version)
302 {
303 struct timeval tmp_time; /* used for time conversion */
305 gettimeofday(&tmp_time, 0);
306 normalize_time(&tmp_time);
307 *current_time = tmp_time.tv_sec;
308 if (version >= 3) {
309 *current_time_usec = tmp_time.tv_usec;
310 } else {
311 *current_time_usec = 0;
312 }
313 }
315 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
317 rrd_info_t *rrd_update_v(
318 int argc,
319 char **argv)
320 {
321 char *tmplt = NULL;
322 rrd_info_t *result = NULL;
323 rrd_infoval_t rc;
324 char *opt_daemon = NULL;
325 struct option long_options[] = {
326 {"template", required_argument, 0, 't'},
327 {0, 0, 0, 0}
328 };
330 rc.u_int = -1;
331 optind = 0;
332 opterr = 0; /* initialize getopt */
334 while (1) {
335 int option_index = 0;
336 int opt;
338 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
340 if (opt == EOF)
341 break;
343 switch (opt) {
344 case 't':
345 tmplt = optarg;
346 break;
348 case '?':
349 rrd_set_error("unknown option '%s'", argv[optind - 1]);
350 goto end_tag;
351 }
352 }
354 opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
355 if (opt_daemon != NULL) {
356 rrd_set_error ("The \"%s\" environment variable is defined, "
357 "but \"%s\" cannot work with rrdcached. Either unset "
358 "the environment variable or use \"update\" instead.",
359 ENV_RRDCACHED_ADDRESS, argv[0]);
360 goto end_tag;
361 }
363 /* need at least 2 arguments: filename, data. */
364 if (argc - optind < 2) {
365 rrd_set_error("Not enough arguments");
366 goto end_tag;
367 }
368 rc.u_int = 0;
369 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
370 rc.u_int = _rrd_update(argv[optind], tmplt,
371 argc - optind - 1,
372 (const char **) (argv + optind + 1), result);
373 result->value.u_int = rc.u_int;
374 end_tag:
375 return result;
376 }
378 int rrd_update(
379 int argc,
380 char **argv)
381 {
382 struct option long_options[] = {
383 {"template", required_argument, 0, 't'},
384 {"daemon", required_argument, 0, 'd'},
385 {0, 0, 0, 0}
386 };
387 int option_index = 0;
388 int opt;
389 char *tmplt = NULL;
390 int rc = -1;
391 char *opt_daemon = NULL;
393 optind = 0;
394 opterr = 0; /* initialize getopt */
396 while (1) {
397 opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
399 if (opt == EOF)
400 break;
402 switch (opt) {
403 case 't':
404 tmplt = strdup(optarg);
405 break;
407 case 'd':
408 if (opt_daemon != NULL)
409 free (opt_daemon);
410 opt_daemon = strdup (optarg);
411 if (opt_daemon == NULL)
412 {
413 rrd_set_error("strdup failed.");
414 goto out;
415 }
416 break;
418 case '?':
419 rrd_set_error("unknown option '%s'", argv[optind - 1]);
420 goto out;
421 }
422 }
424 /* need at least 2 arguments: filename, data. */
425 if (argc - optind < 2) {
426 rrd_set_error("Not enough arguments");
427 goto out;
428 }
430 { /* try to connect to rrdcached */
431 int status = rrdc_connect(opt_daemon);
432 if (status != 0) {
433 rc = status;
434 goto out;
435 }
436 }
438 if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
439 {
440 rrd_set_error("The caching daemon cannot be used together with "
441 "templates yet.");
442 goto out;
443 }
445 if (! rrdc_is_connected(opt_daemon))
446 {
447 rc = rrd_update_r(argv[optind], tmplt,
448 argc - optind - 1, (const char **) (argv + optind + 1));
449 }
450 else /* we are connected */
451 {
452 rc = rrdc_update (argv[optind], /* file */
453 argc - optind - 1, /* values_num */
454 (const char *const *) (argv + optind + 1)); /* values */
455 if (rc > 0)
456 rrd_set_error("Failed sending the values to rrdcached: %s",
457 rrd_strerror (rc));
458 }
460 out:
461 if (tmplt != NULL)
462 {
463 free(tmplt);
464 tmplt = NULL;
465 }
466 if (opt_daemon != NULL)
467 {
468 free (opt_daemon);
469 opt_daemon = NULL;
470 }
471 return rc;
472 }
474 int rrd_update_r(
475 const char *filename,
476 const char *tmplt,
477 int argc,
478 const char **argv)
479 {
480 return _rrd_update(filename, tmplt, argc, argv, NULL);
481 }
483 int rrd_update_v_r(
484 const char *filename,
485 const char *tmplt,
486 int argc,
487 const char **argv,
488 rrd_info_t * pcdp_summary)
489 {
490 return _rrd_update(filename, tmplt, argc, argv, pcdp_summary);
491 }
493 int _rrd_update(
494 const char *filename,
495 const char *tmplt,
496 int argc,
497 const char **argv,
498 rrd_info_t * pcdp_summary)
499 {
501 int arg_i = 2;
503 unsigned long rra_begin; /* byte pointer to the rra
504 * area in the rrd file. this
505 * pointer never changes value */
506 rrd_value_t *pdp_new; /* prepare the incoming data to be added
507 * to the existing entry */
508 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
509 * to the cdp values */
511 long *tmpl_idx; /* index representing the settings
512 * transported by the tmplt index */
513 unsigned long tmpl_cnt = 2; /* time and data */
514 rrd_t rrd;
515 time_t current_time = 0;
516 unsigned long current_time_usec = 0; /* microseconds part of current time */
517 char **updvals;
518 int schedule_smooth = 0;
520 /* number of elapsed PDP steps since last update */
521 unsigned long *rra_step_cnt = NULL;
523 int version; /* rrd version */
524 rrd_file_t *rrd_file;
525 char *arg_copy; /* for processing the argv */
526 unsigned long *skip_update; /* RRAs to advance but not write */
528 /* need at least 1 arguments: data. */
529 if (argc < 1) {
530 rrd_set_error("Not enough arguments");
531 goto err_out;
532 }
534 rrd_init(&rrd);
535 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
536 goto err_free;
537 }
538 /* We are now at the beginning of the rra's */
539 rra_begin = rrd_file->header_len;
541 version = atoi(rrd.stat_head->version);
543 initialize_time(¤t_time, ¤t_time_usec, version);
545 /* get exclusive lock to whole file.
546 * lock gets removed when we close the file.
547 */
548 if (rrd_lock(rrd_file) != 0) {
549 rrd_set_error("could not lock RRD");
550 goto err_close;
551 }
553 if (allocate_data_structures(&rrd, &updvals,
554 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
555 &rra_step_cnt, &skip_update,
556 &pdp_new) == -1) {
557 goto err_close;
558 }
560 /* loop through the arguments. */
561 for (arg_i = 0; arg_i < argc; arg_i++) {
562 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
563 rrd_set_error("failed duplication argv entry");
564 break;
565 }
566 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
567 ¤t_time, ¤t_time_usec, pdp_temp, pdp_new,
568 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
569 &pcdp_summary, version, skip_update,
570 &schedule_smooth) == -1) {
571 if (rrd_test_error()) { /* Should have error string always here */
572 char *save_error;
574 /* Prepend file name to error message */
575 if ((save_error = strdup(rrd_get_error())) != NULL) {
576 rrd_set_error("%s: %s", filename, save_error);
577 free(save_error);
578 }
579 }
580 free(arg_copy);
581 break;
582 }
583 free(arg_copy);
584 }
586 free(rra_step_cnt);
588 /* if we got here and if there is an error and if the file has not been
589 * written to, then close things up and return. */
590 if (rrd_test_error()) {
591 goto err_free_structures;
592 }
593 #ifndef HAVE_MMAP
594 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
595 goto err_free_structures;
596 }
597 #endif
599 /* calling the smoothing code here guarantees at most one smoothing
600 * operation per rrd_update call. Unfortunately, it is possible with bulk
601 * updates, or a long-delayed update for smoothing to occur off-schedule.
602 * This really isn't critical except during the burn-in cycles. */
603 if (schedule_smooth) {
604 smooth_all_rras(&rrd, rrd_file, rra_begin);
605 }
607 /* rrd_dontneed(rrd_file,&rrd); */
608 rrd_free(&rrd);
609 rrd_close(rrd_file);
611 free(pdp_new);
612 free(tmpl_idx);
613 free(pdp_temp);
614 free(skip_update);
615 free(updvals);
616 return 0;
618 err_free_structures:
619 free(pdp_new);
620 free(tmpl_idx);
621 free(pdp_temp);
622 free(skip_update);
623 free(updvals);
624 err_close:
625 rrd_close(rrd_file);
626 err_free:
627 rrd_free(&rrd);
628 err_out:
629 return -1;
630 }
632 /*
633 * Allocate some important arrays used, and initialize the template.
634 *
635 * When it returns, either all of the structures are allocated
636 * or none of them are.
637 *
638 * Returns 0 on success, -1 on error.
639 */
640 static int allocate_data_structures(
641 rrd_t *rrd,
642 char ***updvals,
643 rrd_value_t **pdp_temp,
644 const char *tmplt,
645 long **tmpl_idx,
646 unsigned long *tmpl_cnt,
647 unsigned long **rra_step_cnt,
648 unsigned long **skip_update,
649 rrd_value_t **pdp_new)
650 {
651 unsigned i, ii;
652 if ((*updvals = (char **) malloc(sizeof(char *)
653 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
654 rrd_set_error("allocating updvals pointer array.");
655 return -1;
656 }
657 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
658 * rrd->stat_head->ds_cnt)) ==
659 NULL) {
660 rrd_set_error("allocating pdp_temp.");
661 goto err_free_updvals;
662 }
663 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
664 *
665 rrd->stat_head->rra_cnt)) ==
666 NULL) {
667 rrd_set_error("allocating skip_update.");
668 goto err_free_pdp_temp;
669 }
670 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
671 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
672 rrd_set_error("allocating tmpl_idx.");
673 goto err_free_skip_update;
674 }
675 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
676 *
677 (rrd->stat_head->
678 rra_cnt))) == NULL) {
679 rrd_set_error("allocating rra_step_cnt.");
680 goto err_free_tmpl_idx;
681 }
683 /* initialize tmplt redirector */
684 /* default config example (assume DS 1 is a CDEF DS)
685 tmpl_idx[0] -> 0; (time)
686 tmpl_idx[1] -> 1; (DS 0)
687 tmpl_idx[2] -> 3; (DS 2)
688 tmpl_idx[3] -> 4; (DS 3) */
689 (*tmpl_idx)[0] = 0; /* time */
690 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
691 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
692 (*tmpl_idx)[ii++] = i;
693 }
694 *tmpl_cnt = ii;
696 if (tmplt != NULL) {
697 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
698 goto err_free_rra_step_cnt;
699 }
700 }
702 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
703 * rrd->stat_head->ds_cnt)) == NULL) {
704 rrd_set_error("allocating pdp_new.");
705 goto err_free_rra_step_cnt;
706 }
708 return 0;
710 err_free_rra_step_cnt:
711 free(*rra_step_cnt);
712 err_free_tmpl_idx:
713 free(*tmpl_idx);
714 err_free_skip_update:
715 free(*skip_update);
716 err_free_pdp_temp:
717 free(*pdp_temp);
718 err_free_updvals:
719 free(*updvals);
720 return -1;
721 }
723 /*
724 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
725 *
726 * Returns 0 on success.
727 */
728 static int parse_template(
729 rrd_t *rrd,
730 const char *tmplt,
731 unsigned long *tmpl_cnt,
732 long *tmpl_idx)
733 {
734 char *dsname, *tmplt_copy;
735 unsigned int tmpl_len, i;
736 int ret = 0;
738 *tmpl_cnt = 1; /* the first entry is the time */
740 /* we should work on a writeable copy here */
741 if ((tmplt_copy = strdup(tmplt)) == NULL) {
742 rrd_set_error("error copying tmplt '%s'", tmplt);
743 ret = -1;
744 goto out;
745 }
747 dsname = tmplt_copy;
748 tmpl_len = strlen(tmplt_copy);
749 for (i = 0; i <= tmpl_len; i++) {
750 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
751 tmplt_copy[i] = '\0';
752 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
753 rrd_set_error("tmplt contains more DS definitions than RRD");
754 ret = -1;
755 goto out_free_tmpl_copy;
756 }
757 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
758 rrd_set_error("unknown DS name '%s'", dsname);
759 ret = -1;
760 goto out_free_tmpl_copy;
761 }
762 /* go to the next entry on the tmplt_copy */
763 if (i < tmpl_len)
764 dsname = &tmplt_copy[i + 1];
765 }
766 }
767 out_free_tmpl_copy:
768 free(tmplt_copy);
769 out:
770 return ret;
771 }
773 /*
774 * Parse an update string, updates the primary data points (PDPs)
775 * and consolidated data points (CDPs), and writes changes to the RRAs.
776 *
777 * Returns 0 on success, -1 on error.
778 */
779 static int process_arg(
780 char *step_start,
781 rrd_t *rrd,
782 rrd_file_t *rrd_file,
783 unsigned long rra_begin,
784 time_t *current_time,
785 unsigned long *current_time_usec,
786 rrd_value_t *pdp_temp,
787 rrd_value_t *pdp_new,
788 unsigned long *rra_step_cnt,
789 char **updvals,
790 long *tmpl_idx,
791 unsigned long tmpl_cnt,
792 rrd_info_t ** pcdp_summary,
793 int version,
794 unsigned long *skip_update,
795 int *schedule_smooth)
796 {
797 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
799 /* a vector of future Holt-Winters seasonal coefs */
800 unsigned long elapsed_pdp_st;
802 double interval, pre_int, post_int; /* interval between this and
803 * the last run */
804 unsigned long proc_pdp_cnt;
806 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
807 current_time, current_time_usec, version) == -1) {
808 return -1;
809 }
811 interval = (double) (*current_time - rrd->live_head->last_up)
812 + (double) ((long) *current_time_usec -
813 (long) rrd->live_head->last_up_usec) / 1e6f;
815 /* process the data sources and update the pdp_prep
816 * area accordingly */
817 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
818 return -1;
819 }
821 elapsed_pdp_st = calculate_elapsed_steps(rrd,
822 *current_time,
823 *current_time_usec, interval,
824 &pre_int, &post_int,
825 &proc_pdp_cnt);
827 /* has a pdp_st moment occurred since the last run ? */
828 if (elapsed_pdp_st == 0) {
829 /* no we have not passed a pdp_st moment. therefore update is simple */
830 simple_update(rrd, interval, pdp_new);
831 } else {
832 /* an pdp_st has occurred. */
833 if (process_all_pdp_st(rrd, interval,
834 pre_int, post_int,
835 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
836 return -1;
837 }
838 if (update_all_cdp_prep(rrd, rra_step_cnt,
839 rra_begin, rrd_file,
840 elapsed_pdp_st,
841 proc_pdp_cnt,
842 &last_seasonal_coef,
843 &seasonal_coef,
844 pdp_temp,
845 skip_update, schedule_smooth) == -1) {
846 goto err_free_coefficients;
847 }
848 if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
849 elapsed_pdp_st, pdp_temp,
850 &seasonal_coef) == -1) {
851 goto err_free_coefficients;
852 }
853 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
854 *current_time, skip_update,
855 pcdp_summary) == -1) {
856 goto err_free_coefficients;
857 }
858 } /* endif a pdp_st has occurred */
859 rrd->live_head->last_up = *current_time;
860 rrd->live_head->last_up_usec = *current_time_usec;
862 if (version < 3) {
863 *rrd->legacy_last_up = rrd->live_head->last_up;
864 }
865 free(seasonal_coef);
866 free(last_seasonal_coef);
867 return 0;
869 err_free_coefficients:
870 free(seasonal_coef);
871 free(last_seasonal_coef);
872 return -1;
873 }
875 /*
876 * Parse a DS string (time + colon-separated values), storing the
877 * results in current_time, current_time_usec, and updvals.
878 *
879 * Returns 0 on success, -1 on error.
880 */
881 static int parse_ds(
882 rrd_t *rrd,
883 char **updvals,
884 long *tmpl_idx,
885 char *input,
886 unsigned long tmpl_cnt,
887 time_t *current_time,
888 unsigned long *current_time_usec,
889 int version)
890 {
891 char *p;
892 unsigned long i;
893 char timesyntax;
895 updvals[0] = input;
896 /* initialize all ds input to unknown except the first one
897 which has always got to be set */
898 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
899 updvals[i] = "U";
901 /* separate all ds elements; first must be examined separately
902 due to alternate time syntax */
903 if ((p = strchr(input, '@')) != NULL) {
904 timesyntax = '@';
905 } else if ((p = strchr(input, ':')) != NULL) {
906 timesyntax = ':';
907 } else {
908 rrd_set_error("expected timestamp not found in data source from %s",
909 input);
910 return -1;
911 }
912 *p = '\0';
913 i = 1;
914 updvals[tmpl_idx[i++]] = p + 1;
915 while (*(++p)) {
916 if (*p == ':') {
917 *p = '\0';
918 if (i < tmpl_cnt) {
919 updvals[tmpl_idx[i++]] = p + 1;
920 }
921 else {
922 rrd_set_error("found extra data on update argument: %s",p+1);
923 return -1;
924 }
925 }
926 }
928 if (i != tmpl_cnt) {
929 rrd_set_error("expected %lu data source readings (got %lu) from %s",
930 tmpl_cnt - 1, i - 1, input);
931 return -1;
932 }
934 if (get_time_from_reading(rrd, timesyntax, updvals,
935 current_time, current_time_usec,
936 version) == -1) {
937 return -1;
938 }
939 return 0;
940 }
942 /*
943 * Parse the time in a DS string, store it in current_time and
944 * current_time_usec and verify that it's later than the last
945 * update for this DS.
946 *
947 * Returns 0 on success, -1 on error.
948 */
949 static int get_time_from_reading(
950 rrd_t *rrd,
951 char timesyntax,
952 char **updvals,
953 time_t *current_time,
954 unsigned long *current_time_usec,
955 int version)
956 {
957 double tmp;
958 char *parsetime_error = NULL;
959 char *old_locale;
960 rrd_time_value_t ds_tv;
961 struct timeval tmp_time; /* used for time conversion */
963 /* get the time from the reading ... handle N */
964 if (timesyntax == '@') { /* at-style */
965 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
966 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
967 return -1;
968 }
969 if (ds_tv.type == RELATIVE_TO_END_TIME ||
970 ds_tv.type == RELATIVE_TO_START_TIME) {
971 rrd_set_error("specifying time relative to the 'start' "
972 "or 'end' makes no sense here: %s", updvals[0]);
973 return -1;
974 }
975 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
976 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
977 } else if (strcmp(updvals[0], "N") == 0) {
978 gettimeofday(&tmp_time, 0);
979 normalize_time(&tmp_time);
980 *current_time = tmp_time.tv_sec;
981 *current_time_usec = tmp_time.tv_usec;
982 } else {
983 old_locale = setlocale(LC_NUMERIC, NULL);
984 setlocale(LC_NUMERIC, "C");
985 errno = 0;
986 tmp = strtod(updvals[0], 0);
987 if (errno > 0) {
988 rrd_set_error("converting '%s' to float: %s",
989 updvals[0], rrd_strerror(errno));
990 return -1;
991 };
992 setlocale(LC_NUMERIC, old_locale);
993 if (tmp < 0.0){
994 gettimeofday(&tmp_time, 0);
995 tmp = (double)tmp_time.tv_sec + (double)tmp_time.tv_usec * 1e-6f + tmp;
996 }
998 *current_time = floor(tmp);
999 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1000 }
1001 /* dont do any correction for old version RRDs */
1002 if (version < 3)
1003 *current_time_usec = 0;
1005 if (*current_time < rrd->live_head->last_up ||
1006 (*current_time == rrd->live_head->last_up &&
1007 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1008 rrd_set_error("illegal attempt to update using time %ld when "
1009 "last update time is %ld (minimum one second step)",
1010 *current_time, rrd->live_head->last_up);
1011 return -1;
1012 }
1013 return 0;
1014 }
1016 /*
1017 * Update pdp_new by interpreting the updvals according to the DS type
1018 * (COUNTER, GAUGE, etc.).
1019 *
1020 * Returns 0 on success, -1 on error.
1021 */
1022 static int update_pdp_prep(
1023 rrd_t *rrd,
1024 char **updvals,
1025 rrd_value_t *pdp_new,
1026 double interval)
1027 {
1028 unsigned long ds_idx;
1029 int ii;
1030 char *endptr; /* used in the conversion */
1031 double rate;
1032 char *old_locale;
1033 enum dst_en dst_idx;
1035 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1036 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1038 /* make sure we do not build diffs with old last_ds values */
1039 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1040 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1041 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1042 }
1044 /* NOTE: DST_CDEF should never enter this if block, because
1045 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1046 * accidently specified a value for the DST_CDEF. To handle this case,
1047 * an extra check is required. */
1049 if ((updvals[ds_idx + 1][0] != 'U') &&
1050 (dst_idx != DST_CDEF) &&
1051 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1052 rate = DNAN;
1054 /* pdp_new contains rate * time ... eg the bytes transferred during
1055 * the interval. Doing it this way saves a lot of math operations
1056 */
1057 switch (dst_idx) {
1058 case DST_COUNTER:
1059 case DST_DERIVE:
1060 /* Check if this is a valid integer. `U' is already handled in
1061 * another branch. */
1062 for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) {
1063 if ((ii == 0) && (dst_idx == DST_DERIVE)
1064 && (updvals[ds_idx + 1][ii] == '-'))
1065 continue;
1067 if ((updvals[ds_idx + 1][ii] < '0')
1068 || (updvals[ds_idx + 1][ii] > '9')) {
1069 rrd_set_error("not a simple %s integer: '%s'",
1070 (dst_idx == DST_DERIVE) ? "signed" : "unsigned",
1071 updvals[ds_idx + 1]);
1072 return -1;
1073 }
1074 } /* for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) */
1076 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1077 pdp_new[ds_idx] =
1078 rrd_diff(updvals[ds_idx + 1],
1079 rrd->pdp_prep[ds_idx].last_ds);
1080 if (dst_idx == DST_COUNTER) {
1081 /* simple overflow catcher. This will fail
1082 * terribly for non 32 or 64 bit counters
1083 * ... are there any others in SNMP land?
1084 */
1085 if (pdp_new[ds_idx] < (double) 0.0)
1086 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1087 if (pdp_new[ds_idx] < (double) 0.0)
1088 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1089 }
1090 rate = pdp_new[ds_idx] / interval;
1091 } else {
1092 pdp_new[ds_idx] = DNAN;
1093 }
1094 break;
1095 case DST_ABSOLUTE:
1096 old_locale = setlocale(LC_NUMERIC, NULL);
1097 setlocale(LC_NUMERIC, "C");
1098 errno = 0;
1099 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1100 if (errno > 0) {
1101 rrd_set_error("converting '%s' to float: %s",
1102 updvals[ds_idx + 1], rrd_strerror(errno));
1103 return -1;
1104 };
1105 setlocale(LC_NUMERIC, old_locale);
1106 if (endptr[0] != '\0') {
1107 rrd_set_error
1108 ("conversion of '%s' to float not complete: tail '%s'",
1109 updvals[ds_idx + 1], endptr);
1110 return -1;
1111 }
1112 rate = pdp_new[ds_idx] / interval;
1113 break;
1114 case DST_GAUGE:
1115 old_locale = setlocale(LC_NUMERIC, NULL);
1116 setlocale(LC_NUMERIC, "C");
1117 errno = 0;
1118 pdp_new[ds_idx] =
1119 strtod(updvals[ds_idx + 1], &endptr) * interval;
1120 if (errno) {
1121 rrd_set_error("converting '%s' to float: %s",
1122 updvals[ds_idx + 1], rrd_strerror(errno));
1123 return -1;
1124 };
1125 setlocale(LC_NUMERIC, old_locale);
1126 if (endptr[0] != '\0') {
1127 rrd_set_error
1128 ("conversion of '%s' to float not complete: tail '%s'",
1129 updvals[ds_idx + 1], endptr);
1130 return -1;
1131 }
1132 rate = pdp_new[ds_idx] / interval;
1133 break;
1134 default:
1135 rrd_set_error("rrd contains unknown DS type : '%s'",
1136 rrd->ds_def[ds_idx].dst);
1137 return -1;
1138 }
1139 /* break out of this for loop if the error string is set */
1140 if (rrd_test_error()) {
1141 return -1;
1142 }
1143 /* make sure pdp_temp is neither too large or too small
1144 * if any of these occur it becomes unknown ...
1145 * sorry folks ... */
1146 if (!isnan(rate) &&
1147 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1148 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1149 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1150 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1151 pdp_new[ds_idx] = DNAN;
1152 }
1153 } else {
1154 /* no news is news all the same */
1155 pdp_new[ds_idx] = DNAN;
1156 }
1159 /* make a copy of the command line argument for the next run */
1160 #ifdef DEBUG
1161 fprintf(stderr, "prep ds[%lu]\t"
1162 "last_arg '%s'\t"
1163 "this_arg '%s'\t"
1164 "pdp_new %10.2f\n",
1165 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1166 pdp_new[ds_idx]);
1167 #endif
1168 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1169 LAST_DS_LEN - 1);
1170 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1171 }
1172 return 0;
1173 }
1175 /*
1176 * How many PDP steps have elapsed since the last update? Returns the answer,
1177 * and stores the time between the last update and the last PDP in pre_time,
1178 * and the time between the last PDP and the current time in post_int.
1179 */
1180 static int calculate_elapsed_steps(
1181 rrd_t *rrd,
1182 unsigned long current_time,
1183 unsigned long current_time_usec,
1184 double interval,
1185 double *pre_int,
1186 double *post_int,
1187 unsigned long *proc_pdp_cnt)
1188 {
1189 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1190 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1191 * time */
1192 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1193 * when it was last updated */
1194 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1196 /* when was the current pdp started */
1197 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1198 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1200 /* when did the last pdp_st occur */
1201 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1202 occu_pdp_st = current_time - occu_pdp_age;
1204 if (occu_pdp_st > proc_pdp_st) {
1205 /* OK we passed the pdp_st moment */
1206 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1207 * occurred before the latest
1208 * pdp_st moment*/
1209 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1210 *post_int = occu_pdp_age; /* how much after it */
1211 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1212 } else {
1213 *pre_int = interval;
1214 *post_int = 0;
1215 }
1217 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1219 #ifdef DEBUG
1220 printf("proc_pdp_age %lu\t"
1221 "proc_pdp_st %lu\t"
1222 "occu_pfp_age %lu\t"
1223 "occu_pdp_st %lu\t"
1224 "int %lf\t"
1225 "pre_int %lf\t"
1226 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1227 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1228 #endif
1230 /* compute the number of elapsed pdp_st moments */
1231 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1232 }
1234 /*
1235 * Increment the PDP values by the values in pdp_new, or else initialize them.
1236 */
1237 static void simple_update(
1238 rrd_t *rrd,
1239 double interval,
1240 rrd_value_t *pdp_new)
1241 {
1242 int i;
1244 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1245 if (isnan(pdp_new[i])) {
1246 /* this is not really accurate if we use subsecond data arrival time
1247 should have thought of it when going subsecond resolution ...
1248 sorry next format change we will have it! */
1249 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1250 floor(interval);
1251 } else {
1252 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1253 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1254 } else {
1255 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1256 }
1257 }
1258 #ifdef DEBUG
1259 fprintf(stderr,
1260 "NO PDP ds[%i]\t"
1261 "value %10.2f\t"
1262 "unkn_sec %5lu\n",
1263 i,
1264 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1265 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1266 #endif
1267 }
1268 }
1270 /*
1271 * Call process_pdp_st for each DS.
1272 *
1273 * Returns 0 on success, -1 on error.
1274 */
1275 static int process_all_pdp_st(
1276 rrd_t *rrd,
1277 double interval,
1278 double pre_int,
1279 double post_int,
1280 unsigned long elapsed_pdp_st,
1281 rrd_value_t *pdp_new,
1282 rrd_value_t *pdp_temp)
1283 {
1284 unsigned long ds_idx;
1286 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1287 rate*seconds which occurred up to the last run.
1288 pdp_new[] contains rate*seconds from the latest run.
1289 pdp_temp[] will contain the rate for cdp */
1291 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1292 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1293 elapsed_pdp_st * rrd->stat_head->pdp_step,
1294 pdp_new, pdp_temp) == -1) {
1295 return -1;
1296 }
1297 #ifdef DEBUG
1298 fprintf(stderr, "PDP UPD ds[%lu]\t"
1299 "elapsed_pdp_st %lu\t"
1300 "pdp_temp %10.2f\t"
1301 "new_prep %10.2f\t"
1302 "new_unkn_sec %5lu\n",
1303 ds_idx,
1304 elapsed_pdp_st,
1305 pdp_temp[ds_idx],
1306 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1307 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1308 #endif
1309 }
1310 return 0;
1311 }
1313 /*
1314 * Process an update that occurs after one of the PDP moments.
1315 * Increments the PDP value, sets NAN if time greater than the
1316 * heartbeats have elapsed, processes CDEFs.
1317 *
1318 * Returns 0 on success, -1 on error.
1319 */
1320 static int process_pdp_st(
1321 rrd_t *rrd,
1322 unsigned long ds_idx,
1323 double interval,
1324 double pre_int,
1325 double post_int,
1326 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1327 rrd_value_t *pdp_new,
1328 rrd_value_t *pdp_temp)
1329 {
1330 int i;
1332 /* update pdp_prep to the current pdp_st. */
1333 double pre_unknown = 0.0;
1334 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1335 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1337 rpnstack_t rpnstack; /* used for COMPUTE DS */
1339 rpnstack_init(&rpnstack);
1342 if (isnan(pdp_new[ds_idx])) {
1343 /* a final bit of unknown to be added before calculation
1344 we use a temporary variable for this so that we
1345 don't have to turn integer lines before using the value */
1346 pre_unknown = pre_int;
1347 } else {
1348 if (isnan(scratch[PDP_val].u_val)) {
1349 scratch[PDP_val].u_val = 0;
1350 }
1351 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1352 }
1354 /* if too much of the pdp_prep is unknown we dump it */
1355 /* if the interval is larger thatn mrhb we get NAN */
1356 if ((interval > mrhb) ||
1357 (rrd->stat_head->pdp_step / 2.0 <
1358 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1359 pdp_temp[ds_idx] = DNAN;
1360 } else {
1361 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1362 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1363 pre_unknown);
1364 }
1366 /* process CDEF data sources; remember each CDEF DS can
1367 * only reference other DS with a lower index number */
1368 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1369 rpnp_t *rpnp;
1371 rpnp =
1372 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1373 if(rpnp == NULL) {
1374 rpnstack_free(&rpnstack);
1375 return -1;
1376 }
1377 /* substitute data values for OP_VARIABLE nodes */
1378 for (i = 0; rpnp[i].op != OP_END; i++) {
1379 if (rpnp[i].op == OP_VARIABLE) {
1380 rpnp[i].op = OP_NUMBER;
1381 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1382 }
1383 }
1384 /* run the rpn calculator */
1385 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1386 free(rpnp);
1387 rpnstack_free(&rpnstack);
1388 return -1;
1389 }
1390 free(rpnp);
1391 }
1393 /* make pdp_prep ready for the next run */
1394 if (isnan(pdp_new[ds_idx])) {
1395 /* this is not realy accurate if we use subsecond data arival time
1396 should have thought of it when going subsecond resolution ...
1397 sorry next format change we will have it! */
1398 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1399 scratch[PDP_val].u_val = DNAN;
1400 } else {
1401 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1402 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1403 }
1404 rpnstack_free(&rpnstack);
1405 return 0;
1406 }
1408 /*
1409 * Iterate over all the RRAs for a given DS and:
1410 * 1. Decide whether to schedule a smooth later
1411 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1412 * 3. Update the CDP
1413 *
1414 * Returns 0 on success, -1 on error
1415 */
1416 static int update_all_cdp_prep(
1417 rrd_t *rrd,
1418 unsigned long *rra_step_cnt,
1419 unsigned long rra_begin,
1420 rrd_file_t *rrd_file,
1421 unsigned long elapsed_pdp_st,
1422 unsigned long proc_pdp_cnt,
1423 rrd_value_t **last_seasonal_coef,
1424 rrd_value_t **seasonal_coef,
1425 rrd_value_t *pdp_temp,
1426 unsigned long *skip_update,
1427 int *schedule_smooth)
1428 {
1429 unsigned long rra_idx;
1431 /* index into the CDP scratch array */
1432 enum cf_en current_cf;
1433 unsigned long rra_start;
1435 /* number of rows to be updated in an RRA for a data value. */
1436 unsigned long start_pdp_offset;
1438 rra_start = rra_begin;
1439 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1440 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1441 start_pdp_offset =
1442 rrd->rra_def[rra_idx].pdp_cnt -
1443 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1444 skip_update[rra_idx] = 0;
1445 if (start_pdp_offset <= elapsed_pdp_st) {
1446 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1447 rrd->rra_def[rra_idx].pdp_cnt + 1;
1448 } else {
1449 rra_step_cnt[rra_idx] = 0;
1450 }
1452 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1453 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1454 * so that they will be correct for the next observed value; note that for
1455 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1456 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1457 if (rra_step_cnt[rra_idx] > 1) {
1458 skip_update[rra_idx] = 1;
1459 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1460 elapsed_pdp_st, last_seasonal_coef);
1461 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1462 elapsed_pdp_st + 1, seasonal_coef);
1463 }
1464 /* periodically run a smoother for seasonal effects */
1465 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1466 #ifdef DEBUG
1467 fprintf(stderr,
1468 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1469 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1470 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1471 u_cnt);
1472 #endif
1473 *schedule_smooth = 1;
1474 }
1475 }
1476 if (rrd_test_error())
1477 return -1;
1479 if (update_cdp_prep
1480 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1481 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1482 current_cf) == -1) {
1483 return -1;
1484 }
1485 rra_start +=
1486 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1487 sizeof(rrd_value_t);
1488 }
1489 return 0;
1490 }
1492 /*
1493 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1494 */
1495 static int do_schedule_smooth(
1496 rrd_t *rrd,
1497 unsigned long rra_idx,
1498 unsigned long elapsed_pdp_st)
1499 {
1500 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1501 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1502 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1503 unsigned long seasonal_smooth_idx =
1504 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1505 unsigned long *init_seasonal =
1506 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1508 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1509 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1510 * really an RRA level, not a data source within RRA level parameter, but
1511 * the rra_def is read only for rrd_update (not flushed to disk). */
1512 if (*init_seasonal > BURNIN_CYCLES) {
1513 /* someone has no doubt invented a trick to deal with this wrap around,
1514 * but at least this code is clear. */
1515 if (seasonal_smooth_idx > cur_row) {
1516 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1517 * between PDP and CDP */
1518 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1519 }
1520 /* can't rely on negative numbers because we are working with
1521 * unsigned values */
1522 return (cur_row + elapsed_pdp_st >= row_cnt
1523 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1524 }
1525 /* mark off one of the burn-in cycles */
1526 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1527 }
1529 /*
1530 * For a given RRA, iterate over the data sources and call the appropriate
1531 * consolidation function.
1532 *
1533 * Returns 0 on success, -1 on error.
1534 */
1535 static int update_cdp_prep(
1536 rrd_t *rrd,
1537 unsigned long elapsed_pdp_st,
1538 unsigned long start_pdp_offset,
1539 unsigned long *rra_step_cnt,
1540 int rra_idx,
1541 rrd_value_t *pdp_temp,
1542 rrd_value_t *last_seasonal_coef,
1543 rrd_value_t *seasonal_coef,
1544 int current_cf)
1545 {
1546 unsigned long ds_idx, cdp_idx;
1548 /* update CDP_PREP areas */
1549 /* loop over data soures within each RRA */
1550 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1552 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1554 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1555 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1556 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1557 elapsed_pdp_st, start_pdp_offset,
1558 rrd->rra_def[rra_idx].pdp_cnt,
1559 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1560 rra_idx, ds_idx);
1561 } else {
1562 /* Nothing to consolidate if there's one PDP per CDP. However, if
1563 * we've missed some PDPs, let's update null counters etc. */
1564 if (elapsed_pdp_st > 2) {
1565 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1566 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1567 (enum cf_en)current_cf);
1568 }
1569 }
1571 if (rrd_test_error())
1572 return -1;
1573 } /* endif data sources loop */
1574 return 0;
1575 }
1577 /*
1578 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1579 * primary value, secondary value, and # of unknowns.
1580 */
1581 static void update_cdp(
1582 unival *scratch,
1583 int current_cf,
1584 rrd_value_t pdp_temp_val,
1585 unsigned long rra_step_cnt,
1586 unsigned long elapsed_pdp_st,
1587 unsigned long start_pdp_offset,
1588 unsigned long pdp_cnt,
1589 rrd_value_t xff,
1590 int i,
1591 int ii)
1592 {
1593 /* shorthand variables */
1594 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1595 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1596 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1597 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1599 if (rra_step_cnt) {
1600 /* If we are in this block, as least 1 CDP value will be written to
1601 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1602 * to be written, then the "fill in" value is the CDP_secondary_val
1603 * entry. */
1604 if (isnan(pdp_temp_val)) {
1605 *cdp_unkn_pdp_cnt += start_pdp_offset;
1606 *cdp_secondary_val = DNAN;
1607 } else {
1608 /* CDP_secondary value is the RRA "fill in" value for intermediary
1609 * CDP data entries. No matter the CF, the value is the same because
1610 * the average, max, min, and last of a list of identical values is
1611 * the same, namely, the value itself. */
1612 *cdp_secondary_val = pdp_temp_val;
1613 }
1615 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1616 *cdp_primary_val = DNAN;
1617 } else {
1618 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1619 start_pdp_offset, pdp_cnt);
1620 }
1621 *cdp_val =
1622 initialize_carry_over(pdp_temp_val,current_cf,
1623 elapsed_pdp_st,
1624 start_pdp_offset, pdp_cnt);
1625 /* endif meets xff value requirement for a valid value */
1626 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1627 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1628 if (isnan(pdp_temp_val))
1629 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1630 else
1631 *cdp_unkn_pdp_cnt = 0;
1632 } else { /* rra_step_cnt[i] == 0 */
1634 #ifdef DEBUG
1635 if (isnan(*cdp_val)) {
1636 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1637 i, ii);
1638 } else {
1639 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1640 i, ii, *cdp_val);
1641 }
1642 #endif
1643 if (isnan(pdp_temp_val)) {
1644 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1645 } else {
1646 *cdp_val =
1647 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1648 current_cf, i, ii);
1649 }
1650 }
1651 }
1653 /*
1654 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1655 * on the type of consolidation function.
1656 */
1657 static void initialize_cdp_val(
1658 unival *scratch,
1659 int current_cf,
1660 rrd_value_t pdp_temp_val,
1661 unsigned long start_pdp_offset,
1662 unsigned long pdp_cnt)
1663 {
1664 rrd_value_t cum_val, cur_val;
1666 switch (current_cf) {
1667 case CF_AVERAGE:
1668 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1669 cur_val = IFDNAN(pdp_temp_val, 0.0);
1670 scratch[CDP_primary_val].u_val =
1671 (cum_val + cur_val * start_pdp_offset) /
1672 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1673 break;
1674 case CF_MAXIMUM:
1675 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1676 cur_val = IFDNAN(pdp_temp_val, -DINF);
1678 #if 0
1679 #ifdef DEBUG
1680 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1681 fprintf(stderr,
1682 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1683 i, ii);
1684 exit(-1);
1685 }
1686 #endif
1687 #endif
1688 if (cur_val > cum_val)
1689 scratch[CDP_primary_val].u_val = cur_val;
1690 else
1691 scratch[CDP_primary_val].u_val = cum_val;
1692 break;
1693 case CF_MINIMUM:
1694 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1695 cur_val = IFDNAN(pdp_temp_val, DINF);
1696 #if 0
1697 #ifdef DEBUG
1698 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1699 fprintf(stderr,
1700 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1701 ii);
1702 exit(-1);
1703 }
1704 #endif
1705 #endif
1706 if (cur_val < cum_val)
1707 scratch[CDP_primary_val].u_val = cur_val;
1708 else
1709 scratch[CDP_primary_val].u_val = cum_val;
1710 break;
1711 case CF_LAST:
1712 default:
1713 scratch[CDP_primary_val].u_val = pdp_temp_val;
1714 break;
1715 }
1716 }
1718 /*
1719 * Update the consolidation function for Holt-Winters functions as
1720 * well as other functions that don't actually consolidate multiple
1721 * PDPs.
1722 */
1723 static void reset_cdp(
1724 rrd_t *rrd,
1725 unsigned long elapsed_pdp_st,
1726 rrd_value_t *pdp_temp,
1727 rrd_value_t *last_seasonal_coef,
1728 rrd_value_t *seasonal_coef,
1729 int rra_idx,
1730 int ds_idx,
1731 int cdp_idx,
1732 enum cf_en current_cf)
1733 {
1734 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1736 switch (current_cf) {
1737 case CF_AVERAGE:
1738 default:
1739 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1740 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1741 break;
1742 case CF_SEASONAL:
1743 case CF_DEVSEASONAL:
1744 /* need to update cached seasonal values, so they are consistent
1745 * with the bulk update */
1746 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1747 * CDP_last_deviation are the same. */
1748 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1749 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1750 break;
1751 case CF_HWPREDICT:
1752 case CF_MHWPREDICT:
1753 /* need to update the null_count and last_null_count.
1754 * even do this for non-DNAN pdp_temp because the
1755 * algorithm is not learning from batch updates. */
1756 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1757 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1758 /* fall through */
1759 case CF_DEVPREDICT:
1760 scratch[CDP_primary_val].u_val = DNAN;
1761 scratch[CDP_secondary_val].u_val = DNAN;
1762 break;
1763 case CF_FAILURES:
1764 /* do not count missed bulk values as failures */
1765 scratch[CDP_primary_val].u_val = 0;
1766 scratch[CDP_secondary_val].u_val = 0;
1767 /* need to reset violations buffer.
1768 * could do this more carefully, but for now, just
1769 * assume a bulk update wipes away all violations. */
1770 erase_violations(rrd, cdp_idx, rra_idx);
1771 break;
1772 }
1773 }
1775 static rrd_value_t initialize_carry_over(
1776 rrd_value_t pdp_temp_val,
1777 int current_cf,
1778 unsigned long elapsed_pdp_st,
1779 unsigned long start_pdp_offset,
1780 unsigned long pdp_cnt)
1781 {
1782 unsigned long pdp_into_cdp_cnt = ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1783 if ( pdp_into_cdp_cnt == 0 || isnan(pdp_temp_val)){
1784 switch (current_cf) {
1785 case CF_MAXIMUM:
1786 return -DINF;
1787 case CF_MINIMUM:
1788 return DINF;
1789 case CF_AVERAGE:
1790 return 0;
1791 default:
1792 return DNAN;
1793 }
1794 }
1795 else {
1796 switch (current_cf) {
1797 case CF_AVERAGE:
1798 return pdp_temp_val * pdp_into_cdp_cnt ;
1799 default:
1800 return pdp_temp_val;
1801 }
1802 }
1803 }
1805 /*
1806 * Update or initialize a CDP value based on the consolidation
1807 * function.
1808 *
1809 * Returns the new value.
1810 */
1811 static rrd_value_t calculate_cdp_val(
1812 rrd_value_t cdp_val,
1813 rrd_value_t pdp_temp_val,
1814 unsigned long elapsed_pdp_st,
1815 int current_cf,
1816 #ifdef DEBUG
1817 int i,
1818 int ii
1819 #else
1820 int UNUSED(i),
1821 int UNUSED(ii)
1822 #endif
1823 )
1824 {
1825 if (isnan(cdp_val)) {
1826 if (current_cf == CF_AVERAGE) {
1827 pdp_temp_val *= elapsed_pdp_st;
1828 }
1829 #ifdef DEBUG
1830 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1831 i, ii, pdp_temp_val);
1832 #endif
1833 return pdp_temp_val;
1834 }
1835 if (current_cf == CF_AVERAGE)
1836 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1837 if (current_cf == CF_MINIMUM)
1838 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1839 if (current_cf == CF_MAXIMUM)
1840 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1842 return pdp_temp_val;
1843 }
1845 /*
1846 * For each RRA, update the seasonal values and then call update_aberrant_CF
1847 * for each data source.
1848 *
1849 * Return 0 on success, -1 on error.
1850 */
1851 static int update_aberrant_cdps(
1852 rrd_t *rrd,
1853 rrd_file_t *rrd_file,
1854 unsigned long rra_begin,
1855 unsigned long elapsed_pdp_st,
1856 rrd_value_t *pdp_temp,
1857 rrd_value_t **seasonal_coef)
1858 {
1859 unsigned long rra_idx, ds_idx, j;
1861 /* number of PDP steps since the last update that
1862 * are assigned to the first CDP to be generated
1863 * since the last update. */
1864 unsigned short scratch_idx;
1865 unsigned long rra_start;
1866 enum cf_en current_cf;
1868 /* this loop is only entered if elapsed_pdp_st < 3 */
1869 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1870 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1871 rra_start = rra_begin;
1872 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1873 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1874 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1875 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1876 if (scratch_idx == CDP_primary_val) {
1877 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1878 elapsed_pdp_st + 1, seasonal_coef);
1879 } else {
1880 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1881 elapsed_pdp_st + 2, seasonal_coef);
1882 }
1883 }
1884 if (rrd_test_error())
1885 return -1;
1886 /* loop over data soures within each RRA */
1887 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1888 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1889 rra_idx * (rrd->stat_head->ds_cnt) +
1890 ds_idx, rra_idx, ds_idx, scratch_idx,
1891 *seasonal_coef);
1892 }
1893 }
1894 rra_start += rrd->rra_def[rra_idx].row_cnt
1895 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1896 }
1897 }
1898 return 0;
1899 }
1901 /*
1902 * Move sequentially through the file, writing one RRA at a time. Note this
1903 * architecture divorces the computation of CDP with flushing updated RRA
1904 * entries to disk.
1905 *
1906 * Return 0 on success, -1 on error.
1907 */
1908 static int write_to_rras(
1909 rrd_t *rrd,
1910 rrd_file_t *rrd_file,
1911 unsigned long *rra_step_cnt,
1912 unsigned long rra_begin,
1913 time_t current_time,
1914 unsigned long *skip_update,
1915 rrd_info_t ** pcdp_summary)
1916 {
1917 unsigned long rra_idx;
1918 unsigned long rra_start;
1919 time_t rra_time = 0; /* time of update for a RRA */
1921 unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1923 /* Ready to write to disk */
1924 rra_start = rra_begin;
1926 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1927 rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1928 rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1930 /* for cdp_prep */
1931 unsigned short scratch_idx;
1932 unsigned long step_subtract;
1934 for (scratch_idx = CDP_primary_val,
1935 step_subtract = 1;
1936 rra_step_cnt[rra_idx] > 0;
1937 rra_step_cnt[rra_idx]--,
1938 scratch_idx = CDP_secondary_val,
1939 step_subtract = 2) {
1941 size_t rra_pos_new;
1942 #ifdef DEBUG
1943 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1944 #endif
1945 /* increment, with wrap-around */
1946 if (++rra_ptr->cur_row >= rra_def->row_cnt)
1947 rra_ptr->cur_row = 0;
1949 /* we know what our position should be */
1950 rra_pos_new = rra_start
1951 + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1953 /* re-seek if the position is wrong or we wrapped around */
1954 if ((size_t)rra_pos_new != rrd_file->pos) {
1955 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1956 rrd_set_error("seek error in rrd");
1957 return -1;
1958 }
1959 }
1960 #ifdef DEBUG
1961 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1962 #endif
1964 if (skip_update[rra_idx])
1965 continue;
1967 if (*pcdp_summary != NULL) {
1968 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1970 rra_time = (current_time - current_time % step_time)
1971 - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1972 }
1974 if (write_RRA_row
1975 (rrd_file, rrd, rra_idx, scratch_idx,
1976 pcdp_summary, rra_time) == -1)
1977 return -1;
1979 rrd_notify_row(rrd_file, rra_idx, rra_pos_new, rra_time);
1980 }
1982 rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1983 } /* RRA LOOP */
1985 return 0;
1986 }
1988 /*
1989 * Write out one row of values (one value per DS) to the archive.
1990 *
1991 * Returns 0 on success, -1 on error.
1992 */
1993 static int write_RRA_row(
1994 rrd_file_t *rrd_file,
1995 rrd_t *rrd,
1996 unsigned long rra_idx,
1997 unsigned short CDP_scratch_idx,
1998 rrd_info_t ** pcdp_summary,
1999 time_t rra_time)
2000 {
2001 unsigned long ds_idx, cdp_idx;
2002 rrd_infoval_t iv;
2004 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
2005 /* compute the cdp index */
2006 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2007 #ifdef DEBUG
2008 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2009 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2010 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2011 #endif
2012 if (*pcdp_summary != NULL) {
2013 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2014 /* append info to the return hash */
2015 *pcdp_summary = rrd_info_push(*pcdp_summary,
2016 sprintf_alloc
2017 ("[%lli]RRA[%s][%lu]DS[%s]",
2018 (long long)rra_time,
2019 rrd->rra_def[rra_idx].cf_nam,
2020 rrd->rra_def[rra_idx].pdp_cnt,
2021 rrd->ds_def[ds_idx].ds_nam),
2022 RD_I_VAL, iv);
2023 }
2024 errno = 0;
2025 if (rrd_write(rrd_file,
2026 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2027 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2028 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2029 return -1;
2030 }
2031 }
2032 return 0;
2033 }
2035 /*
2036 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2037 *
2038 * Returns 0 on success, -1 otherwise
2039 */
2040 static int smooth_all_rras(
2041 rrd_t *rrd,
2042 rrd_file_t *rrd_file,
2043 unsigned long rra_begin)
2044 {
2045 unsigned long rra_start = rra_begin;
2046 unsigned long rra_idx;
2048 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2049 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2050 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2051 #ifdef DEBUG
2052 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2053 #endif
2054 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2055 if (rrd_test_error())
2056 return -1;
2057 }
2058 rra_start += rrd->rra_def[rra_idx].row_cnt
2059 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2060 }
2061 return 0;
2062 }
2064 #ifndef HAVE_MMAP
2065 /*
2066 * Flush changes to disk (unless we're using mmap)
2067 *
2068 * Returns 0 on success, -1 otherwise
2069 */
2070 static int write_changes_to_disk(
2071 rrd_t *rrd,
2072 rrd_file_t *rrd_file,
2073 int version)
2074 {
2075 /* we just need to write back the live header portion now */
2076 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2077 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2078 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2079 SEEK_SET) != 0) {
2080 rrd_set_error("seek rrd for live header writeback");
2081 return -1;
2082 }
2083 if (version >= 3) {
2084 if (rrd_write(rrd_file, rrd->live_head,
2085 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2086 rrd_set_error("rrd_write live_head to rrd");
2087 return -1;
2088 }
2089 } else {
2090 if (rrd_write(rrd_file, rrd->legacy_last_up,
2091 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2092 rrd_set_error("rrd_write live_head to rrd");
2093 return -1;
2094 }
2095 }
2098 if (rrd_write(rrd_file, rrd->pdp_prep,
2099 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2100 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2101 rrd_set_error("rrd_write pdp_prep to rrd");
2102 return -1;
2103 }
2105 if (rrd_write(rrd_file, rrd->cdp_prep,
2106 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2107 rrd->stat_head->ds_cnt)
2108 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2109 rrd->stat_head->ds_cnt)) {
2111 rrd_set_error("rrd_write cdp_prep to rrd");
2112 return -1;
2113 }
2115 if (rrd_write(rrd_file, rrd->rra_ptr,
2116 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2117 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2118 rrd_set_error("rrd_write rra_ptr to rrd");
2119 return -1;
2120 }
2121 return 0;
2122 }
2123 #endif