1 /*****************************************************************************
2 * RRDtool 1.4.3 Copyright by Tobi Oetiker, 1997-2010
3 * Copyright by Florian Forster, 2008
4 *****************************************************************************
5 * rrd_update.c RRD Update Function
6 *****************************************************************************
7 * $Id$
8 *****************************************************************************/
10 #include "rrd_tool.h"
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
18 #include <locale.h>
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
26 #include "rrd_client.h"
#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
/*
 * WIN32 does not have gettimeofday and struct timeval. This is a quick and
 * dirty replacement built on _ftime().
 */
#include <sys/timeb.h>

#ifndef __MINGW32__
struct timeval {
    time_t    tv_sec;   /* seconds */
    long      tv_usec;  /* microseconds */
};
#endif

struct __timezone {
    int       tz_minuteswest;   /* minutes W of Greenwich */
    int       tz_dsttime;   /* type of dst correction */
};

/*
 * Fill *t with the current time as reported by _ftime().
 *
 * The microsecond field is derived from the millisecond value returned
 * by _ftime(), so it is always a multiple of 1000.
 * The tz argument is accepted for call compatibility only and is ignored.
 *
 * Always returns 0.
 */
static int gettimeofday(
    struct timeval *t,
    struct __timezone *tz)
{
    struct _timeb current_time;

    (void) tz;          /* time zone information is not provided */

    _ftime(&current_time);

    t->tv_sec = current_time.time;
    t->tv_usec = current_time.millitm * 1000;

    return 0;
}

#endif
64 /* FUNCTION PROTOTYPES */
66 int rrd_update_r(
67 const char *filename,
68 const char *tmplt,
69 int argc,
70 const char **argv);
71 int _rrd_update(
72 const char *filename,
73 const char *tmplt,
74 int argc,
75 const char **argv,
76 rrd_info_t *);
78 static int allocate_data_structures(
79 rrd_t *rrd,
80 char ***updvals,
81 rrd_value_t **pdp_temp,
82 const char *tmplt,
83 long **tmpl_idx,
84 unsigned long *tmpl_cnt,
85 unsigned long **rra_step_cnt,
86 unsigned long **skip_update,
87 rrd_value_t **pdp_new);
89 static int parse_template(
90 rrd_t *rrd,
91 const char *tmplt,
92 unsigned long *tmpl_cnt,
93 long *tmpl_idx);
95 static int process_arg(
96 char *step_start,
97 rrd_t *rrd,
98 rrd_file_t *rrd_file,
99 unsigned long rra_begin,
100 time_t *current_time,
101 unsigned long *current_time_usec,
102 rrd_value_t *pdp_temp,
103 rrd_value_t *pdp_new,
104 unsigned long *rra_step_cnt,
105 char **updvals,
106 long *tmpl_idx,
107 unsigned long tmpl_cnt,
108 rrd_info_t ** pcdp_summary,
109 int version,
110 unsigned long *skip_update,
111 int *schedule_smooth);
113 static int parse_ds(
114 rrd_t *rrd,
115 char **updvals,
116 long *tmpl_idx,
117 char *input,
118 unsigned long tmpl_cnt,
119 time_t *current_time,
120 unsigned long *current_time_usec,
121 int version);
123 static int get_time_from_reading(
124 rrd_t *rrd,
125 char timesyntax,
126 char **updvals,
127 time_t *current_time,
128 unsigned long *current_time_usec,
129 int version);
131 static int update_pdp_prep(
132 rrd_t *rrd,
133 char **updvals,
134 rrd_value_t *pdp_new,
135 double interval);
137 static int calculate_elapsed_steps(
138 rrd_t *rrd,
139 unsigned long current_time,
140 unsigned long current_time_usec,
141 double interval,
142 double *pre_int,
143 double *post_int,
144 unsigned long *proc_pdp_cnt);
146 static void simple_update(
147 rrd_t *rrd,
148 double interval,
149 rrd_value_t *pdp_new);
151 static int process_all_pdp_st(
152 rrd_t *rrd,
153 double interval,
154 double pre_int,
155 double post_int,
156 unsigned long elapsed_pdp_st,
157 rrd_value_t *pdp_new,
158 rrd_value_t *pdp_temp);
160 static int process_pdp_st(
161 rrd_t *rrd,
162 unsigned long ds_idx,
163 double interval,
164 double pre_int,
165 double post_int,
166 long diff_pdp_st,
167 rrd_value_t *pdp_new,
168 rrd_value_t *pdp_temp);
170 static int update_all_cdp_prep(
171 rrd_t *rrd,
172 unsigned long *rra_step_cnt,
173 unsigned long rra_begin,
174 rrd_file_t *rrd_file,
175 unsigned long elapsed_pdp_st,
176 unsigned long proc_pdp_cnt,
177 rrd_value_t **last_seasonal_coef,
178 rrd_value_t **seasonal_coef,
179 rrd_value_t *pdp_temp,
180 unsigned long *skip_update,
181 int *schedule_smooth);
183 static int do_schedule_smooth(
184 rrd_t *rrd,
185 unsigned long rra_idx,
186 unsigned long elapsed_pdp_st);
188 static int update_cdp_prep(
189 rrd_t *rrd,
190 unsigned long elapsed_pdp_st,
191 unsigned long start_pdp_offset,
192 unsigned long *rra_step_cnt,
193 int rra_idx,
194 rrd_value_t *pdp_temp,
195 rrd_value_t *last_seasonal_coef,
196 rrd_value_t *seasonal_coef,
197 int current_cf);
199 static void update_cdp(
200 unival *scratch,
201 int current_cf,
202 rrd_value_t pdp_temp_val,
203 unsigned long rra_step_cnt,
204 unsigned long elapsed_pdp_st,
205 unsigned long start_pdp_offset,
206 unsigned long pdp_cnt,
207 rrd_value_t xff,
208 int i,
209 int ii);
211 static void initialize_cdp_val(
212 unival *scratch,
213 int current_cf,
214 rrd_value_t pdp_temp_val,
215 unsigned long start_pdp_offset,
216 unsigned long pdp_cnt);
218 static void reset_cdp(
219 rrd_t *rrd,
220 unsigned long elapsed_pdp_st,
221 rrd_value_t *pdp_temp,
222 rrd_value_t *last_seasonal_coef,
223 rrd_value_t *seasonal_coef,
224 int rra_idx,
225 int ds_idx,
226 int cdp_idx,
227 enum cf_en current_cf);
229 static rrd_value_t initialize_carry_over(
230 rrd_value_t pdp_temp_val,
231 int current_cf,
232 unsigned long elapsed_pdp_st,
233 unsigned long start_pdp_offset,
234 unsigned long pdp_cnt);
236 static rrd_value_t calculate_cdp_val(
237 rrd_value_t cdp_val,
238 rrd_value_t pdp_temp_val,
239 unsigned long elapsed_pdp_st,
240 int current_cf,
241 int i,
242 int ii);
244 static int update_aberrant_cdps(
245 rrd_t *rrd,
246 rrd_file_t *rrd_file,
247 unsigned long rra_begin,
248 unsigned long elapsed_pdp_st,
249 rrd_value_t *pdp_temp,
250 rrd_value_t **seasonal_coef);
252 static int write_to_rras(
253 rrd_t *rrd,
254 rrd_file_t *rrd_file,
255 unsigned long *rra_step_cnt,
256 unsigned long rra_begin,
257 time_t current_time,
258 unsigned long *skip_update,
259 rrd_info_t ** pcdp_summary);
261 static int write_RRA_row(
262 rrd_file_t *rrd_file,
263 rrd_t *rrd,
264 unsigned long rra_idx,
265 unsigned short CDP_scratch_idx,
266 rrd_info_t ** pcdp_summary,
267 time_t rra_time);
269 static int smooth_all_rras(
270 rrd_t *rrd,
271 rrd_file_t *rrd_file,
272 unsigned long rra_begin);
274 #ifndef HAVE_MMAP
275 static int write_changes_to_disk(
276 rrd_t *rrd,
277 rrd_file_t *rrd_file,
278 int version);
279 #endif
/*
 * Bring a timeval (as returned by gettimeofday) into normal form:
 * a negative microsecond part is folded into the seconds so that
 * tv_usec ends up >= 0.  Values that are already non-negative are
 * left untouched.
 */
static void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec >= 0) {
        return;         /* nothing to fix */
    }

    /* borrow one second and add it back as microseconds */
    t->tv_usec += 1e6L;
    t->tv_sec--;
}
/*
 * Store the current wall-clock time in *current_time (seconds) and
 * *current_time_usec (microseconds).
 *
 * For RRD versions below 3 the microsecond part is forced to 0.
 */
static void initialize_time(
    time_t *current_time,
    unsigned long *current_time_usec,
    int version)
{
    struct timeval now; /* raw reading from gettimeofday */

    gettimeofday(&now, 0);
    normalize_time(&now);

    *current_time = now.tv_sec;
    *current_time_usec = (version >= 3) ? now.tv_usec : 0;
}
/*
 * Yields X, unless X is NaN, in which case it yields Y.
 *
 * NOTE: X is evaluated twice, and the expansion itself ends in ';', so
 * the macro can only be used where an extra empty statement is harmless
 * (i.e. as the right-hand side of a complete statement).
 */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
317 rrd_info_t *rrd_update_v(
318 int argc,
319 char **argv)
320 {
321 char *tmplt = NULL;
322 rrd_info_t *result = NULL;
323 rrd_infoval_t rc;
324 char *opt_daemon = NULL;
325 struct option long_options[] = {
326 {"template", required_argument, 0, 't'},
327 {0, 0, 0, 0}
328 };
330 rc.u_int = -1;
331 optind = 0;
332 opterr = 0; /* initialize getopt */
334 while (1) {
335 int option_index = 0;
336 int opt;
338 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
340 if (opt == EOF)
341 break;
343 switch (opt) {
344 case 't':
345 tmplt = optarg;
346 break;
348 case '?':
349 rrd_set_error("unknown option '%s'", argv[optind - 1]);
350 goto end_tag;
351 }
352 }
354 opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
355 if (opt_daemon != NULL) {
356 rrd_set_error ("The \"%s\" environment variable is defined, "
357 "but \"%s\" cannot work with rrdcached. Either unset "
358 "the environment variable or use \"update\" instead.",
359 ENV_RRDCACHED_ADDRESS, argv[0]);
360 goto end_tag;
361 }
363 /* need at least 2 arguments: filename, data. */
364 if (argc - optind < 2) {
365 rrd_set_error("Not enough arguments");
366 goto end_tag;
367 }
368 rc.u_int = 0;
369 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
370 rc.u_int = _rrd_update(argv[optind], tmplt,
371 argc - optind - 1,
372 (const char **) (argv + optind + 1), result);
373 result->value.u_int = rc.u_int;
374 end_tag:
375 return result;
376 }
378 int rrd_update(
379 int argc,
380 char **argv)
381 {
382 struct option long_options[] = {
383 {"template", required_argument, 0, 't'},
384 {"daemon", required_argument, 0, 'd'},
385 {0, 0, 0, 0}
386 };
387 int option_index = 0;
388 int opt;
389 char *tmplt = NULL;
390 int rc = -1;
391 char *opt_daemon = NULL;
393 optind = 0;
394 opterr = 0; /* initialize getopt */
396 while (1) {
397 opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
399 if (opt == EOF)
400 break;
402 switch (opt) {
403 case 't':
404 tmplt = strdup(optarg);
405 break;
407 case 'd':
408 if (opt_daemon != NULL)
409 free (opt_daemon);
410 opt_daemon = strdup (optarg);
411 if (opt_daemon == NULL)
412 {
413 rrd_set_error("strdup failed.");
414 goto out;
415 }
416 break;
418 case '?':
419 rrd_set_error("unknown option '%s'", argv[optind - 1]);
420 goto out;
421 }
422 }
424 /* need at least 2 arguments: filename, data. */
425 if (argc - optind < 2) {
426 rrd_set_error("Not enough arguments");
427 goto out;
428 }
430 { /* try to connect to rrdcached */
431 int status = rrdc_connect(opt_daemon);
432 if (status != 0) return status;
433 }
435 if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
436 {
437 rrd_set_error("The caching daemon cannot be used together with "
438 "templates yet.");
439 goto out;
440 }
442 if (! rrdc_is_connected(opt_daemon))
443 {
444 rc = rrd_update_r(argv[optind], tmplt,
445 argc - optind - 1, (const char **) (argv + optind + 1));
446 }
447 else /* we are connected */
448 {
449 rc = rrdc_update (argv[optind], /* file */
450 argc - optind - 1, /* values_num */
451 (const char *const *) (argv + optind + 1)); /* values */
452 if (rc > 0)
453 rrd_set_error("Failed sending the values to rrdcached: %s",
454 rrd_strerror (rc));
455 }
457 out:
458 if (tmplt != NULL)
459 {
460 free(tmplt);
461 tmplt = NULL;
462 }
463 if (opt_daemon != NULL)
464 {
465 free (opt_daemon);
466 opt_daemon = NULL;
467 }
468 return rc;
469 }
471 int rrd_update_r(
472 const char *filename,
473 const char *tmplt,
474 int argc,
475 const char **argv)
476 {
477 return _rrd_update(filename, tmplt, argc, argv, NULL);
478 }
480 int rrd_update_v_r(
481 const char *filename,
482 const char *tmplt,
483 int argc,
484 const char **argv,
485 rrd_info_t * pcdp_summary)
486 {
487 return _rrd_update(filename, tmplt, argc, argv, pcdp_summary);
488 }
490 int _rrd_update(
491 const char *filename,
492 const char *tmplt,
493 int argc,
494 const char **argv,
495 rrd_info_t * pcdp_summary)
496 {
498 int arg_i = 2;
500 unsigned long rra_begin; /* byte pointer to the rra
501 * area in the rrd file. this
502 * pointer never changes value */
503 rrd_value_t *pdp_new; /* prepare the incoming data to be added
504 * to the existing entry */
505 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
506 * to the cdp values */
508 long *tmpl_idx; /* index representing the settings
509 * transported by the tmplt index */
510 unsigned long tmpl_cnt = 2; /* time and data */
511 rrd_t rrd;
512 time_t current_time = 0;
513 unsigned long current_time_usec = 0; /* microseconds part of current time */
514 char **updvals;
515 int schedule_smooth = 0;
517 /* number of elapsed PDP steps since last update */
518 unsigned long *rra_step_cnt = NULL;
520 int version; /* rrd version */
521 rrd_file_t *rrd_file;
522 char *arg_copy; /* for processing the argv */
523 unsigned long *skip_update; /* RRAs to advance but not write */
525 /* need at least 1 arguments: data. */
526 if (argc < 1) {
527 rrd_set_error("Not enough arguments");
528 goto err_out;
529 }
531 rrd_init(&rrd);
532 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
533 goto err_free;
534 }
535 /* We are now at the beginning of the rra's */
536 rra_begin = rrd_file->header_len;
538 version = atoi(rrd.stat_head->version);
540 initialize_time(¤t_time, ¤t_time_usec, version);
542 /* get exclusive lock to whole file.
543 * lock gets removed when we close the file.
544 */
545 if (rrd_lock(rrd_file) != 0) {
546 rrd_set_error("could not lock RRD");
547 goto err_close;
548 }
550 if (allocate_data_structures(&rrd, &updvals,
551 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
552 &rra_step_cnt, &skip_update,
553 &pdp_new) == -1) {
554 goto err_close;
555 }
557 /* loop through the arguments. */
558 for (arg_i = 0; arg_i < argc; arg_i++) {
559 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
560 rrd_set_error("failed duplication argv entry");
561 break;
562 }
563 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
564 ¤t_time, ¤t_time_usec, pdp_temp, pdp_new,
565 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
566 &pcdp_summary, version, skip_update,
567 &schedule_smooth) == -1) {
568 if (rrd_test_error()) { /* Should have error string always here */
569 char *save_error;
571 /* Prepend file name to error message */
572 if ((save_error = strdup(rrd_get_error())) != NULL) {
573 rrd_set_error("%s: %s", filename, save_error);
574 free(save_error);
575 }
576 }
577 free(arg_copy);
578 break;
579 }
580 free(arg_copy);
581 }
583 free(rra_step_cnt);
585 /* if we got here and if there is an error and if the file has not been
586 * written to, then close things up and return. */
587 if (rrd_test_error()) {
588 goto err_free_structures;
589 }
590 #ifndef HAVE_MMAP
591 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
592 goto err_free_structures;
593 }
594 #endif
596 /* calling the smoothing code here guarantees at most one smoothing
597 * operation per rrd_update call. Unfortunately, it is possible with bulk
598 * updates, or a long-delayed update for smoothing to occur off-schedule.
599 * This really isn't critical except during the burn-in cycles. */
600 if (schedule_smooth) {
601 smooth_all_rras(&rrd, rrd_file, rra_begin);
602 }
604 /* rrd_dontneed(rrd_file,&rrd); */
605 rrd_free(&rrd);
606 rrd_close(rrd_file);
608 free(pdp_new);
609 free(tmpl_idx);
610 free(pdp_temp);
611 free(skip_update);
612 free(updvals);
613 return 0;
615 err_free_structures:
616 free(pdp_new);
617 free(tmpl_idx);
618 free(pdp_temp);
619 free(skip_update);
620 free(updvals);
621 err_close:
622 rrd_close(rrd_file);
623 err_free:
624 rrd_free(&rrd);
625 err_out:
626 return -1;
627 }
629 /*
630 * Allocate some important arrays used, and initialize the template.
631 *
632 * When it returns, either all of the structures are allocated
633 * or none of them are.
634 *
635 * Returns 0 on success, -1 on error.
636 */
637 static int allocate_data_structures(
638 rrd_t *rrd,
639 char ***updvals,
640 rrd_value_t **pdp_temp,
641 const char *tmplt,
642 long **tmpl_idx,
643 unsigned long *tmpl_cnt,
644 unsigned long **rra_step_cnt,
645 unsigned long **skip_update,
646 rrd_value_t **pdp_new)
647 {
648 unsigned i, ii;
649 if ((*updvals = (char **) malloc(sizeof(char *)
650 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
651 rrd_set_error("allocating updvals pointer array.");
652 return -1;
653 }
654 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
655 * rrd->stat_head->ds_cnt)) ==
656 NULL) {
657 rrd_set_error("allocating pdp_temp.");
658 goto err_free_updvals;
659 }
660 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
661 *
662 rrd->stat_head->rra_cnt)) ==
663 NULL) {
664 rrd_set_error("allocating skip_update.");
665 goto err_free_pdp_temp;
666 }
667 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
668 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
669 rrd_set_error("allocating tmpl_idx.");
670 goto err_free_skip_update;
671 }
672 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
673 *
674 (rrd->stat_head->
675 rra_cnt))) == NULL) {
676 rrd_set_error("allocating rra_step_cnt.");
677 goto err_free_tmpl_idx;
678 }
680 /* initialize tmplt redirector */
681 /* default config example (assume DS 1 is a CDEF DS)
682 tmpl_idx[0] -> 0; (time)
683 tmpl_idx[1] -> 1; (DS 0)
684 tmpl_idx[2] -> 3; (DS 2)
685 tmpl_idx[3] -> 4; (DS 3) */
686 (*tmpl_idx)[0] = 0; /* time */
687 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
688 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
689 (*tmpl_idx)[ii++] = i;
690 }
691 *tmpl_cnt = ii;
693 if (tmplt != NULL) {
694 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
695 goto err_free_rra_step_cnt;
696 }
697 }
699 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
700 * rrd->stat_head->ds_cnt)) == NULL) {
701 rrd_set_error("allocating pdp_new.");
702 goto err_free_rra_step_cnt;
703 }
705 return 0;
707 err_free_rra_step_cnt:
708 free(*rra_step_cnt);
709 err_free_tmpl_idx:
710 free(*tmpl_idx);
711 err_free_skip_update:
712 free(*skip_update);
713 err_free_pdp_temp:
714 free(*pdp_temp);
715 err_free_updvals:
716 free(*updvals);
717 return -1;
718 }
720 /*
721 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
722 *
723 * Returns 0 on success.
724 */
725 static int parse_template(
726 rrd_t *rrd,
727 const char *tmplt,
728 unsigned long *tmpl_cnt,
729 long *tmpl_idx)
730 {
731 char *dsname, *tmplt_copy;
732 unsigned int tmpl_len, i;
733 int ret = 0;
735 *tmpl_cnt = 1; /* the first entry is the time */
737 /* we should work on a writeable copy here */
738 if ((tmplt_copy = strdup(tmplt)) == NULL) {
739 rrd_set_error("error copying tmplt '%s'", tmplt);
740 ret = -1;
741 goto out;
742 }
744 dsname = tmplt_copy;
745 tmpl_len = strlen(tmplt_copy);
746 for (i = 0; i <= tmpl_len; i++) {
747 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
748 tmplt_copy[i] = '\0';
749 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
750 rrd_set_error("tmplt contains more DS definitions than RRD");
751 ret = -1;
752 goto out_free_tmpl_copy;
753 }
754 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
755 rrd_set_error("unknown DS name '%s'", dsname);
756 ret = -1;
757 goto out_free_tmpl_copy;
758 }
759 /* go to the next entry on the tmplt_copy */
760 if (i < tmpl_len)
761 dsname = &tmplt_copy[i + 1];
762 }
763 }
764 out_free_tmpl_copy:
765 free(tmplt_copy);
766 out:
767 return ret;
768 }
770 /*
771 * Parse an update string, updates the primary data points (PDPs)
772 * and consolidated data points (CDPs), and writes changes to the RRAs.
773 *
774 * Returns 0 on success, -1 on error.
775 */
776 static int process_arg(
777 char *step_start,
778 rrd_t *rrd,
779 rrd_file_t *rrd_file,
780 unsigned long rra_begin,
781 time_t *current_time,
782 unsigned long *current_time_usec,
783 rrd_value_t *pdp_temp,
784 rrd_value_t *pdp_new,
785 unsigned long *rra_step_cnt,
786 char **updvals,
787 long *tmpl_idx,
788 unsigned long tmpl_cnt,
789 rrd_info_t ** pcdp_summary,
790 int version,
791 unsigned long *skip_update,
792 int *schedule_smooth)
793 {
794 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
796 /* a vector of future Holt-Winters seasonal coefs */
797 unsigned long elapsed_pdp_st;
799 double interval, pre_int, post_int; /* interval between this and
800 * the last run */
801 unsigned long proc_pdp_cnt;
803 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
804 current_time, current_time_usec, version) == -1) {
805 return -1;
806 }
808 interval = (double) (*current_time - rrd->live_head->last_up)
809 + (double) ((long) *current_time_usec -
810 (long) rrd->live_head->last_up_usec) / 1e6f;
812 /* process the data sources and update the pdp_prep
813 * area accordingly */
814 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
815 return -1;
816 }
818 elapsed_pdp_st = calculate_elapsed_steps(rrd,
819 *current_time,
820 *current_time_usec, interval,
821 &pre_int, &post_int,
822 &proc_pdp_cnt);
824 /* has a pdp_st moment occurred since the last run ? */
825 if (elapsed_pdp_st == 0) {
826 /* no we have not passed a pdp_st moment. therefore update is simple */
827 simple_update(rrd, interval, pdp_new);
828 } else {
829 /* an pdp_st has occurred. */
830 if (process_all_pdp_st(rrd, interval,
831 pre_int, post_int,
832 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
833 return -1;
834 }
835 if (update_all_cdp_prep(rrd, rra_step_cnt,
836 rra_begin, rrd_file,
837 elapsed_pdp_st,
838 proc_pdp_cnt,
839 &last_seasonal_coef,
840 &seasonal_coef,
841 pdp_temp,
842 skip_update, schedule_smooth) == -1) {
843 goto err_free_coefficients;
844 }
845 if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
846 elapsed_pdp_st, pdp_temp,
847 &seasonal_coef) == -1) {
848 goto err_free_coefficients;
849 }
850 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
851 *current_time, skip_update,
852 pcdp_summary) == -1) {
853 goto err_free_coefficients;
854 }
855 } /* endif a pdp_st has occurred */
856 rrd->live_head->last_up = *current_time;
857 rrd->live_head->last_up_usec = *current_time_usec;
859 if (version < 3) {
860 *rrd->legacy_last_up = rrd->live_head->last_up;
861 }
862 free(seasonal_coef);
863 free(last_seasonal_coef);
864 return 0;
866 err_free_coefficients:
867 free(seasonal_coef);
868 free(last_seasonal_coef);
869 return -1;
870 }
872 /*
873 * Parse a DS string (time + colon-separated values), storing the
874 * results in current_time, current_time_usec, and updvals.
875 *
876 * Returns 0 on success, -1 on error.
877 */
878 static int parse_ds(
879 rrd_t *rrd,
880 char **updvals,
881 long *tmpl_idx,
882 char *input,
883 unsigned long tmpl_cnt,
884 time_t *current_time,
885 unsigned long *current_time_usec,
886 int version)
887 {
888 char *p;
889 unsigned long i;
890 char timesyntax;
892 updvals[0] = input;
893 /* initialize all ds input to unknown except the first one
894 which has always got to be set */
895 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
896 updvals[i] = "U";
898 /* separate all ds elements; first must be examined separately
899 due to alternate time syntax */
900 if ((p = strchr(input, '@')) != NULL) {
901 timesyntax = '@';
902 } else if ((p = strchr(input, ':')) != NULL) {
903 timesyntax = ':';
904 } else {
905 rrd_set_error("expected timestamp not found in data source from %s",
906 input);
907 return -1;
908 }
909 *p = '\0';
910 i = 1;
911 updvals[tmpl_idx[i++]] = p + 1;
912 while (*(++p)) {
913 if (*p == ':') {
914 *p = '\0';
915 if (i < tmpl_cnt) {
916 updvals[tmpl_idx[i++]] = p + 1;
917 }
918 else {
919 rrd_set_error("found extra data on update argument: %s",p+1);
920 return -1;
921 }
922 }
923 }
925 if (i != tmpl_cnt) {
926 rrd_set_error("expected %lu data source readings (got %lu) from %s",
927 tmpl_cnt - 1, i - 1, input);
928 return -1;
929 }
931 if (get_time_from_reading(rrd, timesyntax, updvals,
932 current_time, current_time_usec,
933 version) == -1) {
934 return -1;
935 }
936 return 0;
937 }
939 /*
940 * Parse the time in a DS string, store it in current_time and
941 * current_time_usec and verify that it's later than the last
942 * update for this DS.
943 *
944 * Returns 0 on success, -1 on error.
945 */
946 static int get_time_from_reading(
947 rrd_t *rrd,
948 char timesyntax,
949 char **updvals,
950 time_t *current_time,
951 unsigned long *current_time_usec,
952 int version)
953 {
954 double tmp;
955 char *parsetime_error = NULL;
956 char *old_locale;
957 rrd_time_value_t ds_tv;
958 struct timeval tmp_time; /* used for time conversion */
960 /* get the time from the reading ... handle N */
961 if (timesyntax == '@') { /* at-style */
962 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
963 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
964 return -1;
965 }
966 if (ds_tv.type == RELATIVE_TO_END_TIME ||
967 ds_tv.type == RELATIVE_TO_START_TIME) {
968 rrd_set_error("specifying time relative to the 'start' "
969 "or 'end' makes no sense here: %s", updvals[0]);
970 return -1;
971 }
972 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
973 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
974 } else if (strcmp(updvals[0], "N") == 0) {
975 gettimeofday(&tmp_time, 0);
976 normalize_time(&tmp_time);
977 *current_time = tmp_time.tv_sec;
978 *current_time_usec = tmp_time.tv_usec;
979 } else {
980 old_locale = setlocale(LC_NUMERIC, NULL);
981 setlocale(LC_NUMERIC, "C");
982 errno = 0;
983 tmp = strtod(updvals[0], 0);
984 if (errno > 0) {
985 rrd_set_error("converting '%s' to float: %s",
986 updvals[0], rrd_strerror(errno));
987 return -1;
988 };
989 setlocale(LC_NUMERIC, old_locale);
990 if (tmp < 0.0){
991 gettimeofday(&tmp_time, 0);
992 tmp = (double)tmp_time.tv_sec + (double)tmp_time.tv_usec * 1e-6f + tmp;
993 }
995 *current_time = floor(tmp);
996 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
997 }
998 /* dont do any correction for old version RRDs */
999 if (version < 3)
1000 *current_time_usec = 0;
1002 if (*current_time < rrd->live_head->last_up ||
1003 (*current_time == rrd->live_head->last_up &&
1004 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1005 rrd_set_error("illegal attempt to update using time %ld when "
1006 "last update time is %ld (minimum one second step)",
1007 *current_time, rrd->live_head->last_up);
1008 return -1;
1009 }
1010 return 0;
1011 }
1013 /*
1014 * Update pdp_new by interpreting the updvals according to the DS type
1015 * (COUNTER, GAUGE, etc.).
1016 *
1017 * Returns 0 on success, -1 on error.
1018 */
1019 static int update_pdp_prep(
1020 rrd_t *rrd,
1021 char **updvals,
1022 rrd_value_t *pdp_new,
1023 double interval)
1024 {
1025 unsigned long ds_idx;
1026 int ii;
1027 char *endptr; /* used in the conversion */
1028 double rate;
1029 char *old_locale;
1030 enum dst_en dst_idx;
1032 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1033 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1035 /* make sure we do not build diffs with old last_ds values */
1036 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1037 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1038 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1039 }
1041 /* NOTE: DST_CDEF should never enter this if block, because
1042 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1043 * accidently specified a value for the DST_CDEF. To handle this case,
1044 * an extra check is required. */
1046 if ((updvals[ds_idx + 1][0] != 'U') &&
1047 (dst_idx != DST_CDEF) &&
1048 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1049 rate = DNAN;
1051 /* pdp_new contains rate * time ... eg the bytes transferred during
1052 * the interval. Doing it this way saves a lot of math operations
1053 */
1054 switch (dst_idx) {
1055 case DST_COUNTER:
1056 case DST_DERIVE:
1057 /* Check if this is a valid integer. `U' is already handled in
1058 * another branch. */
1059 for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) {
1060 if ((ii == 0) && (dst_idx == DST_DERIVE)
1061 && (updvals[ds_idx + 1][ii] == '-'))
1062 continue;
1064 if ((updvals[ds_idx + 1][ii] < '0')
1065 || (updvals[ds_idx + 1][ii] > '9')) {
1066 rrd_set_error("not a simple %s integer: '%s'",
1067 (dst_idx == DST_DERIVE) ? "signed" : "unsigned",
1068 updvals[ds_idx + 1]);
1069 return -1;
1070 }
1071 } /* for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) */
1073 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1074 pdp_new[ds_idx] =
1075 rrd_diff(updvals[ds_idx + 1],
1076 rrd->pdp_prep[ds_idx].last_ds);
1077 if (dst_idx == DST_COUNTER) {
1078 /* simple overflow catcher. This will fail
1079 * terribly for non 32 or 64 bit counters
1080 * ... are there any others in SNMP land?
1081 */
1082 if (pdp_new[ds_idx] < (double) 0.0)
1083 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1084 if (pdp_new[ds_idx] < (double) 0.0)
1085 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1086 }
1087 rate = pdp_new[ds_idx] / interval;
1088 } else {
1089 pdp_new[ds_idx] = DNAN;
1090 }
1091 break;
1092 case DST_ABSOLUTE:
1093 old_locale = setlocale(LC_NUMERIC, NULL);
1094 setlocale(LC_NUMERIC, "C");
1095 errno = 0;
1096 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1097 if (errno > 0) {
1098 rrd_set_error("converting '%s' to float: %s",
1099 updvals[ds_idx + 1], rrd_strerror(errno));
1100 return -1;
1101 };
1102 setlocale(LC_NUMERIC, old_locale);
1103 if (endptr[0] != '\0') {
1104 rrd_set_error
1105 ("conversion of '%s' to float not complete: tail '%s'",
1106 updvals[ds_idx + 1], endptr);
1107 return -1;
1108 }
1109 rate = pdp_new[ds_idx] / interval;
1110 break;
1111 case DST_GAUGE:
1112 old_locale = setlocale(LC_NUMERIC, NULL);
1113 setlocale(LC_NUMERIC, "C");
1114 errno = 0;
1115 pdp_new[ds_idx] =
1116 strtod(updvals[ds_idx + 1], &endptr) * interval;
1117 if (errno) {
1118 rrd_set_error("converting '%s' to float: %s",
1119 updvals[ds_idx + 1], rrd_strerror(errno));
1120 return -1;
1121 };
1122 setlocale(LC_NUMERIC, old_locale);
1123 if (endptr[0] != '\0') {
1124 rrd_set_error
1125 ("conversion of '%s' to float not complete: tail '%s'",
1126 updvals[ds_idx + 1], endptr);
1127 return -1;
1128 }
1129 rate = pdp_new[ds_idx] / interval;
1130 break;
1131 default:
1132 rrd_set_error("rrd contains unknown DS type : '%s'",
1133 rrd->ds_def[ds_idx].dst);
1134 return -1;
1135 }
1136 /* break out of this for loop if the error string is set */
1137 if (rrd_test_error()) {
1138 return -1;
1139 }
1140 /* make sure pdp_temp is neither too large or too small
1141 * if any of these occur it becomes unknown ...
1142 * sorry folks ... */
1143 if (!isnan(rate) &&
1144 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1145 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1146 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1147 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1148 pdp_new[ds_idx] = DNAN;
1149 }
1150 } else {
1151 /* no news is news all the same */
1152 pdp_new[ds_idx] = DNAN;
1153 }
1156 /* make a copy of the command line argument for the next run */
1157 #ifdef DEBUG
1158 fprintf(stderr, "prep ds[%lu]\t"
1159 "last_arg '%s'\t"
1160 "this_arg '%s'\t"
1161 "pdp_new %10.2f\n",
1162 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1163 pdp_new[ds_idx]);
1164 #endif
1165 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1166 LAST_DS_LEN - 1);
1167 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1168 }
1169 return 0;
1170 }
1172 /*
1173 * How many PDP steps have elapsed since the last update? Returns the answer,
1174 * and stores the time between the last update and the last PDP in pre_time,
1175 * and the time between the last PDP and the current time in post_int.
1176 */
1177 static int calculate_elapsed_steps(
1178 rrd_t *rrd,
1179 unsigned long current_time,
1180 unsigned long current_time_usec,
1181 double interval,
1182 double *pre_int,
1183 double *post_int,
1184 unsigned long *proc_pdp_cnt)
1185 {
1186 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1187 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1188 * time */
1189 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1190 * when it was last updated */
1191 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1193 /* when was the current pdp started */
1194 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1195 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1197 /* when did the last pdp_st occur */
1198 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1199 occu_pdp_st = current_time - occu_pdp_age;
1201 if (occu_pdp_st > proc_pdp_st) {
1202 /* OK we passed the pdp_st moment */
1203 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1204 * occurred before the latest
1205 * pdp_st moment*/
1206 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1207 *post_int = occu_pdp_age; /* how much after it */
1208 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1209 } else {
1210 *pre_int = interval;
1211 *post_int = 0;
1212 }
1214 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1216 #ifdef DEBUG
1217 printf("proc_pdp_age %lu\t"
1218 "proc_pdp_st %lu\t"
1219 "occu_pfp_age %lu\t"
1220 "occu_pdp_st %lu\t"
1221 "int %lf\t"
1222 "pre_int %lf\t"
1223 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1224 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1225 #endif
1227 /* compute the number of elapsed pdp_st moments */
1228 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1229 }
1231 /*
1232 * Increment the PDP values by the values in pdp_new, or else initialize them.
1233 */
1234 static void simple_update(
1235 rrd_t *rrd,
1236 double interval,
1237 rrd_value_t *pdp_new)
1238 {
1239 int i;
1241 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1242 if (isnan(pdp_new[i])) {
1243 /* this is not really accurate if we use subsecond data arrival time
1244 should have thought of it when going subsecond resolution ...
1245 sorry next format change we will have it! */
1246 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1247 floor(interval);
1248 } else {
1249 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1250 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1251 } else {
1252 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1253 }
1254 }
1255 #ifdef DEBUG
1256 fprintf(stderr,
1257 "NO PDP ds[%i]\t"
1258 "value %10.2f\t"
1259 "unkn_sec %5lu\n",
1260 i,
1261 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1262 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1263 #endif
1264 }
1265 }
1267 /*
1268 * Call process_pdp_st for each DS.
1269 *
1270 * Returns 0 on success, -1 on error.
1271 */
1272 static int process_all_pdp_st(
1273 rrd_t *rrd,
1274 double interval,
1275 double pre_int,
1276 double post_int,
1277 unsigned long elapsed_pdp_st,
1278 rrd_value_t *pdp_new,
1279 rrd_value_t *pdp_temp)
1280 {
1281 unsigned long ds_idx;
1283 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1284 rate*seconds which occurred up to the last run.
1285 pdp_new[] contains rate*seconds from the latest run.
1286 pdp_temp[] will contain the rate for cdp */
1288 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1289 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1290 elapsed_pdp_st * rrd->stat_head->pdp_step,
1291 pdp_new, pdp_temp) == -1) {
1292 return -1;
1293 }
1294 #ifdef DEBUG
1295 fprintf(stderr, "PDP UPD ds[%lu]\t"
1296 "elapsed_pdp_st %lu\t"
1297 "pdp_temp %10.2f\t"
1298 "new_prep %10.2f\t"
1299 "new_unkn_sec %5lu\n",
1300 ds_idx,
1301 elapsed_pdp_st,
1302 pdp_temp[ds_idx],
1303 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1304 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1305 #endif
1306 }
1307 return 0;
1308 }
1310 /*
1311 * Process an update that occurs after one of the PDP moments.
1312 * Increments the PDP value, sets NAN if time greater than the
1313 * heartbeats have elapsed, processes CDEFs.
1314 *
1315 * Returns 0 on success, -1 on error.
1316 */
1317 static int process_pdp_st(
1318 rrd_t *rrd,
1319 unsigned long ds_idx,
1320 double interval,
1321 double pre_int,
1322 double post_int,
1323 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1324 rrd_value_t *pdp_new,
1325 rrd_value_t *pdp_temp)
1326 {
1327 int i;
1329 /* update pdp_prep to the current pdp_st. */
1330 double pre_unknown = 0.0;
1331 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1332 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1334 rpnstack_t rpnstack; /* used for COMPUTE DS */
1336 rpnstack_init(&rpnstack);
1339 if (isnan(pdp_new[ds_idx])) {
1340 /* a final bit of unknown to be added before calculation
1341 we use a temporary variable for this so that we
1342 don't have to turn integer lines before using the value */
1343 pre_unknown = pre_int;
1344 } else {
1345 if (isnan(scratch[PDP_val].u_val)) {
1346 scratch[PDP_val].u_val = 0;
1347 }
1348 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1349 }
1351 /* if too much of the pdp_prep is unknown we dump it */
1352 /* if the interval is larger thatn mrhb we get NAN */
1353 if ((interval > mrhb) ||
1354 (rrd->stat_head->pdp_step / 2.0 <
1355 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1356 pdp_temp[ds_idx] = DNAN;
1357 } else {
1358 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1359 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1360 pre_unknown);
1361 }
1363 /* process CDEF data sources; remember each CDEF DS can
1364 * only reference other DS with a lower index number */
1365 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1366 rpnp_t *rpnp;
1368 rpnp =
1369 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1370 if(rpnp == NULL) {
1371 rpnstack_free(&rpnstack);
1372 return -1;
1373 }
1374 /* substitute data values for OP_VARIABLE nodes */
1375 for (i = 0; rpnp[i].op != OP_END; i++) {
1376 if (rpnp[i].op == OP_VARIABLE) {
1377 rpnp[i].op = OP_NUMBER;
1378 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1379 }
1380 }
1381 /* run the rpn calculator */
1382 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1383 free(rpnp);
1384 rpnstack_free(&rpnstack);
1385 return -1;
1386 }
1387 free(rpnp);
1388 }
1390 /* make pdp_prep ready for the next run */
1391 if (isnan(pdp_new[ds_idx])) {
1392 /* this is not realy accurate if we use subsecond data arival time
1393 should have thought of it when going subsecond resolution ...
1394 sorry next format change we will have it! */
1395 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1396 scratch[PDP_val].u_val = DNAN;
1397 } else {
1398 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1399 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1400 }
1401 rpnstack_free(&rpnstack);
1402 return 0;
1403 }
1405 /*
1406 * Iterate over all the RRAs for a given DS and:
1407 * 1. Decide whether to schedule a smooth later
1408 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1409 * 3. Update the CDP
1410 *
1411 * Returns 0 on success, -1 on error
1412 */
1413 static int update_all_cdp_prep(
1414 rrd_t *rrd,
1415 unsigned long *rra_step_cnt,
1416 unsigned long rra_begin,
1417 rrd_file_t *rrd_file,
1418 unsigned long elapsed_pdp_st,
1419 unsigned long proc_pdp_cnt,
1420 rrd_value_t **last_seasonal_coef,
1421 rrd_value_t **seasonal_coef,
1422 rrd_value_t *pdp_temp,
1423 unsigned long *skip_update,
1424 int *schedule_smooth)
1425 {
1426 unsigned long rra_idx;
1428 /* index into the CDP scratch array */
1429 enum cf_en current_cf;
1430 unsigned long rra_start;
1432 /* number of rows to be updated in an RRA for a data value. */
1433 unsigned long start_pdp_offset;
1435 rra_start = rra_begin;
1436 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1437 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1438 start_pdp_offset =
1439 rrd->rra_def[rra_idx].pdp_cnt -
1440 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1441 skip_update[rra_idx] = 0;
1442 if (start_pdp_offset <= elapsed_pdp_st) {
1443 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1444 rrd->rra_def[rra_idx].pdp_cnt + 1;
1445 } else {
1446 rra_step_cnt[rra_idx] = 0;
1447 }
1449 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1450 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1451 * so that they will be correct for the next observed value; note that for
1452 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1453 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1454 if (rra_step_cnt[rra_idx] > 1) {
1455 skip_update[rra_idx] = 1;
1456 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1457 elapsed_pdp_st, last_seasonal_coef);
1458 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1459 elapsed_pdp_st + 1, seasonal_coef);
1460 }
1461 /* periodically run a smoother for seasonal effects */
1462 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1463 #ifdef DEBUG
1464 fprintf(stderr,
1465 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1466 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1467 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1468 u_cnt);
1469 #endif
1470 *schedule_smooth = 1;
1471 }
1472 }
1473 if (rrd_test_error())
1474 return -1;
1476 if (update_cdp_prep
1477 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1478 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1479 current_cf) == -1) {
1480 return -1;
1481 }
1482 rra_start +=
1483 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1484 sizeof(rrd_value_t);
1485 }
1486 return 0;
1487 }
1489 /*
1490 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1491 */
1492 static int do_schedule_smooth(
1493 rrd_t *rrd,
1494 unsigned long rra_idx,
1495 unsigned long elapsed_pdp_st)
1496 {
1497 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1498 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1499 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1500 unsigned long seasonal_smooth_idx =
1501 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1502 unsigned long *init_seasonal =
1503 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1505 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1506 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1507 * really an RRA level, not a data source within RRA level parameter, but
1508 * the rra_def is read only for rrd_update (not flushed to disk). */
1509 if (*init_seasonal > BURNIN_CYCLES) {
1510 /* someone has no doubt invented a trick to deal with this wrap around,
1511 * but at least this code is clear. */
1512 if (seasonal_smooth_idx > cur_row) {
1513 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1514 * between PDP and CDP */
1515 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1516 }
1517 /* can't rely on negative numbers because we are working with
1518 * unsigned values */
1519 return (cur_row + elapsed_pdp_st >= row_cnt
1520 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1521 }
1522 /* mark off one of the burn-in cycles */
1523 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1524 }
1526 /*
1527 * For a given RRA, iterate over the data sources and call the appropriate
1528 * consolidation function.
1529 *
1530 * Returns 0 on success, -1 on error.
1531 */
1532 static int update_cdp_prep(
1533 rrd_t *rrd,
1534 unsigned long elapsed_pdp_st,
1535 unsigned long start_pdp_offset,
1536 unsigned long *rra_step_cnt,
1537 int rra_idx,
1538 rrd_value_t *pdp_temp,
1539 rrd_value_t *last_seasonal_coef,
1540 rrd_value_t *seasonal_coef,
1541 int current_cf)
1542 {
1543 unsigned long ds_idx, cdp_idx;
1545 /* update CDP_PREP areas */
1546 /* loop over data soures within each RRA */
1547 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1549 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1551 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1552 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1553 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1554 elapsed_pdp_st, start_pdp_offset,
1555 rrd->rra_def[rra_idx].pdp_cnt,
1556 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1557 rra_idx, ds_idx);
1558 } else {
1559 /* Nothing to consolidate if there's one PDP per CDP. However, if
1560 * we've missed some PDPs, let's update null counters etc. */
1561 if (elapsed_pdp_st > 2) {
1562 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1563 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1564 (enum cf_en)current_cf);
1565 }
1566 }
1568 if (rrd_test_error())
1569 return -1;
1570 } /* endif data sources loop */
1571 return 0;
1572 }
1574 /*
1575 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1576 * primary value, secondary value, and # of unknowns.
1577 */
1578 static void update_cdp(
1579 unival *scratch,
1580 int current_cf,
1581 rrd_value_t pdp_temp_val,
1582 unsigned long rra_step_cnt,
1583 unsigned long elapsed_pdp_st,
1584 unsigned long start_pdp_offset,
1585 unsigned long pdp_cnt,
1586 rrd_value_t xff,
1587 int i,
1588 int ii)
1589 {
1590 /* shorthand variables */
1591 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1592 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1593 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1594 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1596 if (rra_step_cnt) {
1597 /* If we are in this block, as least 1 CDP value will be written to
1598 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1599 * to be written, then the "fill in" value is the CDP_secondary_val
1600 * entry. */
1601 if (isnan(pdp_temp_val)) {
1602 *cdp_unkn_pdp_cnt += start_pdp_offset;
1603 *cdp_secondary_val = DNAN;
1604 } else {
1605 /* CDP_secondary value is the RRA "fill in" value for intermediary
1606 * CDP data entries. No matter the CF, the value is the same because
1607 * the average, max, min, and last of a list of identical values is
1608 * the same, namely, the value itself. */
1609 *cdp_secondary_val = pdp_temp_val;
1610 }
1612 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1613 *cdp_primary_val = DNAN;
1614 } else {
1615 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1616 start_pdp_offset, pdp_cnt);
1617 }
1618 *cdp_val =
1619 initialize_carry_over(pdp_temp_val,current_cf,
1620 elapsed_pdp_st,
1621 start_pdp_offset, pdp_cnt);
1622 /* endif meets xff value requirement for a valid value */
1623 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1624 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1625 if (isnan(pdp_temp_val))
1626 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1627 else
1628 *cdp_unkn_pdp_cnt = 0;
1629 } else { /* rra_step_cnt[i] == 0 */
1631 #ifdef DEBUG
1632 if (isnan(*cdp_val)) {
1633 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1634 i, ii);
1635 } else {
1636 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1637 i, ii, *cdp_val);
1638 }
1639 #endif
1640 if (isnan(pdp_temp_val)) {
1641 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1642 } else {
1643 *cdp_val =
1644 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1645 current_cf, i, ii);
1646 }
1647 }
1648 }
1650 /*
1651 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1652 * on the type of consolidation function.
1653 */
1654 static void initialize_cdp_val(
1655 unival *scratch,
1656 int current_cf,
1657 rrd_value_t pdp_temp_val,
1658 unsigned long start_pdp_offset,
1659 unsigned long pdp_cnt)
1660 {
1661 rrd_value_t cum_val, cur_val;
1663 switch (current_cf) {
1664 case CF_AVERAGE:
1665 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1666 cur_val = IFDNAN(pdp_temp_val, 0.0);
1667 scratch[CDP_primary_val].u_val =
1668 (cum_val + cur_val * start_pdp_offset) /
1669 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1670 break;
1671 case CF_MAXIMUM:
1672 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1673 cur_val = IFDNAN(pdp_temp_val, -DINF);
1675 #if 0
1676 #ifdef DEBUG
1677 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1678 fprintf(stderr,
1679 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1680 i, ii);
1681 exit(-1);
1682 }
1683 #endif
1684 #endif
1685 if (cur_val > cum_val)
1686 scratch[CDP_primary_val].u_val = cur_val;
1687 else
1688 scratch[CDP_primary_val].u_val = cum_val;
1689 break;
1690 case CF_MINIMUM:
1691 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1692 cur_val = IFDNAN(pdp_temp_val, DINF);
1693 #if 0
1694 #ifdef DEBUG
1695 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1696 fprintf(stderr,
1697 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1698 ii);
1699 exit(-1);
1700 }
1701 #endif
1702 #endif
1703 if (cur_val < cum_val)
1704 scratch[CDP_primary_val].u_val = cur_val;
1705 else
1706 scratch[CDP_primary_val].u_val = cum_val;
1707 break;
1708 case CF_LAST:
1709 default:
1710 scratch[CDP_primary_val].u_val = pdp_temp_val;
1711 break;
1712 }
1713 }
1715 /*
1716 * Update the consolidation function for Holt-Winters functions as
1717 * well as other functions that don't actually consolidate multiple
1718 * PDPs.
1719 */
1720 static void reset_cdp(
1721 rrd_t *rrd,
1722 unsigned long elapsed_pdp_st,
1723 rrd_value_t *pdp_temp,
1724 rrd_value_t *last_seasonal_coef,
1725 rrd_value_t *seasonal_coef,
1726 int rra_idx,
1727 int ds_idx,
1728 int cdp_idx,
1729 enum cf_en current_cf)
1730 {
1731 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1733 switch (current_cf) {
1734 case CF_AVERAGE:
1735 default:
1736 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1737 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1738 break;
1739 case CF_SEASONAL:
1740 case CF_DEVSEASONAL:
1741 /* need to update cached seasonal values, so they are consistent
1742 * with the bulk update */
1743 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1744 * CDP_last_deviation are the same. */
1745 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1746 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1747 break;
1748 case CF_HWPREDICT:
1749 case CF_MHWPREDICT:
1750 /* need to update the null_count and last_null_count.
1751 * even do this for non-DNAN pdp_temp because the
1752 * algorithm is not learning from batch updates. */
1753 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1754 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1755 /* fall through */
1756 case CF_DEVPREDICT:
1757 scratch[CDP_primary_val].u_val = DNAN;
1758 scratch[CDP_secondary_val].u_val = DNAN;
1759 break;
1760 case CF_FAILURES:
1761 /* do not count missed bulk values as failures */
1762 scratch[CDP_primary_val].u_val = 0;
1763 scratch[CDP_secondary_val].u_val = 0;
1764 /* need to reset violations buffer.
1765 * could do this more carefully, but for now, just
1766 * assume a bulk update wipes away all violations. */
1767 erase_violations(rrd, cdp_idx, rra_idx);
1768 break;
1769 }
1770 }
1772 static rrd_value_t initialize_carry_over(
1773 rrd_value_t pdp_temp_val,
1774 int current_cf,
1775 unsigned long elapsed_pdp_st,
1776 unsigned long start_pdp_offset,
1777 unsigned long pdp_cnt)
1778 {
1779 unsigned long pdp_into_cdp_cnt = ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1780 if ( pdp_into_cdp_cnt == 0 || isnan(pdp_temp_val)){
1781 switch (current_cf) {
1782 case CF_MAXIMUM:
1783 return -DINF;
1784 case CF_MINIMUM:
1785 return DINF;
1786 case CF_AVERAGE:
1787 return 0;
1788 default:
1789 return DNAN;
1790 }
1791 }
1792 else {
1793 switch (current_cf) {
1794 case CF_AVERAGE:
1795 return pdp_temp_val * pdp_into_cdp_cnt ;
1796 default:
1797 return pdp_temp_val;
1798 }
1799 }
1800 }
1802 /*
1803 * Update or initialize a CDP value based on the consolidation
1804 * function.
1805 *
1806 * Returns the new value.
1807 */
1808 static rrd_value_t calculate_cdp_val(
1809 rrd_value_t cdp_val,
1810 rrd_value_t pdp_temp_val,
1811 unsigned long elapsed_pdp_st,
1812 int current_cf,
1813 #ifdef DEBUG
1814 int i,
1815 int ii
1816 #else
1817 int UNUSED(i),
1818 int UNUSED(ii)
1819 #endif
1820 )
1821 {
1822 if (isnan(cdp_val)) {
1823 if (current_cf == CF_AVERAGE) {
1824 pdp_temp_val *= elapsed_pdp_st;
1825 }
1826 #ifdef DEBUG
1827 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1828 i, ii, pdp_temp_val);
1829 #endif
1830 return pdp_temp_val;
1831 }
1832 if (current_cf == CF_AVERAGE)
1833 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1834 if (current_cf == CF_MINIMUM)
1835 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1836 if (current_cf == CF_MAXIMUM)
1837 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1839 return pdp_temp_val;
1840 }
1842 /*
1843 * For each RRA, update the seasonal values and then call update_aberrant_CF
1844 * for each data source.
1845 *
1846 * Return 0 on success, -1 on error.
1847 */
1848 static int update_aberrant_cdps(
1849 rrd_t *rrd,
1850 rrd_file_t *rrd_file,
1851 unsigned long rra_begin,
1852 unsigned long elapsed_pdp_st,
1853 rrd_value_t *pdp_temp,
1854 rrd_value_t **seasonal_coef)
1855 {
1856 unsigned long rra_idx, ds_idx, j;
1858 /* number of PDP steps since the last update that
1859 * are assigned to the first CDP to be generated
1860 * since the last update. */
1861 unsigned short scratch_idx;
1862 unsigned long rra_start;
1863 enum cf_en current_cf;
1865 /* this loop is only entered if elapsed_pdp_st < 3 */
1866 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1867 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1868 rra_start = rra_begin;
1869 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1870 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1871 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1872 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1873 if (scratch_idx == CDP_primary_val) {
1874 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1875 elapsed_pdp_st + 1, seasonal_coef);
1876 } else {
1877 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1878 elapsed_pdp_st + 2, seasonal_coef);
1879 }
1880 }
1881 if (rrd_test_error())
1882 return -1;
1883 /* loop over data soures within each RRA */
1884 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1885 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1886 rra_idx * (rrd->stat_head->ds_cnt) +
1887 ds_idx, rra_idx, ds_idx, scratch_idx,
1888 *seasonal_coef);
1889 }
1890 }
1891 rra_start += rrd->rra_def[rra_idx].row_cnt
1892 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1893 }
1894 }
1895 return 0;
1896 }
1898 /*
1899 * Move sequentially through the file, writing one RRA at a time. Note this
1900 * architecture divorces the computation of CDP with flushing updated RRA
1901 * entries to disk.
1902 *
1903 * Return 0 on success, -1 on error.
1904 */
1905 static int write_to_rras(
1906 rrd_t *rrd,
1907 rrd_file_t *rrd_file,
1908 unsigned long *rra_step_cnt,
1909 unsigned long rra_begin,
1910 time_t current_time,
1911 unsigned long *skip_update,
1912 rrd_info_t ** pcdp_summary)
1913 {
1914 unsigned long rra_idx;
1915 unsigned long rra_start;
1916 time_t rra_time = 0; /* time of update for a RRA */
1918 unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1920 /* Ready to write to disk */
1921 rra_start = rra_begin;
1923 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1924 rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1925 rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1927 /* for cdp_prep */
1928 unsigned short scratch_idx;
1929 unsigned long step_subtract;
1931 for (scratch_idx = CDP_primary_val,
1932 step_subtract = 1;
1933 rra_step_cnt[rra_idx] > 0;
1934 rra_step_cnt[rra_idx]--,
1935 scratch_idx = CDP_secondary_val,
1936 step_subtract = 2) {
1938 size_t rra_pos_new;
1939 #ifdef DEBUG
1940 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1941 #endif
1942 /* increment, with wrap-around */
1943 if (++rra_ptr->cur_row >= rra_def->row_cnt)
1944 rra_ptr->cur_row = 0;
1946 /* we know what our position should be */
1947 rra_pos_new = rra_start
1948 + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1950 /* re-seek if the position is wrong or we wrapped around */
1951 if ((size_t)rra_pos_new != rrd_file->pos) {
1952 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1953 rrd_set_error("seek error in rrd");
1954 return -1;
1955 }
1956 }
1957 #ifdef DEBUG
1958 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1959 #endif
1961 if (skip_update[rra_idx])
1962 continue;
1964 if (*pcdp_summary != NULL) {
1965 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1967 rra_time = (current_time - current_time % step_time)
1968 - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1969 }
1971 if (write_RRA_row
1972 (rrd_file, rrd, rra_idx, scratch_idx,
1973 pcdp_summary, rra_time) == -1)
1974 return -1;
1976 rrd_notify_row(rrd_file, rra_idx, rra_pos_new, rra_time);
1977 }
1979 rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1980 } /* RRA LOOP */
1982 return 0;
1983 }
1985 /*
1986 * Write out one row of values (one value per DS) to the archive.
1987 *
1988 * Returns 0 on success, -1 on error.
1989 */
1990 static int write_RRA_row(
1991 rrd_file_t *rrd_file,
1992 rrd_t *rrd,
1993 unsigned long rra_idx,
1994 unsigned short CDP_scratch_idx,
1995 rrd_info_t ** pcdp_summary,
1996 time_t rra_time)
1997 {
1998 unsigned long ds_idx, cdp_idx;
1999 rrd_infoval_t iv;
2001 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
2002 /* compute the cdp index */
2003 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2004 #ifdef DEBUG
2005 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2006 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2007 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2008 #endif
2009 if (*pcdp_summary != NULL) {
2010 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2011 /* append info to the return hash */
2012 *pcdp_summary = rrd_info_push(*pcdp_summary,
2013 sprintf_alloc
2014 ("[%lli]RRA[%s][%lu]DS[%s]",
2015 (long long)rra_time,
2016 rrd->rra_def[rra_idx].cf_nam,
2017 rrd->rra_def[rra_idx].pdp_cnt,
2018 rrd->ds_def[ds_idx].ds_nam),
2019 RD_I_VAL, iv);
2020 }
2021 errno = 0;
2022 if (rrd_write(rrd_file,
2023 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2024 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2025 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2026 return -1;
2027 }
2028 }
2029 return 0;
2030 }
2032 /*
2033 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2034 *
2035 * Returns 0 on success, -1 otherwise
2036 */
2037 static int smooth_all_rras(
2038 rrd_t *rrd,
2039 rrd_file_t *rrd_file,
2040 unsigned long rra_begin)
2041 {
2042 unsigned long rra_start = rra_begin;
2043 unsigned long rra_idx;
2045 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2046 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2047 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2048 #ifdef DEBUG
2049 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2050 #endif
2051 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2052 if (rrd_test_error())
2053 return -1;
2054 }
2055 rra_start += rrd->rra_def[rra_idx].row_cnt
2056 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2057 }
2058 return 0;
2059 }
2061 #ifndef HAVE_MMAP
2062 /*
2063 * Flush changes to disk (unless we're using mmap)
2064 *
2065 * Returns 0 on success, -1 otherwise
2066 */
2067 static int write_changes_to_disk(
2068 rrd_t *rrd,
2069 rrd_file_t *rrd_file,
2070 int version)
2071 {
2072 /* we just need to write back the live header portion now */
2073 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2074 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2075 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2076 SEEK_SET) != 0) {
2077 rrd_set_error("seek rrd for live header writeback");
2078 return -1;
2079 }
2080 if (version >= 3) {
2081 if (rrd_write(rrd_file, rrd->live_head,
2082 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2083 rrd_set_error("rrd_write live_head to rrd");
2084 return -1;
2085 }
2086 } else {
2087 if (rrd_write(rrd_file, rrd->legacy_last_up,
2088 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2089 rrd_set_error("rrd_write live_head to rrd");
2090 return -1;
2091 }
2092 }
2095 if (rrd_write(rrd_file, rrd->pdp_prep,
2096 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2097 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2098 rrd_set_error("rrd_write pdp_prep to rrd");
2099 return -1;
2100 }
2102 if (rrd_write(rrd_file, rrd->cdp_prep,
2103 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2104 rrd->stat_head->ds_cnt)
2105 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2106 rrd->stat_head->ds_cnt)) {
2108 rrd_set_error("rrd_write cdp_prep to rrd");
2109 return -1;
2110 }
2112 if (rrd_write(rrd_file, rrd->rra_ptr,
2113 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2114 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2115 rrd_set_error("rrd_write rra_ptr to rrd");
2116 return -1;
2117 }
2118 return 0;
2119 }
2120 #endif