1 /*****************************************************************************
2 * RRDtool 1.4.5 Copyright by Tobi Oetiker, 1997-2010
3 * Copyright by Florian Forster, 2008
4 *****************************************************************************
5 * rrd_update.c RRD Update Function
6 *****************************************************************************
7 * $Id$
8 *****************************************************************************/
10 #include "rrd_tool.h"
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
18 #include <locale.h>
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
26 #include "rrd_client.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
31 * replacement.
32 */
33 #include <sys/timeb.h>
35 #ifndef __MINGW32__
36 struct timeval {
37 time_t tv_sec; /* seconds */
38 long tv_usec; /* microseconds */
39 };
40 #endif
42 struct __timezone {
43 int tz_minuteswest; /* minutes W of Greenwich */
44 int tz_dsttime; /* type of dst correction */
45 };
47 static int gettimeofday(
48 struct timeval *t,
49 struct __timezone *tz)
50 {
52 struct _timeb current_time;
54 _ftime(¤t_time);
56 t->tv_sec = current_time.time;
57 t->tv_usec = current_time.millitm * 1000;
59 return 0;
60 }
62 #endif
64 /* FUNCTION PROTOTYPES */
66 int rrd_update_r(
67 const char *filename,
68 const char *tmplt,
69 int argc,
70 const char **argv);
71 int _rrd_update(
72 const char *filename,
73 const char *tmplt,
74 int argc,
75 const char **argv,
76 rrd_info_t *);
78 static int allocate_data_structures(
79 rrd_t *rrd,
80 char ***updvals,
81 rrd_value_t **pdp_temp,
82 const char *tmplt,
83 long **tmpl_idx,
84 unsigned long *tmpl_cnt,
85 unsigned long **rra_step_cnt,
86 unsigned long **skip_update,
87 rrd_value_t **pdp_new);
89 static int parse_template(
90 rrd_t *rrd,
91 const char *tmplt,
92 unsigned long *tmpl_cnt,
93 long *tmpl_idx);
95 static int process_arg(
96 char *step_start,
97 rrd_t *rrd,
98 rrd_file_t *rrd_file,
99 unsigned long rra_begin,
100 time_t *current_time,
101 unsigned long *current_time_usec,
102 rrd_value_t *pdp_temp,
103 rrd_value_t *pdp_new,
104 unsigned long *rra_step_cnt,
105 char **updvals,
106 long *tmpl_idx,
107 unsigned long tmpl_cnt,
108 rrd_info_t ** pcdp_summary,
109 int version,
110 unsigned long *skip_update,
111 int *schedule_smooth);
113 static int parse_ds(
114 rrd_t *rrd,
115 char **updvals,
116 long *tmpl_idx,
117 char *input,
118 unsigned long tmpl_cnt,
119 time_t *current_time,
120 unsigned long *current_time_usec,
121 int version);
123 static int get_time_from_reading(
124 rrd_t *rrd,
125 char timesyntax,
126 char **updvals,
127 time_t *current_time,
128 unsigned long *current_time_usec,
129 int version);
131 static int update_pdp_prep(
132 rrd_t *rrd,
133 char **updvals,
134 rrd_value_t *pdp_new,
135 double interval);
137 static int calculate_elapsed_steps(
138 rrd_t *rrd,
139 unsigned long current_time,
140 unsigned long current_time_usec,
141 double interval,
142 double *pre_int,
143 double *post_int,
144 unsigned long *proc_pdp_cnt);
146 static void simple_update(
147 rrd_t *rrd,
148 double interval,
149 rrd_value_t *pdp_new);
151 static int process_all_pdp_st(
152 rrd_t *rrd,
153 double interval,
154 double pre_int,
155 double post_int,
156 unsigned long elapsed_pdp_st,
157 rrd_value_t *pdp_new,
158 rrd_value_t *pdp_temp);
160 static int process_pdp_st(
161 rrd_t *rrd,
162 unsigned long ds_idx,
163 double interval,
164 double pre_int,
165 double post_int,
166 long diff_pdp_st,
167 rrd_value_t *pdp_new,
168 rrd_value_t *pdp_temp);
170 static int update_all_cdp_prep(
171 rrd_t *rrd,
172 unsigned long *rra_step_cnt,
173 unsigned long rra_begin,
174 rrd_file_t *rrd_file,
175 unsigned long elapsed_pdp_st,
176 unsigned long proc_pdp_cnt,
177 rrd_value_t **last_seasonal_coef,
178 rrd_value_t **seasonal_coef,
179 rrd_value_t *pdp_temp,
180 unsigned long *skip_update,
181 int *schedule_smooth);
183 static int do_schedule_smooth(
184 rrd_t *rrd,
185 unsigned long rra_idx,
186 unsigned long elapsed_pdp_st);
188 static int update_cdp_prep(
189 rrd_t *rrd,
190 unsigned long elapsed_pdp_st,
191 unsigned long start_pdp_offset,
192 unsigned long *rra_step_cnt,
193 int rra_idx,
194 rrd_value_t *pdp_temp,
195 rrd_value_t *last_seasonal_coef,
196 rrd_value_t *seasonal_coef,
197 int current_cf);
199 static void update_cdp(
200 unival *scratch,
201 int current_cf,
202 rrd_value_t pdp_temp_val,
203 unsigned long rra_step_cnt,
204 unsigned long elapsed_pdp_st,
205 unsigned long start_pdp_offset,
206 unsigned long pdp_cnt,
207 rrd_value_t xff,
208 int i,
209 int ii);
211 static void initialize_cdp_val(
212 unival *scratch,
213 int current_cf,
214 rrd_value_t pdp_temp_val,
215 unsigned long start_pdp_offset,
216 unsigned long pdp_cnt);
218 static void reset_cdp(
219 rrd_t *rrd,
220 unsigned long elapsed_pdp_st,
221 rrd_value_t *pdp_temp,
222 rrd_value_t *last_seasonal_coef,
223 rrd_value_t *seasonal_coef,
224 int rra_idx,
225 int ds_idx,
226 int cdp_idx,
227 enum cf_en current_cf);
229 static rrd_value_t initialize_carry_over(
230 rrd_value_t pdp_temp_val,
231 int current_cf,
232 unsigned long elapsed_pdp_st,
233 unsigned long start_pdp_offset,
234 unsigned long pdp_cnt);
236 static rrd_value_t calculate_cdp_val(
237 rrd_value_t cdp_val,
238 rrd_value_t pdp_temp_val,
239 unsigned long elapsed_pdp_st,
240 int current_cf,
241 int i,
242 int ii);
244 static int update_aberrant_cdps(
245 rrd_t *rrd,
246 rrd_file_t *rrd_file,
247 unsigned long rra_begin,
248 unsigned long elapsed_pdp_st,
249 rrd_value_t *pdp_temp,
250 rrd_value_t **seasonal_coef);
252 static int write_to_rras(
253 rrd_t *rrd,
254 rrd_file_t *rrd_file,
255 unsigned long *rra_step_cnt,
256 unsigned long rra_begin,
257 time_t current_time,
258 unsigned long *skip_update,
259 rrd_info_t ** pcdp_summary);
261 static int write_RRA_row(
262 rrd_file_t *rrd_file,
263 rrd_t *rrd,
264 unsigned long rra_idx,
265 unsigned short CDP_scratch_idx,
266 rrd_info_t ** pcdp_summary,
267 time_t rra_time);
269 static int smooth_all_rras(
270 rrd_t *rrd,
271 rrd_file_t *rrd_file,
272 unsigned long rra_begin);
274 #ifndef HAVE_MMAP
275 static int write_changes_to_disk(
276 rrd_t *rrd,
277 rrd_file_t *rrd_file,
278 int version);
279 #endif
/*
 * Normalize a time value as returned by gettimeofday() so that the
 * microsecond part is never negative.
 *
 * A negative tv_usec is repaired by borrowing exactly one second from
 * tv_sec; values that are already non-negative are left untouched.
 */
static void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec < 0) {
        t->tv_sec--;
        /* integer constant keeps the adjustment in integer arithmetic
         * instead of a round trip through long double */
        t->tv_usec += 1000000L;
    }
}
/*
 * Seed *current_time and *current_time_usec from the wall clock.
 *
 * The microsecond part is only kept for RRD versions 3 and up; for
 * older versions *current_time_usec is forced to 0.
 */
static void initialize_time(
    time_t *current_time,
    unsigned long *current_time_usec,
    int version)
{
    struct timeval now; /* wall-clock reading */

    gettimeofday(&now, 0);
    normalize_time(&now);

    *current_time = now.tv_sec;
    *current_time_usec = 0;
    if (version >= 3)
        *current_time_usec = now.tv_usec;
}
/*
 * IFDNAN(X,Y): yields Y when X is NaN, otherwise X.
 *
 * The expansion is a plain parenthesized expression without a trailing
 * semicolon, so it can be used inside larger expressions.  X is
 * evaluated twice; do not pass arguments with side effects.
 */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
317 rrd_info_t *rrd_update_v(
318 int argc,
319 char **argv)
320 {
321 char *tmplt = NULL;
322 rrd_info_t *result = NULL;
323 rrd_infoval_t rc;
324 char *opt_daemon = NULL;
325 struct option long_options[] = {
326 {"template", required_argument, 0, 't'},
327 {0, 0, 0, 0}
328 };
330 rc.u_int = -1;
331 optind = 0;
332 opterr = 0; /* initialize getopt */
334 while (1) {
335 int option_index = 0;
336 int opt;
338 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
340 if (opt == EOF)
341 break;
343 switch (opt) {
344 case 't':
345 tmplt = optarg;
346 break;
348 case '?':
349 rrd_set_error("unknown option '%s'", argv[optind - 1]);
350 goto end_tag;
351 }
352 }
354 opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
355 if (opt_daemon != NULL) {
356 rrd_set_error ("The \"%s\" environment variable is defined, "
357 "but \"%s\" cannot work with rrdcached. Either unset "
358 "the environment variable or use \"update\" instead.",
359 ENV_RRDCACHED_ADDRESS, argv[0]);
360 goto end_tag;
361 }
363 /* need at least 2 arguments: filename, data. */
364 if (argc - optind < 2) {
365 rrd_set_error("Not enough arguments");
366 goto end_tag;
367 }
368 rc.u_int = 0;
369 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
370 rc.u_int = _rrd_update(argv[optind], tmplt,
371 argc - optind - 1,
372 (const char **) (argv + optind + 1), result);
373 result->value.u_int = rc.u_int;
374 end_tag:
375 return result;
376 }
378 int rrd_update(
379 int argc,
380 char **argv)
381 {
382 struct option long_options[] = {
383 {"template", required_argument, 0, 't'},
384 {"daemon", required_argument, 0, 'd'},
385 {0, 0, 0, 0}
386 };
387 int option_index = 0;
388 int opt;
389 char *tmplt = NULL;
390 int rc = -1;
391 char *opt_daemon = NULL;
393 optind = 0;
394 opterr = 0; /* initialize getopt */
396 while (1) {
397 opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
399 if (opt == EOF)
400 break;
402 switch (opt) {
403 case 't':
404 tmplt = strdup(optarg);
405 break;
407 case 'd':
408 if (opt_daemon != NULL)
409 free (opt_daemon);
410 opt_daemon = strdup (optarg);
411 if (opt_daemon == NULL)
412 {
413 rrd_set_error("strdup failed.");
414 goto out;
415 }
416 break;
418 case '?':
419 rrd_set_error("unknown option '%s'", argv[optind - 1]);
420 goto out;
421 }
422 }
424 /* need at least 2 arguments: filename, data. */
425 if (argc - optind < 2) {
426 rrd_set_error("Not enough arguments");
427 goto out;
428 }
430 { /* try to connect to rrdcached */
431 int status = rrdc_connect(opt_daemon);
432 if (status != 0) {
433 rc = status;
434 goto out;
435 }
436 }
438 if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
439 {
440 rrd_set_error("The caching daemon cannot be used together with "
441 "templates yet.");
442 goto out;
443 }
445 if (! rrdc_is_connected(opt_daemon))
446 {
447 rc = rrd_update_r(argv[optind], tmplt,
448 argc - optind - 1, (const char **) (argv + optind + 1));
449 }
450 else /* we are connected */
451 {
452 rc = rrdc_update (argv[optind], /* file */
453 argc - optind - 1, /* values_num */
454 (const char *const *) (argv + optind + 1)); /* values */
455 if (rc > 0)
456 rrd_set_error("Failed sending the values to rrdcached: %s",
457 rrd_strerror (rc));
458 }
460 out:
461 if (tmplt != NULL)
462 {
463 free(tmplt);
464 tmplt = NULL;
465 }
466 if (opt_daemon != NULL)
467 {
468 free (opt_daemon);
469 opt_daemon = NULL;
470 }
471 return rc;
472 }
/*
 * Update an RRD file without collecting a summary: forwards all
 * arguments to _rrd_update() with a NULL info list and returns its
 * result unchanged.
 */
int rrd_update_r(
    const char *filename,
    const char *tmplt,
    int argc,
    const char **argv)
{
    int       status;

    status = _rrd_update(filename, tmplt, argc, argv, NULL);
    return status;
}
483 int _rrd_update(
484 const char *filename,
485 const char *tmplt,
486 int argc,
487 const char **argv,
488 rrd_info_t * pcdp_summary)
489 {
491 int arg_i = 2;
493 unsigned long rra_begin; /* byte pointer to the rra
494 * area in the rrd file. this
495 * pointer never changes value */
496 rrd_value_t *pdp_new; /* prepare the incoming data to be added
497 * to the existing entry */
498 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
499 * to the cdp values */
501 long *tmpl_idx; /* index representing the settings
502 * transported by the tmplt index */
503 unsigned long tmpl_cnt = 2; /* time and data */
504 rrd_t rrd;
505 time_t current_time = 0;
506 unsigned long current_time_usec = 0; /* microseconds part of current time */
507 char **updvals;
508 int schedule_smooth = 0;
510 /* number of elapsed PDP steps since last update */
511 unsigned long *rra_step_cnt = NULL;
513 int version; /* rrd version */
514 rrd_file_t *rrd_file;
515 char *arg_copy; /* for processing the argv */
516 unsigned long *skip_update; /* RRAs to advance but not write */
518 /* need at least 1 arguments: data. */
519 if (argc < 1) {
520 rrd_set_error("Not enough arguments");
521 goto err_out;
522 }
524 rrd_init(&rrd);
525 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
526 goto err_free;
527 }
528 /* We are now at the beginning of the rra's */
529 rra_begin = rrd_file->header_len;
531 version = atoi(rrd.stat_head->version);
533 initialize_time(¤t_time, ¤t_time_usec, version);
535 /* get exclusive lock to whole file.
536 * lock gets removed when we close the file.
537 */
538 if (rrd_lock(rrd_file) != 0) {
539 rrd_set_error("could not lock RRD");
540 goto err_close;
541 }
543 if (allocate_data_structures(&rrd, &updvals,
544 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
545 &rra_step_cnt, &skip_update,
546 &pdp_new) == -1) {
547 goto err_close;
548 }
550 /* loop through the arguments. */
551 for (arg_i = 0; arg_i < argc; arg_i++) {
552 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
553 rrd_set_error("failed duplication argv entry");
554 break;
555 }
556 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
557 ¤t_time, ¤t_time_usec, pdp_temp, pdp_new,
558 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
559 &pcdp_summary, version, skip_update,
560 &schedule_smooth) == -1) {
561 if (rrd_test_error()) { /* Should have error string always here */
562 char *save_error;
564 /* Prepend file name to error message */
565 if ((save_error = strdup(rrd_get_error())) != NULL) {
566 rrd_set_error("%s: %s", filename, save_error);
567 free(save_error);
568 }
569 }
570 free(arg_copy);
571 break;
572 }
573 free(arg_copy);
574 }
576 free(rra_step_cnt);
578 /* if we got here and if there is an error and if the file has not been
579 * written to, then close things up and return. */
580 if (rrd_test_error()) {
581 goto err_free_structures;
582 }
583 #ifndef HAVE_MMAP
584 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
585 goto err_free_structures;
586 }
587 #endif
589 /* calling the smoothing code here guarantees at most one smoothing
590 * operation per rrd_update call. Unfortunately, it is possible with bulk
591 * updates, or a long-delayed update for smoothing to occur off-schedule.
592 * This really isn't critical except during the burn-in cycles. */
593 if (schedule_smooth) {
594 smooth_all_rras(&rrd, rrd_file, rra_begin);
595 }
597 /* rrd_dontneed(rrd_file,&rrd); */
598 rrd_free(&rrd);
599 rrd_close(rrd_file);
601 free(pdp_new);
602 free(tmpl_idx);
603 free(pdp_temp);
604 free(skip_update);
605 free(updvals);
606 return 0;
608 err_free_structures:
609 free(pdp_new);
610 free(tmpl_idx);
611 free(pdp_temp);
612 free(skip_update);
613 free(updvals);
614 err_close:
615 rrd_close(rrd_file);
616 err_free:
617 rrd_free(&rrd);
618 err_out:
619 return -1;
620 }
622 /*
623 * Allocate some important arrays used, and initialize the template.
624 *
625 * When it returns, either all of the structures are allocated
626 * or none of them are.
627 *
628 * Returns 0 on success, -1 on error.
629 */
630 static int allocate_data_structures(
631 rrd_t *rrd,
632 char ***updvals,
633 rrd_value_t **pdp_temp,
634 const char *tmplt,
635 long **tmpl_idx,
636 unsigned long *tmpl_cnt,
637 unsigned long **rra_step_cnt,
638 unsigned long **skip_update,
639 rrd_value_t **pdp_new)
640 {
641 unsigned i, ii;
642 if ((*updvals = (char **) malloc(sizeof(char *)
643 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
644 rrd_set_error("allocating updvals pointer array.");
645 return -1;
646 }
647 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
648 * rrd->stat_head->ds_cnt)) ==
649 NULL) {
650 rrd_set_error("allocating pdp_temp.");
651 goto err_free_updvals;
652 }
653 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
654 *
655 rrd->stat_head->rra_cnt)) ==
656 NULL) {
657 rrd_set_error("allocating skip_update.");
658 goto err_free_pdp_temp;
659 }
660 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
661 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
662 rrd_set_error("allocating tmpl_idx.");
663 goto err_free_skip_update;
664 }
665 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
666 *
667 (rrd->stat_head->
668 rra_cnt))) == NULL) {
669 rrd_set_error("allocating rra_step_cnt.");
670 goto err_free_tmpl_idx;
671 }
673 /* initialize tmplt redirector */
674 /* default config example (assume DS 1 is a CDEF DS)
675 tmpl_idx[0] -> 0; (time)
676 tmpl_idx[1] -> 1; (DS 0)
677 tmpl_idx[2] -> 3; (DS 2)
678 tmpl_idx[3] -> 4; (DS 3) */
679 (*tmpl_idx)[0] = 0; /* time */
680 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
681 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
682 (*tmpl_idx)[ii++] = i;
683 }
684 *tmpl_cnt = ii;
686 if (tmplt != NULL) {
687 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
688 goto err_free_rra_step_cnt;
689 }
690 }
692 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
693 * rrd->stat_head->ds_cnt)) == NULL) {
694 rrd_set_error("allocating pdp_new.");
695 goto err_free_rra_step_cnt;
696 }
698 return 0;
700 err_free_rra_step_cnt:
701 free(*rra_step_cnt);
702 err_free_tmpl_idx:
703 free(*tmpl_idx);
704 err_free_skip_update:
705 free(*skip_update);
706 err_free_pdp_temp:
707 free(*pdp_temp);
708 err_free_updvals:
709 free(*updvals);
710 return -1;
711 }
713 /*
714 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
715 *
716 * Returns 0 on success.
717 */
718 static int parse_template(
719 rrd_t *rrd,
720 const char *tmplt,
721 unsigned long *tmpl_cnt,
722 long *tmpl_idx)
723 {
724 char *dsname, *tmplt_copy;
725 unsigned int tmpl_len, i;
726 int ret = 0;
728 *tmpl_cnt = 1; /* the first entry is the time */
730 /* we should work on a writeable copy here */
731 if ((tmplt_copy = strdup(tmplt)) == NULL) {
732 rrd_set_error("error copying tmplt '%s'", tmplt);
733 ret = -1;
734 goto out;
735 }
737 dsname = tmplt_copy;
738 tmpl_len = strlen(tmplt_copy);
739 for (i = 0; i <= tmpl_len; i++) {
740 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
741 tmplt_copy[i] = '\0';
742 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
743 rrd_set_error("tmplt contains more DS definitions than RRD");
744 ret = -1;
745 goto out_free_tmpl_copy;
746 }
747 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
748 rrd_set_error("unknown DS name '%s'", dsname);
749 ret = -1;
750 goto out_free_tmpl_copy;
751 }
752 /* go to the next entry on the tmplt_copy */
753 if (i < tmpl_len)
754 dsname = &tmplt_copy[i + 1];
755 }
756 }
757 out_free_tmpl_copy:
758 free(tmplt_copy);
759 out:
760 return ret;
761 }
763 /*
764 * Parse an update string, updates the primary data points (PDPs)
765 * and consolidated data points (CDPs), and writes changes to the RRAs.
766 *
767 * Returns 0 on success, -1 on error.
768 */
769 static int process_arg(
770 char *step_start,
771 rrd_t *rrd,
772 rrd_file_t *rrd_file,
773 unsigned long rra_begin,
774 time_t *current_time,
775 unsigned long *current_time_usec,
776 rrd_value_t *pdp_temp,
777 rrd_value_t *pdp_new,
778 unsigned long *rra_step_cnt,
779 char **updvals,
780 long *tmpl_idx,
781 unsigned long tmpl_cnt,
782 rrd_info_t ** pcdp_summary,
783 int version,
784 unsigned long *skip_update,
785 int *schedule_smooth)
786 {
787 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
789 /* a vector of future Holt-Winters seasonal coefs */
790 unsigned long elapsed_pdp_st;
792 double interval, pre_int, post_int; /* interval between this and
793 * the last run */
794 unsigned long proc_pdp_cnt;
796 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
797 current_time, current_time_usec, version) == -1) {
798 return -1;
799 }
801 interval = (double) (*current_time - rrd->live_head->last_up)
802 + (double) ((long) *current_time_usec -
803 (long) rrd->live_head->last_up_usec) / 1e6f;
805 /* process the data sources and update the pdp_prep
806 * area accordingly */
807 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
808 return -1;
809 }
811 elapsed_pdp_st = calculate_elapsed_steps(rrd,
812 *current_time,
813 *current_time_usec, interval,
814 &pre_int, &post_int,
815 &proc_pdp_cnt);
817 /* has a pdp_st moment occurred since the last run ? */
818 if (elapsed_pdp_st == 0) {
819 /* no we have not passed a pdp_st moment. therefore update is simple */
820 simple_update(rrd, interval, pdp_new);
821 } else {
822 /* an pdp_st has occurred. */
823 if (process_all_pdp_st(rrd, interval,
824 pre_int, post_int,
825 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
826 return -1;
827 }
828 if (update_all_cdp_prep(rrd, rra_step_cnt,
829 rra_begin, rrd_file,
830 elapsed_pdp_st,
831 proc_pdp_cnt,
832 &last_seasonal_coef,
833 &seasonal_coef,
834 pdp_temp,
835 skip_update, schedule_smooth) == -1) {
836 goto err_free_coefficients;
837 }
838 if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
839 elapsed_pdp_st, pdp_temp,
840 &seasonal_coef) == -1) {
841 goto err_free_coefficients;
842 }
843 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
844 *current_time, skip_update,
845 pcdp_summary) == -1) {
846 goto err_free_coefficients;
847 }
848 } /* endif a pdp_st has occurred */
849 rrd->live_head->last_up = *current_time;
850 rrd->live_head->last_up_usec = *current_time_usec;
852 if (version < 3) {
853 *rrd->legacy_last_up = rrd->live_head->last_up;
854 }
855 free(seasonal_coef);
856 free(last_seasonal_coef);
857 return 0;
859 err_free_coefficients:
860 free(seasonal_coef);
861 free(last_seasonal_coef);
862 return -1;
863 }
865 /*
866 * Parse a DS string (time + colon-separated values), storing the
867 * results in current_time, current_time_usec, and updvals.
868 *
869 * Returns 0 on success, -1 on error.
870 */
871 static int parse_ds(
872 rrd_t *rrd,
873 char **updvals,
874 long *tmpl_idx,
875 char *input,
876 unsigned long tmpl_cnt,
877 time_t *current_time,
878 unsigned long *current_time_usec,
879 int version)
880 {
881 char *p;
882 unsigned long i;
883 char timesyntax;
885 updvals[0] = input;
886 /* initialize all ds input to unknown except the first one
887 which has always got to be set */
888 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
889 updvals[i] = "U";
891 /* separate all ds elements; first must be examined separately
892 due to alternate time syntax */
893 if ((p = strchr(input, '@')) != NULL) {
894 timesyntax = '@';
895 } else if ((p = strchr(input, ':')) != NULL) {
896 timesyntax = ':';
897 } else {
898 rrd_set_error("expected timestamp not found in data source from %s",
899 input);
900 return -1;
901 }
902 *p = '\0';
903 i = 1;
904 updvals[tmpl_idx[i++]] = p + 1;
905 while (*(++p)) {
906 if (*p == ':') {
907 *p = '\0';
908 if (i < tmpl_cnt) {
909 updvals[tmpl_idx[i++]] = p + 1;
910 }
911 else {
912 rrd_set_error("found extra data on update argument: %s",p+1);
913 return -1;
914 }
915 }
916 }
918 if (i != tmpl_cnt) {
919 rrd_set_error("expected %lu data source readings (got %lu) from %s",
920 tmpl_cnt - 1, i - 1, input);
921 return -1;
922 }
924 if (get_time_from_reading(rrd, timesyntax, updvals,
925 current_time, current_time_usec,
926 version) == -1) {
927 return -1;
928 }
929 return 0;
930 }
932 /*
933 * Parse the time in a DS string, store it in current_time and
934 * current_time_usec and verify that it's later than the last
935 * update for this DS.
936 *
937 * Returns 0 on success, -1 on error.
938 */
939 static int get_time_from_reading(
940 rrd_t *rrd,
941 char timesyntax,
942 char **updvals,
943 time_t *current_time,
944 unsigned long *current_time_usec,
945 int version)
946 {
947 double tmp;
948 char *parsetime_error = NULL;
949 char *old_locale;
950 rrd_time_value_t ds_tv;
951 struct timeval tmp_time; /* used for time conversion */
953 /* get the time from the reading ... handle N */
954 if (timesyntax == '@') { /* at-style */
955 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
956 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
957 return -1;
958 }
959 if (ds_tv.type == RELATIVE_TO_END_TIME ||
960 ds_tv.type == RELATIVE_TO_START_TIME) {
961 rrd_set_error("specifying time relative to the 'start' "
962 "or 'end' makes no sense here: %s", updvals[0]);
963 return -1;
964 }
965 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
966 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
967 } else if (strcmp(updvals[0], "N") == 0) {
968 gettimeofday(&tmp_time, 0);
969 normalize_time(&tmp_time);
970 *current_time = tmp_time.tv_sec;
971 *current_time_usec = tmp_time.tv_usec;
972 } else {
973 old_locale = setlocale(LC_NUMERIC, NULL);
974 setlocale(LC_NUMERIC, "C");
975 errno = 0;
976 tmp = strtod(updvals[0], 0);
977 if (errno > 0) {
978 rrd_set_error("converting '%s' to float: %s",
979 updvals[0], rrd_strerror(errno));
980 return -1;
981 };
982 setlocale(LC_NUMERIC, old_locale);
983 if (tmp < 0.0){
984 gettimeofday(&tmp_time, 0);
985 tmp = (double)tmp_time.tv_sec + (double)tmp_time.tv_usec * 1e-6f + tmp;
986 }
988 *current_time = floor(tmp);
989 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
990 }
991 /* dont do any correction for old version RRDs */
992 if (version < 3)
993 *current_time_usec = 0;
995 if (*current_time < rrd->live_head->last_up ||
996 (*current_time == rrd->live_head->last_up &&
997 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
998 rrd_set_error("illegal attempt to update using time %ld when "
999 "last update time is %ld (minimum one second step)",
1000 *current_time, rrd->live_head->last_up);
1001 return -1;
1002 }
1003 return 0;
1004 }
1006 /*
1007 * Update pdp_new by interpreting the updvals according to the DS type
1008 * (COUNTER, GAUGE, etc.).
1009 *
1010 * Returns 0 on success, -1 on error.
1011 */
1012 static int update_pdp_prep(
1013 rrd_t *rrd,
1014 char **updvals,
1015 rrd_value_t *pdp_new,
1016 double interval)
1017 {
1018 unsigned long ds_idx;
1019 int ii;
1020 char *endptr; /* used in the conversion */
1021 double rate;
1022 char *old_locale;
1023 enum dst_en dst_idx;
1025 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1026 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1028 /* make sure we do not build diffs with old last_ds values */
1029 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1030 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1031 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1032 }
1034 /* NOTE: DST_CDEF should never enter this if block, because
1035 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1036 * accidently specified a value for the DST_CDEF. To handle this case,
1037 * an extra check is required. */
1039 if ((updvals[ds_idx + 1][0] != 'U') &&
1040 (dst_idx != DST_CDEF) &&
1041 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1042 rate = DNAN;
1044 /* pdp_new contains rate * time ... eg the bytes transferred during
1045 * the interval. Doing it this way saves a lot of math operations
1046 */
1047 switch (dst_idx) {
1048 case DST_COUNTER:
1049 case DST_DERIVE:
1050 /* Check if this is a valid integer. `U' is already handled in
1051 * another branch. */
1052 for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) {
1053 if ((ii == 0) && (dst_idx == DST_DERIVE)
1054 && (updvals[ds_idx + 1][ii] == '-'))
1055 continue;
1057 if ((updvals[ds_idx + 1][ii] < '0')
1058 || (updvals[ds_idx + 1][ii] > '9')) {
1059 rrd_set_error("not a simple %s integer: '%s'",
1060 (dst_idx == DST_DERIVE) ? "signed" : "unsigned",
1061 updvals[ds_idx + 1]);
1062 return -1;
1063 }
1064 } /* for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) */
1066 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1067 pdp_new[ds_idx] =
1068 rrd_diff(updvals[ds_idx + 1],
1069 rrd->pdp_prep[ds_idx].last_ds);
1070 if (dst_idx == DST_COUNTER) {
1071 /* simple overflow catcher. This will fail
1072 * terribly for non 32 or 64 bit counters
1073 * ... are there any others in SNMP land?
1074 */
1075 if (pdp_new[ds_idx] < (double) 0.0)
1076 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1077 if (pdp_new[ds_idx] < (double) 0.0)
1078 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1079 }
1080 rate = pdp_new[ds_idx] / interval;
1081 } else {
1082 pdp_new[ds_idx] = DNAN;
1083 }
1084 break;
1085 case DST_ABSOLUTE:
1086 old_locale = setlocale(LC_NUMERIC, NULL);
1087 setlocale(LC_NUMERIC, "C");
1088 errno = 0;
1089 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1090 if (errno > 0) {
1091 rrd_set_error("converting '%s' to float: %s",
1092 updvals[ds_idx + 1], rrd_strerror(errno));
1093 return -1;
1094 };
1095 setlocale(LC_NUMERIC, old_locale);
1096 if (endptr[0] != '\0') {
1097 rrd_set_error
1098 ("conversion of '%s' to float not complete: tail '%s'",
1099 updvals[ds_idx + 1], endptr);
1100 return -1;
1101 }
1102 rate = pdp_new[ds_idx] / interval;
1103 break;
1104 case DST_GAUGE:
1105 old_locale = setlocale(LC_NUMERIC, NULL);
1106 setlocale(LC_NUMERIC, "C");
1107 errno = 0;
1108 pdp_new[ds_idx] =
1109 strtod(updvals[ds_idx + 1], &endptr) * interval;
1110 if (errno) {
1111 rrd_set_error("converting '%s' to float: %s",
1112 updvals[ds_idx + 1], rrd_strerror(errno));
1113 return -1;
1114 };
1115 setlocale(LC_NUMERIC, old_locale);
1116 if (endptr[0] != '\0') {
1117 rrd_set_error
1118 ("conversion of '%s' to float not complete: tail '%s'",
1119 updvals[ds_idx + 1], endptr);
1120 return -1;
1121 }
1122 rate = pdp_new[ds_idx] / interval;
1123 break;
1124 default:
1125 rrd_set_error("rrd contains unknown DS type : '%s'",
1126 rrd->ds_def[ds_idx].dst);
1127 return -1;
1128 }
1129 /* break out of this for loop if the error string is set */
1130 if (rrd_test_error()) {
1131 return -1;
1132 }
1133 /* make sure pdp_temp is neither too large or too small
1134 * if any of these occur it becomes unknown ...
1135 * sorry folks ... */
1136 if (!isnan(rate) &&
1137 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1138 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1139 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1140 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1141 pdp_new[ds_idx] = DNAN;
1142 }
1143 } else {
1144 /* no news is news all the same */
1145 pdp_new[ds_idx] = DNAN;
1146 }
1149 /* make a copy of the command line argument for the next run */
1150 #ifdef DEBUG
1151 fprintf(stderr, "prep ds[%lu]\t"
1152 "last_arg '%s'\t"
1153 "this_arg '%s'\t"
1154 "pdp_new %10.2f\n",
1155 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1156 pdp_new[ds_idx]);
1157 #endif
1158 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1159 LAST_DS_LEN - 1);
1160 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1161 }
1162 return 0;
1163 }
1165 /*
1166 * How many PDP steps have elapsed since the last update? Returns the answer,
1167 * and stores the time between the last update and the last PDP in pre_time,
1168 * and the time between the last PDP and the current time in post_int.
1169 */
1170 static int calculate_elapsed_steps(
1171 rrd_t *rrd,
1172 unsigned long current_time,
1173 unsigned long current_time_usec,
1174 double interval,
1175 double *pre_int,
1176 double *post_int,
1177 unsigned long *proc_pdp_cnt)
1178 {
1179 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1180 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1181 * time */
1182 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1183 * when it was last updated */
1184 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1186 /* when was the current pdp started */
1187 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1188 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1190 /* when did the last pdp_st occur */
1191 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1192 occu_pdp_st = current_time - occu_pdp_age;
1194 if (occu_pdp_st > proc_pdp_st) {
1195 /* OK we passed the pdp_st moment */
1196 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1197 * occurred before the latest
1198 * pdp_st moment*/
1199 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1200 *post_int = occu_pdp_age; /* how much after it */
1201 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1202 } else {
1203 *pre_int = interval;
1204 *post_int = 0;
1205 }
1207 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1209 #ifdef DEBUG
1210 printf("proc_pdp_age %lu\t"
1211 "proc_pdp_st %lu\t"
1212 "occu_pfp_age %lu\t"
1213 "occu_pdp_st %lu\t"
1214 "int %lf\t"
1215 "pre_int %lf\t"
1216 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1217 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1218 #endif
1220 /* compute the number of elapsed pdp_st moments */
1221 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1222 }
1224 /*
1225 * Increment the PDP values by the values in pdp_new, or else initialize them.
1226 */
1227 static void simple_update(
1228 rrd_t *rrd,
1229 double interval,
1230 rrd_value_t *pdp_new)
1231 {
1232 int i;
1234 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1235 if (isnan(pdp_new[i])) {
1236 /* this is not really accurate if we use subsecond data arrival time
1237 should have thought of it when going subsecond resolution ...
1238 sorry next format change we will have it! */
1239 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1240 floor(interval);
1241 } else {
1242 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1243 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1244 } else {
1245 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1246 }
1247 }
1248 #ifdef DEBUG
1249 fprintf(stderr,
1250 "NO PDP ds[%i]\t"
1251 "value %10.2f\t"
1252 "unkn_sec %5lu\n",
1253 i,
1254 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1255 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1256 #endif
1257 }
1258 }
1260 /*
1261 * Call process_pdp_st for each DS.
1262 *
1263 * Returns 0 on success, -1 on error.
1264 */
1265 static int process_all_pdp_st(
1266 rrd_t *rrd,
1267 double interval,
1268 double pre_int,
1269 double post_int,
1270 unsigned long elapsed_pdp_st,
1271 rrd_value_t *pdp_new,
1272 rrd_value_t *pdp_temp)
1273 {
1274 unsigned long ds_idx;
1276 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1277 rate*seconds which occurred up to the last run.
1278 pdp_new[] contains rate*seconds from the latest run.
1279 pdp_temp[] will contain the rate for cdp */
1281 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1282 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1283 elapsed_pdp_st * rrd->stat_head->pdp_step,
1284 pdp_new, pdp_temp) == -1) {
1285 return -1;
1286 }
1287 #ifdef DEBUG
1288 fprintf(stderr, "PDP UPD ds[%lu]\t"
1289 "elapsed_pdp_st %lu\t"
1290 "pdp_temp %10.2f\t"
1291 "new_prep %10.2f\t"
1292 "new_unkn_sec %5lu\n",
1293 ds_idx,
1294 elapsed_pdp_st,
1295 pdp_temp[ds_idx],
1296 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1297 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1298 #endif
1299 }
1300 return 0;
1301 }
1303 /*
1304 * Process an update that occurs after one of the PDP moments.
1305 * Increments the PDP value, sets NAN if time greater than the
1306 * heartbeats have elapsed, processes CDEFs.
1307 *
1308 * Returns 0 on success, -1 on error.
1309 */
1310 static int process_pdp_st(
1311 rrd_t *rrd,
1312 unsigned long ds_idx,
1313 double interval,
1314 double pre_int,
1315 double post_int,
1316 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1317 rrd_value_t *pdp_new,
1318 rrd_value_t *pdp_temp)
1319 {
1320 int i;
1322 /* update pdp_prep to the current pdp_st. */
1323 double pre_unknown = 0.0;
1324 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1325 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1327 rpnstack_t rpnstack; /* used for COMPUTE DS */
1329 rpnstack_init(&rpnstack);
1332 if (isnan(pdp_new[ds_idx])) {
1333 /* a final bit of unknown to be added before calculation
1334 we use a temporary variable for this so that we
1335 don't have to turn integer lines before using the value */
1336 pre_unknown = pre_int;
1337 } else {
1338 if (isnan(scratch[PDP_val].u_val)) {
1339 scratch[PDP_val].u_val = 0;
1340 }
1341 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1342 }
1344 /* if too much of the pdp_prep is unknown we dump it */
1345 /* if the interval is larger thatn mrhb we get NAN */
1346 if ((interval > mrhb) ||
1347 (rrd->stat_head->pdp_step / 2.0 <
1348 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1349 pdp_temp[ds_idx] = DNAN;
1350 } else {
1351 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1352 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1353 pre_unknown);
1354 }
1356 /* process CDEF data sources; remember each CDEF DS can
1357 * only reference other DS with a lower index number */
1358 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1359 rpnp_t *rpnp;
1361 rpnp =
1362 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1363 if(rpnp == NULL) {
1364 rpnstack_free(&rpnstack);
1365 return -1;
1366 }
1367 /* substitute data values for OP_VARIABLE nodes */
1368 for (i = 0; rpnp[i].op != OP_END; i++) {
1369 if (rpnp[i].op == OP_VARIABLE) {
1370 rpnp[i].op = OP_NUMBER;
1371 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1372 }
1373 }
1374 /* run the rpn calculator */
1375 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1376 free(rpnp);
1377 rpnstack_free(&rpnstack);
1378 return -1;
1379 }
1380 free(rpnp);
1381 }
1383 /* make pdp_prep ready for the next run */
1384 if (isnan(pdp_new[ds_idx])) {
1385 /* this is not realy accurate if we use subsecond data arival time
1386 should have thought of it when going subsecond resolution ...
1387 sorry next format change we will have it! */
1388 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1389 scratch[PDP_val].u_val = DNAN;
1390 } else {
1391 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1392 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1393 }
1394 rpnstack_free(&rpnstack);
1395 return 0;
1396 }
1398 /*
1399 * Iterate over all the RRAs for a given DS and:
1400 * 1. Decide whether to schedule a smooth later
1401 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1402 * 3. Update the CDP
1403 *
1404 * Returns 0 on success, -1 on error
1405 */
1406 static int update_all_cdp_prep(
1407 rrd_t *rrd,
1408 unsigned long *rra_step_cnt,
1409 unsigned long rra_begin,
1410 rrd_file_t *rrd_file,
1411 unsigned long elapsed_pdp_st,
1412 unsigned long proc_pdp_cnt,
1413 rrd_value_t **last_seasonal_coef,
1414 rrd_value_t **seasonal_coef,
1415 rrd_value_t *pdp_temp,
1416 unsigned long *skip_update,
1417 int *schedule_smooth)
1418 {
1419 unsigned long rra_idx;
1421 /* index into the CDP scratch array */
1422 enum cf_en current_cf;
1423 unsigned long rra_start;
1425 /* number of rows to be updated in an RRA for a data value. */
1426 unsigned long start_pdp_offset;
1428 rra_start = rra_begin;
1429 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1430 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1431 start_pdp_offset =
1432 rrd->rra_def[rra_idx].pdp_cnt -
1433 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1434 skip_update[rra_idx] = 0;
1435 if (start_pdp_offset <= elapsed_pdp_st) {
1436 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1437 rrd->rra_def[rra_idx].pdp_cnt + 1;
1438 } else {
1439 rra_step_cnt[rra_idx] = 0;
1440 }
1442 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1443 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1444 * so that they will be correct for the next observed value; note that for
1445 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1446 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1447 if (rra_step_cnt[rra_idx] > 1) {
1448 skip_update[rra_idx] = 1;
1449 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1450 elapsed_pdp_st, last_seasonal_coef);
1451 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1452 elapsed_pdp_st + 1, seasonal_coef);
1453 }
1454 /* periodically run a smoother for seasonal effects */
1455 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1456 #ifdef DEBUG
1457 fprintf(stderr,
1458 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1459 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1460 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1461 u_cnt);
1462 #endif
1463 *schedule_smooth = 1;
1464 }
1465 }
1466 if (rrd_test_error())
1467 return -1;
1469 if (update_cdp_prep
1470 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1471 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1472 current_cf) == -1) {
1473 return -1;
1474 }
1475 rra_start +=
1476 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1477 sizeof(rrd_value_t);
1478 }
1479 return 0;
1480 }
1482 /*
1483 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1484 */
1485 static int do_schedule_smooth(
1486 rrd_t *rrd,
1487 unsigned long rra_idx,
1488 unsigned long elapsed_pdp_st)
1489 {
1490 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1491 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1492 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1493 unsigned long seasonal_smooth_idx =
1494 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1495 unsigned long *init_seasonal =
1496 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1498 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1499 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1500 * really an RRA level, not a data source within RRA level parameter, but
1501 * the rra_def is read only for rrd_update (not flushed to disk). */
1502 if (*init_seasonal > BURNIN_CYCLES) {
1503 /* someone has no doubt invented a trick to deal with this wrap around,
1504 * but at least this code is clear. */
1505 if (seasonal_smooth_idx > cur_row) {
1506 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1507 * between PDP and CDP */
1508 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1509 }
1510 /* can't rely on negative numbers because we are working with
1511 * unsigned values */
1512 return (cur_row + elapsed_pdp_st >= row_cnt
1513 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1514 }
1515 /* mark off one of the burn-in cycles */
1516 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1517 }
1519 /*
1520 * For a given RRA, iterate over the data sources and call the appropriate
1521 * consolidation function.
1522 *
1523 * Returns 0 on success, -1 on error.
1524 */
1525 static int update_cdp_prep(
1526 rrd_t *rrd,
1527 unsigned long elapsed_pdp_st,
1528 unsigned long start_pdp_offset,
1529 unsigned long *rra_step_cnt,
1530 int rra_idx,
1531 rrd_value_t *pdp_temp,
1532 rrd_value_t *last_seasonal_coef,
1533 rrd_value_t *seasonal_coef,
1534 int current_cf)
1535 {
1536 unsigned long ds_idx, cdp_idx;
1538 /* update CDP_PREP areas */
1539 /* loop over data soures within each RRA */
1540 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1542 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1544 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1545 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1546 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1547 elapsed_pdp_st, start_pdp_offset,
1548 rrd->rra_def[rra_idx].pdp_cnt,
1549 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1550 rra_idx, ds_idx);
1551 } else {
1552 /* Nothing to consolidate if there's one PDP per CDP. However, if
1553 * we've missed some PDPs, let's update null counters etc. */
1554 if (elapsed_pdp_st > 2) {
1555 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1556 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1557 (enum cf_en)current_cf);
1558 }
1559 }
1561 if (rrd_test_error())
1562 return -1;
1563 } /* endif data sources loop */
1564 return 0;
1565 }
1567 /*
1568 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1569 * primary value, secondary value, and # of unknowns.
1570 */
1571 static void update_cdp(
1572 unival *scratch,
1573 int current_cf,
1574 rrd_value_t pdp_temp_val,
1575 unsigned long rra_step_cnt,
1576 unsigned long elapsed_pdp_st,
1577 unsigned long start_pdp_offset,
1578 unsigned long pdp_cnt,
1579 rrd_value_t xff,
1580 int i,
1581 int ii)
1582 {
1583 /* shorthand variables */
1584 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1585 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1586 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1587 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1589 if (rra_step_cnt) {
1590 /* If we are in this block, as least 1 CDP value will be written to
1591 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1592 * to be written, then the "fill in" value is the CDP_secondary_val
1593 * entry. */
1594 if (isnan(pdp_temp_val)) {
1595 *cdp_unkn_pdp_cnt += start_pdp_offset;
1596 *cdp_secondary_val = DNAN;
1597 } else {
1598 /* CDP_secondary value is the RRA "fill in" value for intermediary
1599 * CDP data entries. No matter the CF, the value is the same because
1600 * the average, max, min, and last of a list of identical values is
1601 * the same, namely, the value itself. */
1602 *cdp_secondary_val = pdp_temp_val;
1603 }
1605 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1606 *cdp_primary_val = DNAN;
1607 } else {
1608 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1609 start_pdp_offset, pdp_cnt);
1610 }
1611 *cdp_val =
1612 initialize_carry_over(pdp_temp_val,current_cf,
1613 elapsed_pdp_st,
1614 start_pdp_offset, pdp_cnt);
1615 /* endif meets xff value requirement for a valid value */
1616 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1617 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1618 if (isnan(pdp_temp_val))
1619 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1620 else
1621 *cdp_unkn_pdp_cnt = 0;
1622 } else { /* rra_step_cnt[i] == 0 */
1624 #ifdef DEBUG
1625 if (isnan(*cdp_val)) {
1626 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1627 i, ii);
1628 } else {
1629 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1630 i, ii, *cdp_val);
1631 }
1632 #endif
1633 if (isnan(pdp_temp_val)) {
1634 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1635 } else {
1636 *cdp_val =
1637 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1638 current_cf, i, ii);
1639 }
1640 }
1641 }
1643 /*
1644 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1645 * on the type of consolidation function.
1646 */
1647 static void initialize_cdp_val(
1648 unival *scratch,
1649 int current_cf,
1650 rrd_value_t pdp_temp_val,
1651 unsigned long start_pdp_offset,
1652 unsigned long pdp_cnt)
1653 {
1654 rrd_value_t cum_val, cur_val;
1656 switch (current_cf) {
1657 case CF_AVERAGE:
1658 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1659 cur_val = IFDNAN(pdp_temp_val, 0.0);
1660 scratch[CDP_primary_val].u_val =
1661 (cum_val + cur_val * start_pdp_offset) /
1662 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1663 break;
1664 case CF_MAXIMUM:
1665 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1666 cur_val = IFDNAN(pdp_temp_val, -DINF);
1668 #if 0
1669 #ifdef DEBUG
1670 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1671 fprintf(stderr,
1672 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1673 i, ii);
1674 exit(-1);
1675 }
1676 #endif
1677 #endif
1678 if (cur_val > cum_val)
1679 scratch[CDP_primary_val].u_val = cur_val;
1680 else
1681 scratch[CDP_primary_val].u_val = cum_val;
1682 break;
1683 case CF_MINIMUM:
1684 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1685 cur_val = IFDNAN(pdp_temp_val, DINF);
1686 #if 0
1687 #ifdef DEBUG
1688 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1689 fprintf(stderr,
1690 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1691 ii);
1692 exit(-1);
1693 }
1694 #endif
1695 #endif
1696 if (cur_val < cum_val)
1697 scratch[CDP_primary_val].u_val = cur_val;
1698 else
1699 scratch[CDP_primary_val].u_val = cum_val;
1700 break;
1701 case CF_LAST:
1702 default:
1703 scratch[CDP_primary_val].u_val = pdp_temp_val;
1704 break;
1705 }
1706 }
1708 /*
1709 * Update the consolidation function for Holt-Winters functions as
1710 * well as other functions that don't actually consolidate multiple
1711 * PDPs.
1712 */
1713 static void reset_cdp(
1714 rrd_t *rrd,
1715 unsigned long elapsed_pdp_st,
1716 rrd_value_t *pdp_temp,
1717 rrd_value_t *last_seasonal_coef,
1718 rrd_value_t *seasonal_coef,
1719 int rra_idx,
1720 int ds_idx,
1721 int cdp_idx,
1722 enum cf_en current_cf)
1723 {
1724 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1726 switch (current_cf) {
1727 case CF_AVERAGE:
1728 default:
1729 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1730 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1731 break;
1732 case CF_SEASONAL:
1733 case CF_DEVSEASONAL:
1734 /* need to update cached seasonal values, so they are consistent
1735 * with the bulk update */
1736 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1737 * CDP_last_deviation are the same. */
1738 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1739 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1740 break;
1741 case CF_HWPREDICT:
1742 case CF_MHWPREDICT:
1743 /* need to update the null_count and last_null_count.
1744 * even do this for non-DNAN pdp_temp because the
1745 * algorithm is not learning from batch updates. */
1746 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1747 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1748 /* fall through */
1749 case CF_DEVPREDICT:
1750 scratch[CDP_primary_val].u_val = DNAN;
1751 scratch[CDP_secondary_val].u_val = DNAN;
1752 break;
1753 case CF_FAILURES:
1754 /* do not count missed bulk values as failures */
1755 scratch[CDP_primary_val].u_val = 0;
1756 scratch[CDP_secondary_val].u_val = 0;
1757 /* need to reset violations buffer.
1758 * could do this more carefully, but for now, just
1759 * assume a bulk update wipes away all violations. */
1760 erase_violations(rrd, cdp_idx, rra_idx);
1761 break;
1762 }
1763 }
1765 static rrd_value_t initialize_carry_over(
1766 rrd_value_t pdp_temp_val,
1767 int current_cf,
1768 unsigned long elapsed_pdp_st,
1769 unsigned long start_pdp_offset,
1770 unsigned long pdp_cnt)
1771 {
1772 unsigned long pdp_into_cdp_cnt = ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1773 if ( pdp_into_cdp_cnt == 0 || isnan(pdp_temp_val)){
1774 switch (current_cf) {
1775 case CF_MAXIMUM:
1776 return -DINF;
1777 case CF_MINIMUM:
1778 return DINF;
1779 case CF_AVERAGE:
1780 return 0;
1781 default:
1782 return DNAN;
1783 }
1784 }
1785 else {
1786 switch (current_cf) {
1787 case CF_AVERAGE:
1788 return pdp_temp_val * pdp_into_cdp_cnt ;
1789 default:
1790 return pdp_temp_val;
1791 }
1792 }
1793 }
1795 /*
1796 * Update or initialize a CDP value based on the consolidation
1797 * function.
1798 *
1799 * Returns the new value.
1800 */
1801 static rrd_value_t calculate_cdp_val(
1802 rrd_value_t cdp_val,
1803 rrd_value_t pdp_temp_val,
1804 unsigned long elapsed_pdp_st,
1805 int current_cf,
1806 #ifdef DEBUG
1807 int i,
1808 int ii
1809 #else
1810 int UNUSED(i),
1811 int UNUSED(ii)
1812 #endif
1813 )
1814 {
1815 if (isnan(cdp_val)) {
1816 if (current_cf == CF_AVERAGE) {
1817 pdp_temp_val *= elapsed_pdp_st;
1818 }
1819 #ifdef DEBUG
1820 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1821 i, ii, pdp_temp_val);
1822 #endif
1823 return pdp_temp_val;
1824 }
1825 if (current_cf == CF_AVERAGE)
1826 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1827 if (current_cf == CF_MINIMUM)
1828 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1829 if (current_cf == CF_MAXIMUM)
1830 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1832 return pdp_temp_val;
1833 }
1835 /*
1836 * For each RRA, update the seasonal values and then call update_aberrant_CF
1837 * for each data source.
1838 *
1839 * Return 0 on success, -1 on error.
1840 */
1841 static int update_aberrant_cdps(
1842 rrd_t *rrd,
1843 rrd_file_t *rrd_file,
1844 unsigned long rra_begin,
1845 unsigned long elapsed_pdp_st,
1846 rrd_value_t *pdp_temp,
1847 rrd_value_t **seasonal_coef)
1848 {
1849 unsigned long rra_idx, ds_idx, j;
1851 /* number of PDP steps since the last update that
1852 * are assigned to the first CDP to be generated
1853 * since the last update. */
1854 unsigned short scratch_idx;
1855 unsigned long rra_start;
1856 enum cf_en current_cf;
1858 /* this loop is only entered if elapsed_pdp_st < 3 */
1859 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1860 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1861 rra_start = rra_begin;
1862 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1863 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1864 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1865 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1866 if (scratch_idx == CDP_primary_val) {
1867 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1868 elapsed_pdp_st + 1, seasonal_coef);
1869 } else {
1870 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1871 elapsed_pdp_st + 2, seasonal_coef);
1872 }
1873 }
1874 if (rrd_test_error())
1875 return -1;
1876 /* loop over data soures within each RRA */
1877 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1878 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1879 rra_idx * (rrd->stat_head->ds_cnt) +
1880 ds_idx, rra_idx, ds_idx, scratch_idx,
1881 *seasonal_coef);
1882 }
1883 }
1884 rra_start += rrd->rra_def[rra_idx].row_cnt
1885 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1886 }
1887 }
1888 return 0;
1889 }
1891 /*
1892 * Move sequentially through the file, writing one RRA at a time. Note this
1893 * architecture divorces the computation of CDP with flushing updated RRA
1894 * entries to disk.
1895 *
1896 * Return 0 on success, -1 on error.
1897 */
1898 static int write_to_rras(
1899 rrd_t *rrd,
1900 rrd_file_t *rrd_file,
1901 unsigned long *rra_step_cnt,
1902 unsigned long rra_begin,
1903 time_t current_time,
1904 unsigned long *skip_update,
1905 rrd_info_t ** pcdp_summary)
1906 {
1907 unsigned long rra_idx;
1908 unsigned long rra_start;
1909 time_t rra_time = 0; /* time of update for a RRA */
1911 unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1913 /* Ready to write to disk */
1914 rra_start = rra_begin;
1916 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1917 rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1918 rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1920 /* for cdp_prep */
1921 unsigned short scratch_idx;
1922 unsigned long step_subtract;
1924 for (scratch_idx = CDP_primary_val,
1925 step_subtract = 1;
1926 rra_step_cnt[rra_idx] > 0;
1927 rra_step_cnt[rra_idx]--,
1928 scratch_idx = CDP_secondary_val,
1929 step_subtract = 2) {
1931 size_t rra_pos_new;
1932 #ifdef DEBUG
1933 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1934 #endif
1935 /* increment, with wrap-around */
1936 if (++rra_ptr->cur_row >= rra_def->row_cnt)
1937 rra_ptr->cur_row = 0;
1939 /* we know what our position should be */
1940 rra_pos_new = rra_start
1941 + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1943 /* re-seek if the position is wrong or we wrapped around */
1944 if ((size_t)rra_pos_new != rrd_file->pos) {
1945 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1946 rrd_set_error("seek error in rrd");
1947 return -1;
1948 }
1949 }
1950 #ifdef DEBUG
1951 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1952 #endif
1954 if (skip_update[rra_idx])
1955 continue;
1957 if (*pcdp_summary != NULL) {
1958 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1960 rra_time = (current_time - current_time % step_time)
1961 - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1962 }
1964 if (write_RRA_row
1965 (rrd_file, rrd, rra_idx, scratch_idx,
1966 pcdp_summary, rra_time) == -1)
1967 return -1;
1969 rrd_notify_row(rrd_file, rra_idx, rra_pos_new, rra_time);
1970 }
1972 rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1973 } /* RRA LOOP */
1975 return 0;
1976 }
1978 /*
1979 * Write out one row of values (one value per DS) to the archive.
1980 *
1981 * Returns 0 on success, -1 on error.
1982 */
1983 static int write_RRA_row(
1984 rrd_file_t *rrd_file,
1985 rrd_t *rrd,
1986 unsigned long rra_idx,
1987 unsigned short CDP_scratch_idx,
1988 rrd_info_t ** pcdp_summary,
1989 time_t rra_time)
1990 {
1991 unsigned long ds_idx, cdp_idx;
1992 rrd_infoval_t iv;
1994 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1995 /* compute the cdp index */
1996 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1997 #ifdef DEBUG
1998 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1999 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2000 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2001 #endif
2002 if (*pcdp_summary != NULL) {
2003 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2004 /* append info to the return hash */
2005 *pcdp_summary = rrd_info_push(*pcdp_summary,
2006 sprintf_alloc
2007 ("[%lli]RRA[%s][%lu]DS[%s]",
2008 (long long)rra_time,
2009 rrd->rra_def[rra_idx].cf_nam,
2010 rrd->rra_def[rra_idx].pdp_cnt,
2011 rrd->ds_def[ds_idx].ds_nam),
2012 RD_I_VAL, iv);
2013 }
2014 errno = 0;
2015 if (rrd_write(rrd_file,
2016 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2017 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2018 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2019 return -1;
2020 }
2021 }
2022 return 0;
2023 }
2025 /*
2026 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2027 *
2028 * Returns 0 on success, -1 otherwise
2029 */
2030 static int smooth_all_rras(
2031 rrd_t *rrd,
2032 rrd_file_t *rrd_file,
2033 unsigned long rra_begin)
2034 {
2035 unsigned long rra_start = rra_begin;
2036 unsigned long rra_idx;
2038 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2039 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2040 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2041 #ifdef DEBUG
2042 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2043 #endif
2044 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2045 if (rrd_test_error())
2046 return -1;
2047 }
2048 rra_start += rrd->rra_def[rra_idx].row_cnt
2049 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2050 }
2051 return 0;
2052 }
2054 #ifndef HAVE_MMAP
2055 /*
2056 * Flush changes to disk (unless we're using mmap)
2057 *
2058 * Returns 0 on success, -1 otherwise
2059 */
2060 static int write_changes_to_disk(
2061 rrd_t *rrd,
2062 rrd_file_t *rrd_file,
2063 int version)
2064 {
2065 /* we just need to write back the live header portion now */
2066 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2067 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2068 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2069 SEEK_SET) != 0) {
2070 rrd_set_error("seek rrd for live header writeback");
2071 return -1;
2072 }
2073 if (version >= 3) {
2074 if (rrd_write(rrd_file, rrd->live_head,
2075 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2076 rrd_set_error("rrd_write live_head to rrd");
2077 return -1;
2078 }
2079 } else {
2080 if (rrd_write(rrd_file, rrd->legacy_last_up,
2081 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2082 rrd_set_error("rrd_write live_head to rrd");
2083 return -1;
2084 }
2085 }
2088 if (rrd_write(rrd_file, rrd->pdp_prep,
2089 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2090 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2091 rrd_set_error("rrd_write pdp_prep to rrd");
2092 return -1;
2093 }
2095 if (rrd_write(rrd_file, rrd->cdp_prep,
2096 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2097 rrd->stat_head->ds_cnt)
2098 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2099 rrd->stat_head->ds_cnt)) {
2101 rrd_set_error("rrd_write cdp_prep to rrd");
2102 return -1;
2103 }
2105 if (rrd_write(rrd_file, rrd->rra_ptr,
2106 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2107 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2108 rrd_set_error("rrd_write rra_ptr to rrd");
2109 return -1;
2110 }
2111 return 0;
2112 }
2113 #endif