1 /*****************************************************************************
2 * RRDtool 1.4.4 Copyright by Tobi Oetiker, 1997-2010
3 * Copyright by Florian Forster, 2008
4 *****************************************************************************
5 * rrd_update.c RRD Update Function
6 *****************************************************************************
7 * $Id$
8 *****************************************************************************/
10 #include "rrd_tool.h"
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
18 #include <locale.h>
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
26 #include "rrd_client.h"
#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
/*
 * WIN32 does not have gettimeofday and (outside MinGW) struct timeval.
 * This is a quick and dirty replacement built on _ftime().
 */
#include <sys/timeb.h>

#ifndef __MINGW32__
struct timeval {
    time_t    tv_sec;   /* seconds */
    long      tv_usec;  /* microseconds */
};
#endif

struct __timezone {
    int       tz_minuteswest;   /* minutes W of Greenwich */
    int       tz_dsttime;   /* type of dst correction */
};

/*
 * Minimal gettimeofday() stand-in.
 *
 * t  - receives the current time.  tv_usec is derived from the
 *      millisecond field of struct _timeb, so it is always a multiple
 *      of 1000.
 * tz - accepted for signature compatibility only; never read or written.
 *
 * Always returns 0.
 */
static int gettimeofday(
    struct timeval *t,
    struct __timezone *tz)
{
    struct _timeb current_time;

    (void) tz;          /* intentionally unused */

    _ftime(&current_time);

    t->tv_sec = current_time.time;
    t->tv_usec = current_time.millitm * 1000;

    return 0;
}

#endif
64 /* FUNCTION PROTOTYPES */
66 int rrd_update_r(
67 const char *filename,
68 const char *tmplt,
69 int argc,
70 const char **argv);
71 int _rrd_update(
72 const char *filename,
73 const char *tmplt,
74 int argc,
75 const char **argv,
76 rrd_info_t *);
78 static int allocate_data_structures(
79 rrd_t *rrd,
80 char ***updvals,
81 rrd_value_t **pdp_temp,
82 const char *tmplt,
83 long **tmpl_idx,
84 unsigned long *tmpl_cnt,
85 unsigned long **rra_step_cnt,
86 unsigned long **skip_update,
87 rrd_value_t **pdp_new);
89 static int parse_template(
90 rrd_t *rrd,
91 const char *tmplt,
92 unsigned long *tmpl_cnt,
93 long *tmpl_idx);
95 static int process_arg(
96 char *step_start,
97 rrd_t *rrd,
98 rrd_file_t *rrd_file,
99 unsigned long rra_begin,
100 time_t *current_time,
101 unsigned long *current_time_usec,
102 rrd_value_t *pdp_temp,
103 rrd_value_t *pdp_new,
104 unsigned long *rra_step_cnt,
105 char **updvals,
106 long *tmpl_idx,
107 unsigned long tmpl_cnt,
108 rrd_info_t ** pcdp_summary,
109 int version,
110 unsigned long *skip_update,
111 int *schedule_smooth);
113 static int parse_ds(
114 rrd_t *rrd,
115 char **updvals,
116 long *tmpl_idx,
117 char *input,
118 unsigned long tmpl_cnt,
119 time_t *current_time,
120 unsigned long *current_time_usec,
121 int version);
123 static int get_time_from_reading(
124 rrd_t *rrd,
125 char timesyntax,
126 char **updvals,
127 time_t *current_time,
128 unsigned long *current_time_usec,
129 int version);
131 static int update_pdp_prep(
132 rrd_t *rrd,
133 char **updvals,
134 rrd_value_t *pdp_new,
135 double interval);
137 static int calculate_elapsed_steps(
138 rrd_t *rrd,
139 unsigned long current_time,
140 unsigned long current_time_usec,
141 double interval,
142 double *pre_int,
143 double *post_int,
144 unsigned long *proc_pdp_cnt);
146 static void simple_update(
147 rrd_t *rrd,
148 double interval,
149 rrd_value_t *pdp_new);
151 static int process_all_pdp_st(
152 rrd_t *rrd,
153 double interval,
154 double pre_int,
155 double post_int,
156 unsigned long elapsed_pdp_st,
157 rrd_value_t *pdp_new,
158 rrd_value_t *pdp_temp);
160 static int process_pdp_st(
161 rrd_t *rrd,
162 unsigned long ds_idx,
163 double interval,
164 double pre_int,
165 double post_int,
166 long diff_pdp_st,
167 rrd_value_t *pdp_new,
168 rrd_value_t *pdp_temp);
170 static int update_all_cdp_prep(
171 rrd_t *rrd,
172 unsigned long *rra_step_cnt,
173 unsigned long rra_begin,
174 rrd_file_t *rrd_file,
175 unsigned long elapsed_pdp_st,
176 unsigned long proc_pdp_cnt,
177 rrd_value_t **last_seasonal_coef,
178 rrd_value_t **seasonal_coef,
179 rrd_value_t *pdp_temp,
180 unsigned long *skip_update,
181 int *schedule_smooth);
183 static int do_schedule_smooth(
184 rrd_t *rrd,
185 unsigned long rra_idx,
186 unsigned long elapsed_pdp_st);
188 static int update_cdp_prep(
189 rrd_t *rrd,
190 unsigned long elapsed_pdp_st,
191 unsigned long start_pdp_offset,
192 unsigned long *rra_step_cnt,
193 int rra_idx,
194 rrd_value_t *pdp_temp,
195 rrd_value_t *last_seasonal_coef,
196 rrd_value_t *seasonal_coef,
197 int current_cf);
199 static void update_cdp(
200 unival *scratch,
201 int current_cf,
202 rrd_value_t pdp_temp_val,
203 unsigned long rra_step_cnt,
204 unsigned long elapsed_pdp_st,
205 unsigned long start_pdp_offset,
206 unsigned long pdp_cnt,
207 rrd_value_t xff,
208 int i,
209 int ii);
211 static void initialize_cdp_val(
212 unival *scratch,
213 int current_cf,
214 rrd_value_t pdp_temp_val,
215 unsigned long start_pdp_offset,
216 unsigned long pdp_cnt);
218 static void reset_cdp(
219 rrd_t *rrd,
220 unsigned long elapsed_pdp_st,
221 rrd_value_t *pdp_temp,
222 rrd_value_t *last_seasonal_coef,
223 rrd_value_t *seasonal_coef,
224 int rra_idx,
225 int ds_idx,
226 int cdp_idx,
227 enum cf_en current_cf);
229 static rrd_value_t initialize_carry_over(
230 rrd_value_t pdp_temp_val,
231 int current_cf,
232 unsigned long elapsed_pdp_st,
233 unsigned long start_pdp_offset,
234 unsigned long pdp_cnt);
236 static rrd_value_t calculate_cdp_val(
237 rrd_value_t cdp_val,
238 rrd_value_t pdp_temp_val,
239 unsigned long elapsed_pdp_st,
240 int current_cf,
241 int i,
242 int ii);
244 static int update_aberrant_cdps(
245 rrd_t *rrd,
246 rrd_file_t *rrd_file,
247 unsigned long rra_begin,
248 unsigned long elapsed_pdp_st,
249 rrd_value_t *pdp_temp,
250 rrd_value_t **seasonal_coef);
252 static int write_to_rras(
253 rrd_t *rrd,
254 rrd_file_t *rrd_file,
255 unsigned long *rra_step_cnt,
256 unsigned long rra_begin,
257 time_t current_time,
258 unsigned long *skip_update,
259 rrd_info_t ** pcdp_summary);
261 static int write_RRA_row(
262 rrd_file_t *rrd_file,
263 rrd_t *rrd,
264 unsigned long rra_idx,
265 unsigned short CDP_scratch_idx,
266 rrd_info_t ** pcdp_summary,
267 time_t rra_time);
269 static int smooth_all_rras(
270 rrd_t *rrd,
271 rrd_file_t *rrd_file,
272 unsigned long rra_begin);
274 #ifndef HAVE_MMAP
275 static int write_changes_to_disk(
276 rrd_t *rrd,
277 rrd_file_t *rrd_file,
278 int version);
279 #endif
/*
 * Bring a timeval as returned by gettimeofday into canonical form:
 * the microsecond part must never be negative.  A negative tv_usec is
 * fixed by borrowing one second from tv_sec.
 */
static void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec >= 0)
        return;         /* already canonical */

    t->tv_sec -= 1;
    t->tv_usec += 1000000L;
}
/*
 * Store the current wall-clock time in *current_time (seconds) and
 * *current_time_usec (microseconds).  For RRD versions below 3 the
 * microsecond part is forced to 0.
 */
static void initialize_time(
    time_t *current_time,
    unsigned long *current_time_usec,
    int version)
{
    struct timeval now;

    gettimeofday(&now, 0);
    normalize_time(&now);

    *current_time = now.tv_sec;
    *current_time_usec = (version >= 3) ? (unsigned long) now.tv_usec : 0UL;
}
/* Yield Y when X is NaN, otherwise X.  Note that X is evaluated twice.
 * The expansion deliberately carries no trailing semicolon so the macro
 * can be used inside larger expressions. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
317 rrd_info_t *rrd_update_v(
318 int argc,
319 char **argv)
320 {
321 char *tmplt = NULL;
322 rrd_info_t *result = NULL;
323 rrd_infoval_t rc;
324 char *opt_daemon = NULL;
325 struct option long_options[] = {
326 {"template", required_argument, 0, 't'},
327 {0, 0, 0, 0}
328 };
330 rc.u_int = -1;
331 optind = 0;
332 opterr = 0; /* initialize getopt */
334 while (1) {
335 int option_index = 0;
336 int opt;
338 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
340 if (opt == EOF)
341 break;
343 switch (opt) {
344 case 't':
345 tmplt = optarg;
346 break;
348 case '?':
349 rrd_set_error("unknown option '%s'", argv[optind - 1]);
350 goto end_tag;
351 }
352 }
354 opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
355 if (opt_daemon != NULL) {
356 rrd_set_error ("The \"%s\" environment variable is defined, "
357 "but \"%s\" cannot work with rrdcached. Either unset "
358 "the environment variable or use \"update\" instead.",
359 ENV_RRDCACHED_ADDRESS, argv[0]);
360 goto end_tag;
361 }
363 /* need at least 2 arguments: filename, data. */
364 if (argc - optind < 2) {
365 rrd_set_error("Not enough arguments");
366 goto end_tag;
367 }
368 rc.u_int = 0;
369 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
370 rc.u_int = _rrd_update(argv[optind], tmplt,
371 argc - optind - 1,
372 (const char **) (argv + optind + 1), result);
373 result->value.u_int = rc.u_int;
374 end_tag:
375 return result;
376 }
378 int rrd_update(
379 int argc,
380 char **argv)
381 {
382 struct option long_options[] = {
383 {"template", required_argument, 0, 't'},
384 {"daemon", required_argument, 0, 'd'},
385 {0, 0, 0, 0}
386 };
387 int option_index = 0;
388 int opt;
389 char *tmplt = NULL;
390 int rc = -1;
391 char *opt_daemon = NULL;
393 optind = 0;
394 opterr = 0; /* initialize getopt */
396 while (1) {
397 opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
399 if (opt == EOF)
400 break;
402 switch (opt) {
403 case 't':
404 tmplt = strdup(optarg);
405 break;
407 case 'd':
408 if (opt_daemon != NULL)
409 free (opt_daemon);
410 opt_daemon = strdup (optarg);
411 if (opt_daemon == NULL)
412 {
413 rrd_set_error("strdup failed.");
414 goto out;
415 }
416 break;
418 case '?':
419 rrd_set_error("unknown option '%s'", argv[optind - 1]);
420 goto out;
421 }
422 }
424 /* need at least 2 arguments: filename, data. */
425 if (argc - optind < 2) {
426 rrd_set_error("Not enough arguments");
427 goto out;
428 }
430 { /* try to connect to rrdcached */
431 int status = rrdc_connect(opt_daemon);
432 if (status != 0) return status;
433 }
435 if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
436 {
437 rrd_set_error("The caching daemon cannot be used together with "
438 "templates yet.");
439 goto out;
440 }
442 if (! rrdc_is_connected(opt_daemon))
443 {
444 rc = rrd_update_r(argv[optind], tmplt,
445 argc - optind - 1, (const char **) (argv + optind + 1));
446 }
447 else /* we are connected */
448 {
449 rc = rrdc_update (argv[optind], /* file */
450 argc - optind - 1, /* values_num */
451 (const char *const *) (argv + optind + 1)); /* values */
452 if (rc > 0)
453 rrd_set_error("Failed sending the values to rrdcached: %s",
454 rrd_strerror (rc));
455 }
457 out:
458 if (tmplt != NULL)
459 {
460 free(tmplt);
461 tmplt = NULL;
462 }
463 if (opt_daemon != NULL)
464 {
465 free (opt_daemon);
466 opt_daemon = NULL;
467 }
468 return rc;
469 }
/*
 * Update entry point that takes the file name, optional template and
 * update strings directly.  Forwards to _rrd_update() without an info
 * list and returns its result.
 */
int rrd_update_r(
    const char *filename,
    const char *tmplt,
    int argc,
    const char **argv)
{
    int       status = _rrd_update(filename, tmplt, argc, argv, NULL);

    return status;
}
480 int _rrd_update(
481 const char *filename,
482 const char *tmplt,
483 int argc,
484 const char **argv,
485 rrd_info_t * pcdp_summary)
486 {
488 int arg_i = 2;
490 unsigned long rra_begin; /* byte pointer to the rra
491 * area in the rrd file. this
492 * pointer never changes value */
493 rrd_value_t *pdp_new; /* prepare the incoming data to be added
494 * to the existing entry */
495 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
496 * to the cdp values */
498 long *tmpl_idx; /* index representing the settings
499 * transported by the tmplt index */
500 unsigned long tmpl_cnt = 2; /* time and data */
501 rrd_t rrd;
502 time_t current_time = 0;
503 unsigned long current_time_usec = 0; /* microseconds part of current time */
504 char **updvals;
505 int schedule_smooth = 0;
507 /* number of elapsed PDP steps since last update */
508 unsigned long *rra_step_cnt = NULL;
510 int version; /* rrd version */
511 rrd_file_t *rrd_file;
512 char *arg_copy; /* for processing the argv */
513 unsigned long *skip_update; /* RRAs to advance but not write */
515 /* need at least 1 arguments: data. */
516 if (argc < 1) {
517 rrd_set_error("Not enough arguments");
518 goto err_out;
519 }
521 rrd_init(&rrd);
522 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
523 goto err_free;
524 }
525 /* We are now at the beginning of the rra's */
526 rra_begin = rrd_file->header_len;
528 version = atoi(rrd.stat_head->version);
530 initialize_time(¤t_time, ¤t_time_usec, version);
532 /* get exclusive lock to whole file.
533 * lock gets removed when we close the file.
534 */
535 if (rrd_lock(rrd_file) != 0) {
536 rrd_set_error("could not lock RRD");
537 goto err_close;
538 }
540 if (allocate_data_structures(&rrd, &updvals,
541 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
542 &rra_step_cnt, &skip_update,
543 &pdp_new) == -1) {
544 goto err_close;
545 }
547 /* loop through the arguments. */
548 for (arg_i = 0; arg_i < argc; arg_i++) {
549 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
550 rrd_set_error("failed duplication argv entry");
551 break;
552 }
553 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
554 ¤t_time, ¤t_time_usec, pdp_temp, pdp_new,
555 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
556 &pcdp_summary, version, skip_update,
557 &schedule_smooth) == -1) {
558 if (rrd_test_error()) { /* Should have error string always here */
559 char *save_error;
561 /* Prepend file name to error message */
562 if ((save_error = strdup(rrd_get_error())) != NULL) {
563 rrd_set_error("%s: %s", filename, save_error);
564 free(save_error);
565 }
566 }
567 free(arg_copy);
568 break;
569 }
570 free(arg_copy);
571 }
573 free(rra_step_cnt);
575 /* if we got here and if there is an error and if the file has not been
576 * written to, then close things up and return. */
577 if (rrd_test_error()) {
578 goto err_free_structures;
579 }
580 #ifndef HAVE_MMAP
581 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
582 goto err_free_structures;
583 }
584 #endif
586 /* calling the smoothing code here guarantees at most one smoothing
587 * operation per rrd_update call. Unfortunately, it is possible with bulk
588 * updates, or a long-delayed update for smoothing to occur off-schedule.
589 * This really isn't critical except during the burn-in cycles. */
590 if (schedule_smooth) {
591 smooth_all_rras(&rrd, rrd_file, rra_begin);
592 }
594 /* rrd_dontneed(rrd_file,&rrd); */
595 rrd_free(&rrd);
596 rrd_close(rrd_file);
598 free(pdp_new);
599 free(tmpl_idx);
600 free(pdp_temp);
601 free(skip_update);
602 free(updvals);
603 return 0;
605 err_free_structures:
606 free(pdp_new);
607 free(tmpl_idx);
608 free(pdp_temp);
609 free(skip_update);
610 free(updvals);
611 err_close:
612 rrd_close(rrd_file);
613 err_free:
614 rrd_free(&rrd);
615 err_out:
616 return -1;
617 }
619 /*
620 * Allocate some important arrays used, and initialize the template.
621 *
622 * When it returns, either all of the structures are allocated
623 * or none of them are.
624 *
625 * Returns 0 on success, -1 on error.
626 */
627 static int allocate_data_structures(
628 rrd_t *rrd,
629 char ***updvals,
630 rrd_value_t **pdp_temp,
631 const char *tmplt,
632 long **tmpl_idx,
633 unsigned long *tmpl_cnt,
634 unsigned long **rra_step_cnt,
635 unsigned long **skip_update,
636 rrd_value_t **pdp_new)
637 {
638 unsigned i, ii;
639 if ((*updvals = (char **) malloc(sizeof(char *)
640 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
641 rrd_set_error("allocating updvals pointer array.");
642 return -1;
643 }
644 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
645 * rrd->stat_head->ds_cnt)) ==
646 NULL) {
647 rrd_set_error("allocating pdp_temp.");
648 goto err_free_updvals;
649 }
650 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
651 *
652 rrd->stat_head->rra_cnt)) ==
653 NULL) {
654 rrd_set_error("allocating skip_update.");
655 goto err_free_pdp_temp;
656 }
657 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
658 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
659 rrd_set_error("allocating tmpl_idx.");
660 goto err_free_skip_update;
661 }
662 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
663 *
664 (rrd->stat_head->
665 rra_cnt))) == NULL) {
666 rrd_set_error("allocating rra_step_cnt.");
667 goto err_free_tmpl_idx;
668 }
670 /* initialize tmplt redirector */
671 /* default config example (assume DS 1 is a CDEF DS)
672 tmpl_idx[0] -> 0; (time)
673 tmpl_idx[1] -> 1; (DS 0)
674 tmpl_idx[2] -> 3; (DS 2)
675 tmpl_idx[3] -> 4; (DS 3) */
676 (*tmpl_idx)[0] = 0; /* time */
677 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
678 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
679 (*tmpl_idx)[ii++] = i;
680 }
681 *tmpl_cnt = ii;
683 if (tmplt != NULL) {
684 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
685 goto err_free_rra_step_cnt;
686 }
687 }
689 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
690 * rrd->stat_head->ds_cnt)) == NULL) {
691 rrd_set_error("allocating pdp_new.");
692 goto err_free_rra_step_cnt;
693 }
695 return 0;
697 err_free_rra_step_cnt:
698 free(*rra_step_cnt);
699 err_free_tmpl_idx:
700 free(*tmpl_idx);
701 err_free_skip_update:
702 free(*skip_update);
703 err_free_pdp_temp:
704 free(*pdp_temp);
705 err_free_updvals:
706 free(*updvals);
707 return -1;
708 }
710 /*
711 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
712 *
713 * Returns 0 on success.
714 */
715 static int parse_template(
716 rrd_t *rrd,
717 const char *tmplt,
718 unsigned long *tmpl_cnt,
719 long *tmpl_idx)
720 {
721 char *dsname, *tmplt_copy;
722 unsigned int tmpl_len, i;
723 int ret = 0;
725 *tmpl_cnt = 1; /* the first entry is the time */
727 /* we should work on a writeable copy here */
728 if ((tmplt_copy = strdup(tmplt)) == NULL) {
729 rrd_set_error("error copying tmplt '%s'", tmplt);
730 ret = -1;
731 goto out;
732 }
734 dsname = tmplt_copy;
735 tmpl_len = strlen(tmplt_copy);
736 for (i = 0; i <= tmpl_len; i++) {
737 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
738 tmplt_copy[i] = '\0';
739 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
740 rrd_set_error("tmplt contains more DS definitions than RRD");
741 ret = -1;
742 goto out_free_tmpl_copy;
743 }
744 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
745 rrd_set_error("unknown DS name '%s'", dsname);
746 ret = -1;
747 goto out_free_tmpl_copy;
748 }
749 /* go to the next entry on the tmplt_copy */
750 if (i < tmpl_len)
751 dsname = &tmplt_copy[i + 1];
752 }
753 }
754 out_free_tmpl_copy:
755 free(tmplt_copy);
756 out:
757 return ret;
758 }
760 /*
761 * Parse an update string, updates the primary data points (PDPs)
762 * and consolidated data points (CDPs), and writes changes to the RRAs.
763 *
764 * Returns 0 on success, -1 on error.
765 */
766 static int process_arg(
767 char *step_start,
768 rrd_t *rrd,
769 rrd_file_t *rrd_file,
770 unsigned long rra_begin,
771 time_t *current_time,
772 unsigned long *current_time_usec,
773 rrd_value_t *pdp_temp,
774 rrd_value_t *pdp_new,
775 unsigned long *rra_step_cnt,
776 char **updvals,
777 long *tmpl_idx,
778 unsigned long tmpl_cnt,
779 rrd_info_t ** pcdp_summary,
780 int version,
781 unsigned long *skip_update,
782 int *schedule_smooth)
783 {
784 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
786 /* a vector of future Holt-Winters seasonal coefs */
787 unsigned long elapsed_pdp_st;
789 double interval, pre_int, post_int; /* interval between this and
790 * the last run */
791 unsigned long proc_pdp_cnt;
793 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
794 current_time, current_time_usec, version) == -1) {
795 return -1;
796 }
798 interval = (double) (*current_time - rrd->live_head->last_up)
799 + (double) ((long) *current_time_usec -
800 (long) rrd->live_head->last_up_usec) / 1e6f;
802 /* process the data sources and update the pdp_prep
803 * area accordingly */
804 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
805 return -1;
806 }
808 elapsed_pdp_st = calculate_elapsed_steps(rrd,
809 *current_time,
810 *current_time_usec, interval,
811 &pre_int, &post_int,
812 &proc_pdp_cnt);
814 /* has a pdp_st moment occurred since the last run ? */
815 if (elapsed_pdp_st == 0) {
816 /* no we have not passed a pdp_st moment. therefore update is simple */
817 simple_update(rrd, interval, pdp_new);
818 } else {
819 /* an pdp_st has occurred. */
820 if (process_all_pdp_st(rrd, interval,
821 pre_int, post_int,
822 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
823 return -1;
824 }
825 if (update_all_cdp_prep(rrd, rra_step_cnt,
826 rra_begin, rrd_file,
827 elapsed_pdp_st,
828 proc_pdp_cnt,
829 &last_seasonal_coef,
830 &seasonal_coef,
831 pdp_temp,
832 skip_update, schedule_smooth) == -1) {
833 goto err_free_coefficients;
834 }
835 if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
836 elapsed_pdp_st, pdp_temp,
837 &seasonal_coef) == -1) {
838 goto err_free_coefficients;
839 }
840 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
841 *current_time, skip_update,
842 pcdp_summary) == -1) {
843 goto err_free_coefficients;
844 }
845 } /* endif a pdp_st has occurred */
846 rrd->live_head->last_up = *current_time;
847 rrd->live_head->last_up_usec = *current_time_usec;
849 if (version < 3) {
850 *rrd->legacy_last_up = rrd->live_head->last_up;
851 }
852 free(seasonal_coef);
853 free(last_seasonal_coef);
854 return 0;
856 err_free_coefficients:
857 free(seasonal_coef);
858 free(last_seasonal_coef);
859 return -1;
860 }
862 /*
863 * Parse a DS string (time + colon-separated values), storing the
864 * results in current_time, current_time_usec, and updvals.
865 *
866 * Returns 0 on success, -1 on error.
867 */
868 static int parse_ds(
869 rrd_t *rrd,
870 char **updvals,
871 long *tmpl_idx,
872 char *input,
873 unsigned long tmpl_cnt,
874 time_t *current_time,
875 unsigned long *current_time_usec,
876 int version)
877 {
878 char *p;
879 unsigned long i;
880 char timesyntax;
882 updvals[0] = input;
883 /* initialize all ds input to unknown except the first one
884 which has always got to be set */
885 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
886 updvals[i] = "U";
888 /* separate all ds elements; first must be examined separately
889 due to alternate time syntax */
890 if ((p = strchr(input, '@')) != NULL) {
891 timesyntax = '@';
892 } else if ((p = strchr(input, ':')) != NULL) {
893 timesyntax = ':';
894 } else {
895 rrd_set_error("expected timestamp not found in data source from %s",
896 input);
897 return -1;
898 }
899 *p = '\0';
900 i = 1;
901 updvals[tmpl_idx[i++]] = p + 1;
902 while (*(++p)) {
903 if (*p == ':') {
904 *p = '\0';
905 if (i < tmpl_cnt) {
906 updvals[tmpl_idx[i++]] = p + 1;
907 }
908 else {
909 rrd_set_error("found extra data on update argument: %s",p+1);
910 return -1;
911 }
912 }
913 }
915 if (i != tmpl_cnt) {
916 rrd_set_error("expected %lu data source readings (got %lu) from %s",
917 tmpl_cnt - 1, i - 1, input);
918 return -1;
919 }
921 if (get_time_from_reading(rrd, timesyntax, updvals,
922 current_time, current_time_usec,
923 version) == -1) {
924 return -1;
925 }
926 return 0;
927 }
929 /*
930 * Parse the time in a DS string, store it in current_time and
931 * current_time_usec and verify that it's later than the last
932 * update for this DS.
933 *
934 * Returns 0 on success, -1 on error.
935 */
936 static int get_time_from_reading(
937 rrd_t *rrd,
938 char timesyntax,
939 char **updvals,
940 time_t *current_time,
941 unsigned long *current_time_usec,
942 int version)
943 {
944 double tmp;
945 char *parsetime_error = NULL;
946 char *old_locale;
947 rrd_time_value_t ds_tv;
948 struct timeval tmp_time; /* used for time conversion */
950 /* get the time from the reading ... handle N */
951 if (timesyntax == '@') { /* at-style */
952 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
953 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
954 return -1;
955 }
956 if (ds_tv.type == RELATIVE_TO_END_TIME ||
957 ds_tv.type == RELATIVE_TO_START_TIME) {
958 rrd_set_error("specifying time relative to the 'start' "
959 "or 'end' makes no sense here: %s", updvals[0]);
960 return -1;
961 }
962 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
963 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
964 } else if (strcmp(updvals[0], "N") == 0) {
965 gettimeofday(&tmp_time, 0);
966 normalize_time(&tmp_time);
967 *current_time = tmp_time.tv_sec;
968 *current_time_usec = tmp_time.tv_usec;
969 } else {
970 old_locale = setlocale(LC_NUMERIC, NULL);
971 setlocale(LC_NUMERIC, "C");
972 errno = 0;
973 tmp = strtod(updvals[0], 0);
974 if (errno > 0) {
975 rrd_set_error("converting '%s' to float: %s",
976 updvals[0], rrd_strerror(errno));
977 return -1;
978 };
979 setlocale(LC_NUMERIC, old_locale);
980 if (tmp < 0.0){
981 gettimeofday(&tmp_time, 0);
982 tmp = (double)tmp_time.tv_sec + (double)tmp_time.tv_usec * 1e-6f + tmp;
983 }
985 *current_time = floor(tmp);
986 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
987 }
988 /* dont do any correction for old version RRDs */
989 if (version < 3)
990 *current_time_usec = 0;
992 if (*current_time < rrd->live_head->last_up ||
993 (*current_time == rrd->live_head->last_up &&
994 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
995 rrd_set_error("illegal attempt to update using time %ld when "
996 "last update time is %ld (minimum one second step)",
997 *current_time, rrd->live_head->last_up);
998 return -1;
999 }
1000 return 0;
1001 }
1003 /*
1004 * Update pdp_new by interpreting the updvals according to the DS type
1005 * (COUNTER, GAUGE, etc.).
1006 *
1007 * Returns 0 on success, -1 on error.
1008 */
1009 static int update_pdp_prep(
1010 rrd_t *rrd,
1011 char **updvals,
1012 rrd_value_t *pdp_new,
1013 double interval)
1014 {
1015 unsigned long ds_idx;
1016 int ii;
1017 char *endptr; /* used in the conversion */
1018 double rate;
1019 char *old_locale;
1020 enum dst_en dst_idx;
1022 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1023 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1025 /* make sure we do not build diffs with old last_ds values */
1026 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1027 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1028 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1029 }
1031 /* NOTE: DST_CDEF should never enter this if block, because
1032 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1033 * accidently specified a value for the DST_CDEF. To handle this case,
1034 * an extra check is required. */
1036 if ((updvals[ds_idx + 1][0] != 'U') &&
1037 (dst_idx != DST_CDEF) &&
1038 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1039 rate = DNAN;
1041 /* pdp_new contains rate * time ... eg the bytes transferred during
1042 * the interval. Doing it this way saves a lot of math operations
1043 */
1044 switch (dst_idx) {
1045 case DST_COUNTER:
1046 case DST_DERIVE:
1047 /* Check if this is a valid integer. `U' is already handled in
1048 * another branch. */
1049 for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) {
1050 if ((ii == 0) && (dst_idx == DST_DERIVE)
1051 && (updvals[ds_idx + 1][ii] == '-'))
1052 continue;
1054 if ((updvals[ds_idx + 1][ii] < '0')
1055 || (updvals[ds_idx + 1][ii] > '9')) {
1056 rrd_set_error("not a simple %s integer: '%s'",
1057 (dst_idx == DST_DERIVE) ? "signed" : "unsigned",
1058 updvals[ds_idx + 1]);
1059 return -1;
1060 }
1061 } /* for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) */
1063 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1064 pdp_new[ds_idx] =
1065 rrd_diff(updvals[ds_idx + 1],
1066 rrd->pdp_prep[ds_idx].last_ds);
1067 if (dst_idx == DST_COUNTER) {
1068 /* simple overflow catcher. This will fail
1069 * terribly for non 32 or 64 bit counters
1070 * ... are there any others in SNMP land?
1071 */
1072 if (pdp_new[ds_idx] < (double) 0.0)
1073 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1074 if (pdp_new[ds_idx] < (double) 0.0)
1075 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1076 }
1077 rate = pdp_new[ds_idx] / interval;
1078 } else {
1079 pdp_new[ds_idx] = DNAN;
1080 }
1081 break;
1082 case DST_ABSOLUTE:
1083 old_locale = setlocale(LC_NUMERIC, NULL);
1084 setlocale(LC_NUMERIC, "C");
1085 errno = 0;
1086 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1087 if (errno > 0) {
1088 rrd_set_error("converting '%s' to float: %s",
1089 updvals[ds_idx + 1], rrd_strerror(errno));
1090 return -1;
1091 };
1092 setlocale(LC_NUMERIC, old_locale);
1093 if (endptr[0] != '\0') {
1094 rrd_set_error
1095 ("conversion of '%s' to float not complete: tail '%s'",
1096 updvals[ds_idx + 1], endptr);
1097 return -1;
1098 }
1099 rate = pdp_new[ds_idx] / interval;
1100 break;
1101 case DST_GAUGE:
1102 old_locale = setlocale(LC_NUMERIC, NULL);
1103 setlocale(LC_NUMERIC, "C");
1104 errno = 0;
1105 pdp_new[ds_idx] =
1106 strtod(updvals[ds_idx + 1], &endptr) * interval;
1107 if (errno) {
1108 rrd_set_error("converting '%s' to float: %s",
1109 updvals[ds_idx + 1], rrd_strerror(errno));
1110 return -1;
1111 };
1112 setlocale(LC_NUMERIC, old_locale);
1113 if (endptr[0] != '\0') {
1114 rrd_set_error
1115 ("conversion of '%s' to float not complete: tail '%s'",
1116 updvals[ds_idx + 1], endptr);
1117 return -1;
1118 }
1119 rate = pdp_new[ds_idx] / interval;
1120 break;
1121 default:
1122 rrd_set_error("rrd contains unknown DS type : '%s'",
1123 rrd->ds_def[ds_idx].dst);
1124 return -1;
1125 }
1126 /* break out of this for loop if the error string is set */
1127 if (rrd_test_error()) {
1128 return -1;
1129 }
1130 /* make sure pdp_temp is neither too large or too small
1131 * if any of these occur it becomes unknown ...
1132 * sorry folks ... */
1133 if (!isnan(rate) &&
1134 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1135 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1136 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1137 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1138 pdp_new[ds_idx] = DNAN;
1139 }
1140 } else {
1141 /* no news is news all the same */
1142 pdp_new[ds_idx] = DNAN;
1143 }
1146 /* make a copy of the command line argument for the next run */
1147 #ifdef DEBUG
1148 fprintf(stderr, "prep ds[%lu]\t"
1149 "last_arg '%s'\t"
1150 "this_arg '%s'\t"
1151 "pdp_new %10.2f\n",
1152 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1153 pdp_new[ds_idx]);
1154 #endif
1155 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1156 LAST_DS_LEN - 1);
1157 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1158 }
1159 return 0;
1160 }
1162 /*
1163 * How many PDP steps have elapsed since the last update? Returns the answer,
1164 * and stores the time between the last update and the last PDP in pre_time,
1165 * and the time between the last PDP and the current time in post_int.
1166 */
1167 static int calculate_elapsed_steps(
1168 rrd_t *rrd,
1169 unsigned long current_time,
1170 unsigned long current_time_usec,
1171 double interval,
1172 double *pre_int,
1173 double *post_int,
1174 unsigned long *proc_pdp_cnt)
1175 {
1176 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1177 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1178 * time */
1179 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1180 * when it was last updated */
1181 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1183 /* when was the current pdp started */
1184 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1185 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1187 /* when did the last pdp_st occur */
1188 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1189 occu_pdp_st = current_time - occu_pdp_age;
1191 if (occu_pdp_st > proc_pdp_st) {
1192 /* OK we passed the pdp_st moment */
1193 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1194 * occurred before the latest
1195 * pdp_st moment*/
1196 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1197 *post_int = occu_pdp_age; /* how much after it */
1198 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1199 } else {
1200 *pre_int = interval;
1201 *post_int = 0;
1202 }
1204 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1206 #ifdef DEBUG
1207 printf("proc_pdp_age %lu\t"
1208 "proc_pdp_st %lu\t"
1209 "occu_pfp_age %lu\t"
1210 "occu_pdp_st %lu\t"
1211 "int %lf\t"
1212 "pre_int %lf\t"
1213 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1214 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1215 #endif
1217 /* compute the number of elapsed pdp_st moments */
1218 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1219 }
1221 /*
1222 * Increment the PDP values by the values in pdp_new, or else initialize them.
1223 */
1224 static void simple_update(
1225 rrd_t *rrd,
1226 double interval,
1227 rrd_value_t *pdp_new)
1228 {
1229 int i;
1231 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1232 if (isnan(pdp_new[i])) {
1233 /* this is not really accurate if we use subsecond data arrival time
1234 should have thought of it when going subsecond resolution ...
1235 sorry next format change we will have it! */
1236 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1237 floor(interval);
1238 } else {
1239 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1240 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1241 } else {
1242 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1243 }
1244 }
1245 #ifdef DEBUG
1246 fprintf(stderr,
1247 "NO PDP ds[%i]\t"
1248 "value %10.2f\t"
1249 "unkn_sec %5lu\n",
1250 i,
1251 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1252 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1253 #endif
1254 }
1255 }
1257 /*
1258 * Call process_pdp_st for each DS.
1259 *
1260 * Returns 0 on success, -1 on error.
1261 */
1262 static int process_all_pdp_st(
1263 rrd_t *rrd,
1264 double interval,
1265 double pre_int,
1266 double post_int,
1267 unsigned long elapsed_pdp_st,
1268 rrd_value_t *pdp_new,
1269 rrd_value_t *pdp_temp)
1270 {
1271 unsigned long ds_idx;
1273 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1274 rate*seconds which occurred up to the last run.
1275 pdp_new[] contains rate*seconds from the latest run.
1276 pdp_temp[] will contain the rate for cdp */
1278 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1279 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1280 elapsed_pdp_st * rrd->stat_head->pdp_step,
1281 pdp_new, pdp_temp) == -1) {
1282 return -1;
1283 }
1284 #ifdef DEBUG
1285 fprintf(stderr, "PDP UPD ds[%lu]\t"
1286 "elapsed_pdp_st %lu\t"
1287 "pdp_temp %10.2f\t"
1288 "new_prep %10.2f\t"
1289 "new_unkn_sec %5lu\n",
1290 ds_idx,
1291 elapsed_pdp_st,
1292 pdp_temp[ds_idx],
1293 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1294 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1295 #endif
1296 }
1297 return 0;
1298 }
1300 /*
1301 * Process an update that occurs after one of the PDP moments.
1302 * Increments the PDP value, sets NAN if time greater than the
1303 * heartbeats have elapsed, processes CDEFs.
1304 *
1305 * Returns 0 on success, -1 on error.
1306 */
1307 static int process_pdp_st(
1308 rrd_t *rrd,
1309 unsigned long ds_idx,
1310 double interval,
1311 double pre_int,
1312 double post_int,
1313 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1314 rrd_value_t *pdp_new,
1315 rrd_value_t *pdp_temp)
1316 {
1317 int i;
1319 /* update pdp_prep to the current pdp_st. */
1320 double pre_unknown = 0.0;
1321 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1322 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1324 rpnstack_t rpnstack; /* used for COMPUTE DS */
1326 rpnstack_init(&rpnstack);
1329 if (isnan(pdp_new[ds_idx])) {
1330 /* a final bit of unknown to be added before calculation
1331 we use a temporary variable for this so that we
1332 don't have to turn integer lines before using the value */
1333 pre_unknown = pre_int;
1334 } else {
1335 if (isnan(scratch[PDP_val].u_val)) {
1336 scratch[PDP_val].u_val = 0;
1337 }
1338 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1339 }
1341 /* if too much of the pdp_prep is unknown we dump it */
1342 /* if the interval is larger thatn mrhb we get NAN */
1343 if ((interval > mrhb) ||
1344 (rrd->stat_head->pdp_step / 2.0 <
1345 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1346 pdp_temp[ds_idx] = DNAN;
1347 } else {
1348 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1349 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1350 pre_unknown);
1351 }
1353 /* process CDEF data sources; remember each CDEF DS can
1354 * only reference other DS with a lower index number */
1355 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1356 rpnp_t *rpnp;
1358 rpnp =
1359 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1360 if(rpnp == NULL) {
1361 rpnstack_free(&rpnstack);
1362 return -1;
1363 }
1364 /* substitute data values for OP_VARIABLE nodes */
1365 for (i = 0; rpnp[i].op != OP_END; i++) {
1366 if (rpnp[i].op == OP_VARIABLE) {
1367 rpnp[i].op = OP_NUMBER;
1368 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1369 }
1370 }
1371 /* run the rpn calculator */
1372 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1373 free(rpnp);
1374 rpnstack_free(&rpnstack);
1375 return -1;
1376 }
1377 free(rpnp);
1378 }
1380 /* make pdp_prep ready for the next run */
1381 if (isnan(pdp_new[ds_idx])) {
1382 /* this is not realy accurate if we use subsecond data arival time
1383 should have thought of it when going subsecond resolution ...
1384 sorry next format change we will have it! */
1385 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1386 scratch[PDP_val].u_val = DNAN;
1387 } else {
1388 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1389 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1390 }
1391 rpnstack_free(&rpnstack);
1392 return 0;
1393 }
1395 /*
1396 * Iterate over all the RRAs for a given DS and:
1397 * 1. Decide whether to schedule a smooth later
1398 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1399 * 3. Update the CDP
1400 *
1401 * Returns 0 on success, -1 on error
1402 */
1403 static int update_all_cdp_prep(
1404 rrd_t *rrd,
1405 unsigned long *rra_step_cnt,
1406 unsigned long rra_begin,
1407 rrd_file_t *rrd_file,
1408 unsigned long elapsed_pdp_st,
1409 unsigned long proc_pdp_cnt,
1410 rrd_value_t **last_seasonal_coef,
1411 rrd_value_t **seasonal_coef,
1412 rrd_value_t *pdp_temp,
1413 unsigned long *skip_update,
1414 int *schedule_smooth)
1415 {
1416 unsigned long rra_idx;
1418 /* index into the CDP scratch array */
1419 enum cf_en current_cf;
1420 unsigned long rra_start;
1422 /* number of rows to be updated in an RRA for a data value. */
1423 unsigned long start_pdp_offset;
1425 rra_start = rra_begin;
1426 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1427 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1428 start_pdp_offset =
1429 rrd->rra_def[rra_idx].pdp_cnt -
1430 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1431 skip_update[rra_idx] = 0;
1432 if (start_pdp_offset <= elapsed_pdp_st) {
1433 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1434 rrd->rra_def[rra_idx].pdp_cnt + 1;
1435 } else {
1436 rra_step_cnt[rra_idx] = 0;
1437 }
1439 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1440 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1441 * so that they will be correct for the next observed value; note that for
1442 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1443 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1444 if (rra_step_cnt[rra_idx] > 1) {
1445 skip_update[rra_idx] = 1;
1446 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1447 elapsed_pdp_st, last_seasonal_coef);
1448 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1449 elapsed_pdp_st + 1, seasonal_coef);
1450 }
1451 /* periodically run a smoother for seasonal effects */
1452 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1453 #ifdef DEBUG
1454 fprintf(stderr,
1455 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1456 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1457 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1458 u_cnt);
1459 #endif
1460 *schedule_smooth = 1;
1461 }
1462 }
1463 if (rrd_test_error())
1464 return -1;
1466 if (update_cdp_prep
1467 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1468 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1469 current_cf) == -1) {
1470 return -1;
1471 }
1472 rra_start +=
1473 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1474 sizeof(rrd_value_t);
1475 }
1476 return 0;
1477 }
1479 /*
1480 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1481 */
1482 static int do_schedule_smooth(
1483 rrd_t *rrd,
1484 unsigned long rra_idx,
1485 unsigned long elapsed_pdp_st)
1486 {
1487 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1488 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1489 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1490 unsigned long seasonal_smooth_idx =
1491 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1492 unsigned long *init_seasonal =
1493 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1495 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1496 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1497 * really an RRA level, not a data source within RRA level parameter, but
1498 * the rra_def is read only for rrd_update (not flushed to disk). */
1499 if (*init_seasonal > BURNIN_CYCLES) {
1500 /* someone has no doubt invented a trick to deal with this wrap around,
1501 * but at least this code is clear. */
1502 if (seasonal_smooth_idx > cur_row) {
1503 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1504 * between PDP and CDP */
1505 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1506 }
1507 /* can't rely on negative numbers because we are working with
1508 * unsigned values */
1509 return (cur_row + elapsed_pdp_st >= row_cnt
1510 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1511 }
1512 /* mark off one of the burn-in cycles */
1513 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1514 }
1516 /*
1517 * For a given RRA, iterate over the data sources and call the appropriate
1518 * consolidation function.
1519 *
1520 * Returns 0 on success, -1 on error.
1521 */
1522 static int update_cdp_prep(
1523 rrd_t *rrd,
1524 unsigned long elapsed_pdp_st,
1525 unsigned long start_pdp_offset,
1526 unsigned long *rra_step_cnt,
1527 int rra_idx,
1528 rrd_value_t *pdp_temp,
1529 rrd_value_t *last_seasonal_coef,
1530 rrd_value_t *seasonal_coef,
1531 int current_cf)
1532 {
1533 unsigned long ds_idx, cdp_idx;
1535 /* update CDP_PREP areas */
1536 /* loop over data soures within each RRA */
1537 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1539 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1541 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1542 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1543 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1544 elapsed_pdp_st, start_pdp_offset,
1545 rrd->rra_def[rra_idx].pdp_cnt,
1546 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1547 rra_idx, ds_idx);
1548 } else {
1549 /* Nothing to consolidate if there's one PDP per CDP. However, if
1550 * we've missed some PDPs, let's update null counters etc. */
1551 if (elapsed_pdp_st > 2) {
1552 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1553 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1554 (enum cf_en)current_cf);
1555 }
1556 }
1558 if (rrd_test_error())
1559 return -1;
1560 } /* endif data sources loop */
1561 return 0;
1562 }
1564 /*
1565 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1566 * primary value, secondary value, and # of unknowns.
1567 */
1568 static void update_cdp(
1569 unival *scratch,
1570 int current_cf,
1571 rrd_value_t pdp_temp_val,
1572 unsigned long rra_step_cnt,
1573 unsigned long elapsed_pdp_st,
1574 unsigned long start_pdp_offset,
1575 unsigned long pdp_cnt,
1576 rrd_value_t xff,
1577 int i,
1578 int ii)
1579 {
1580 /* shorthand variables */
1581 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1582 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1583 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1584 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1586 if (rra_step_cnt) {
1587 /* If we are in this block, as least 1 CDP value will be written to
1588 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1589 * to be written, then the "fill in" value is the CDP_secondary_val
1590 * entry. */
1591 if (isnan(pdp_temp_val)) {
1592 *cdp_unkn_pdp_cnt += start_pdp_offset;
1593 *cdp_secondary_val = DNAN;
1594 } else {
1595 /* CDP_secondary value is the RRA "fill in" value for intermediary
1596 * CDP data entries. No matter the CF, the value is the same because
1597 * the average, max, min, and last of a list of identical values is
1598 * the same, namely, the value itself. */
1599 *cdp_secondary_val = pdp_temp_val;
1600 }
1602 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1603 *cdp_primary_val = DNAN;
1604 } else {
1605 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1606 start_pdp_offset, pdp_cnt);
1607 }
1608 *cdp_val =
1609 initialize_carry_over(pdp_temp_val,current_cf,
1610 elapsed_pdp_st,
1611 start_pdp_offset, pdp_cnt);
1612 /* endif meets xff value requirement for a valid value */
1613 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1614 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1615 if (isnan(pdp_temp_val))
1616 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1617 else
1618 *cdp_unkn_pdp_cnt = 0;
1619 } else { /* rra_step_cnt[i] == 0 */
1621 #ifdef DEBUG
1622 if (isnan(*cdp_val)) {
1623 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1624 i, ii);
1625 } else {
1626 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1627 i, ii, *cdp_val);
1628 }
1629 #endif
1630 if (isnan(pdp_temp_val)) {
1631 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1632 } else {
1633 *cdp_val =
1634 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1635 current_cf, i, ii);
1636 }
1637 }
1638 }
1640 /*
1641 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1642 * on the type of consolidation function.
1643 */
1644 static void initialize_cdp_val(
1645 unival *scratch,
1646 int current_cf,
1647 rrd_value_t pdp_temp_val,
1648 unsigned long start_pdp_offset,
1649 unsigned long pdp_cnt)
1650 {
1651 rrd_value_t cum_val, cur_val;
1653 switch (current_cf) {
1654 case CF_AVERAGE:
1655 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1656 cur_val = IFDNAN(pdp_temp_val, 0.0);
1657 scratch[CDP_primary_val].u_val =
1658 (cum_val + cur_val * start_pdp_offset) /
1659 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1660 break;
1661 case CF_MAXIMUM:
1662 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1663 cur_val = IFDNAN(pdp_temp_val, -DINF);
1665 #if 0
1666 #ifdef DEBUG
1667 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1668 fprintf(stderr,
1669 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1670 i, ii);
1671 exit(-1);
1672 }
1673 #endif
1674 #endif
1675 if (cur_val > cum_val)
1676 scratch[CDP_primary_val].u_val = cur_val;
1677 else
1678 scratch[CDP_primary_val].u_val = cum_val;
1679 break;
1680 case CF_MINIMUM:
1681 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1682 cur_val = IFDNAN(pdp_temp_val, DINF);
1683 #if 0
1684 #ifdef DEBUG
1685 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1686 fprintf(stderr,
1687 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1688 ii);
1689 exit(-1);
1690 }
1691 #endif
1692 #endif
1693 if (cur_val < cum_val)
1694 scratch[CDP_primary_val].u_val = cur_val;
1695 else
1696 scratch[CDP_primary_val].u_val = cum_val;
1697 break;
1698 case CF_LAST:
1699 default:
1700 scratch[CDP_primary_val].u_val = pdp_temp_val;
1701 break;
1702 }
1703 }
1705 /*
1706 * Update the consolidation function for Holt-Winters functions as
1707 * well as other functions that don't actually consolidate multiple
1708 * PDPs.
1709 */
1710 static void reset_cdp(
1711 rrd_t *rrd,
1712 unsigned long elapsed_pdp_st,
1713 rrd_value_t *pdp_temp,
1714 rrd_value_t *last_seasonal_coef,
1715 rrd_value_t *seasonal_coef,
1716 int rra_idx,
1717 int ds_idx,
1718 int cdp_idx,
1719 enum cf_en current_cf)
1720 {
1721 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1723 switch (current_cf) {
1724 case CF_AVERAGE:
1725 default:
1726 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1727 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1728 break;
1729 case CF_SEASONAL:
1730 case CF_DEVSEASONAL:
1731 /* need to update cached seasonal values, so they are consistent
1732 * with the bulk update */
1733 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1734 * CDP_last_deviation are the same. */
1735 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1736 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1737 break;
1738 case CF_HWPREDICT:
1739 case CF_MHWPREDICT:
1740 /* need to update the null_count and last_null_count.
1741 * even do this for non-DNAN pdp_temp because the
1742 * algorithm is not learning from batch updates. */
1743 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1744 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1745 /* fall through */
1746 case CF_DEVPREDICT:
1747 scratch[CDP_primary_val].u_val = DNAN;
1748 scratch[CDP_secondary_val].u_val = DNAN;
1749 break;
1750 case CF_FAILURES:
1751 /* do not count missed bulk values as failures */
1752 scratch[CDP_primary_val].u_val = 0;
1753 scratch[CDP_secondary_val].u_val = 0;
1754 /* need to reset violations buffer.
1755 * could do this more carefully, but for now, just
1756 * assume a bulk update wipes away all violations. */
1757 erase_violations(rrd, cdp_idx, rra_idx);
1758 break;
1759 }
1760 }
1762 static rrd_value_t initialize_carry_over(
1763 rrd_value_t pdp_temp_val,
1764 int current_cf,
1765 unsigned long elapsed_pdp_st,
1766 unsigned long start_pdp_offset,
1767 unsigned long pdp_cnt)
1768 {
1769 unsigned long pdp_into_cdp_cnt = ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1770 if ( pdp_into_cdp_cnt == 0 || isnan(pdp_temp_val)){
1771 switch (current_cf) {
1772 case CF_MAXIMUM:
1773 return -DINF;
1774 case CF_MINIMUM:
1775 return DINF;
1776 case CF_AVERAGE:
1777 return 0;
1778 default:
1779 return DNAN;
1780 }
1781 }
1782 else {
1783 switch (current_cf) {
1784 case CF_AVERAGE:
1785 return pdp_temp_val * pdp_into_cdp_cnt ;
1786 default:
1787 return pdp_temp_val;
1788 }
1789 }
1790 }
1792 /*
1793 * Update or initialize a CDP value based on the consolidation
1794 * function.
1795 *
1796 * Returns the new value.
1797 */
1798 static rrd_value_t calculate_cdp_val(
1799 rrd_value_t cdp_val,
1800 rrd_value_t pdp_temp_val,
1801 unsigned long elapsed_pdp_st,
1802 int current_cf,
1803 #ifdef DEBUG
1804 int i,
1805 int ii
1806 #else
1807 int UNUSED(i),
1808 int UNUSED(ii)
1809 #endif
1810 )
1811 {
1812 if (isnan(cdp_val)) {
1813 if (current_cf == CF_AVERAGE) {
1814 pdp_temp_val *= elapsed_pdp_st;
1815 }
1816 #ifdef DEBUG
1817 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1818 i, ii, pdp_temp_val);
1819 #endif
1820 return pdp_temp_val;
1821 }
1822 if (current_cf == CF_AVERAGE)
1823 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1824 if (current_cf == CF_MINIMUM)
1825 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1826 if (current_cf == CF_MAXIMUM)
1827 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1829 return pdp_temp_val;
1830 }
1832 /*
1833 * For each RRA, update the seasonal values and then call update_aberrant_CF
1834 * for each data source.
1835 *
1836 * Return 0 on success, -1 on error.
1837 */
1838 static int update_aberrant_cdps(
1839 rrd_t *rrd,
1840 rrd_file_t *rrd_file,
1841 unsigned long rra_begin,
1842 unsigned long elapsed_pdp_st,
1843 rrd_value_t *pdp_temp,
1844 rrd_value_t **seasonal_coef)
1845 {
1846 unsigned long rra_idx, ds_idx, j;
1848 /* number of PDP steps since the last update that
1849 * are assigned to the first CDP to be generated
1850 * since the last update. */
1851 unsigned short scratch_idx;
1852 unsigned long rra_start;
1853 enum cf_en current_cf;
1855 /* this loop is only entered if elapsed_pdp_st < 3 */
1856 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1857 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1858 rra_start = rra_begin;
1859 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1860 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1861 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1862 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1863 if (scratch_idx == CDP_primary_val) {
1864 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1865 elapsed_pdp_st + 1, seasonal_coef);
1866 } else {
1867 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1868 elapsed_pdp_st + 2, seasonal_coef);
1869 }
1870 }
1871 if (rrd_test_error())
1872 return -1;
1873 /* loop over data soures within each RRA */
1874 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1875 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1876 rra_idx * (rrd->stat_head->ds_cnt) +
1877 ds_idx, rra_idx, ds_idx, scratch_idx,
1878 *seasonal_coef);
1879 }
1880 }
1881 rra_start += rrd->rra_def[rra_idx].row_cnt
1882 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1883 }
1884 }
1885 return 0;
1886 }
1888 /*
1889 * Move sequentially through the file, writing one RRA at a time. Note this
1890 * architecture divorces the computation of CDP with flushing updated RRA
1891 * entries to disk.
1892 *
1893 * Return 0 on success, -1 on error.
1894 */
1895 static int write_to_rras(
1896 rrd_t *rrd,
1897 rrd_file_t *rrd_file,
1898 unsigned long *rra_step_cnt,
1899 unsigned long rra_begin,
1900 time_t current_time,
1901 unsigned long *skip_update,
1902 rrd_info_t ** pcdp_summary)
1903 {
1904 unsigned long rra_idx;
1905 unsigned long rra_start;
1906 time_t rra_time = 0; /* time of update for a RRA */
1908 unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1910 /* Ready to write to disk */
1911 rra_start = rra_begin;
1913 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1914 rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1915 rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1917 /* for cdp_prep */
1918 unsigned short scratch_idx;
1919 unsigned long step_subtract;
1921 for (scratch_idx = CDP_primary_val,
1922 step_subtract = 1;
1923 rra_step_cnt[rra_idx] > 0;
1924 rra_step_cnt[rra_idx]--,
1925 scratch_idx = CDP_secondary_val,
1926 step_subtract = 2) {
1928 size_t rra_pos_new;
1929 #ifdef DEBUG
1930 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1931 #endif
1932 /* increment, with wrap-around */
1933 if (++rra_ptr->cur_row >= rra_def->row_cnt)
1934 rra_ptr->cur_row = 0;
1936 /* we know what our position should be */
1937 rra_pos_new = rra_start
1938 + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1940 /* re-seek if the position is wrong or we wrapped around */
1941 if ((size_t)rra_pos_new != rrd_file->pos) {
1942 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1943 rrd_set_error("seek error in rrd");
1944 return -1;
1945 }
1946 }
1947 #ifdef DEBUG
1948 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1949 #endif
1951 if (skip_update[rra_idx])
1952 continue;
1954 if (*pcdp_summary != NULL) {
1955 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1957 rra_time = (current_time - current_time % step_time)
1958 - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1959 }
1961 if (write_RRA_row
1962 (rrd_file, rrd, rra_idx, scratch_idx,
1963 pcdp_summary, rra_time) == -1)
1964 return -1;
1966 rrd_notify_row(rrd_file, rra_idx, rra_pos_new, rra_time);
1967 }
1969 rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1970 } /* RRA LOOP */
1972 return 0;
1973 }
1975 /*
1976 * Write out one row of values (one value per DS) to the archive.
1977 *
1978 * Returns 0 on success, -1 on error.
1979 */
1980 static int write_RRA_row(
1981 rrd_file_t *rrd_file,
1982 rrd_t *rrd,
1983 unsigned long rra_idx,
1984 unsigned short CDP_scratch_idx,
1985 rrd_info_t ** pcdp_summary,
1986 time_t rra_time)
1987 {
1988 unsigned long ds_idx, cdp_idx;
1989 rrd_infoval_t iv;
1991 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1992 /* compute the cdp index */
1993 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1994 #ifdef DEBUG
1995 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1996 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1997 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1998 #endif
1999 if (*pcdp_summary != NULL) {
2000 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2001 /* append info to the return hash */
2002 *pcdp_summary = rrd_info_push(*pcdp_summary,
2003 sprintf_alloc
2004 ("[%lli]RRA[%s][%lu]DS[%s]",
2005 (long long)rra_time,
2006 rrd->rra_def[rra_idx].cf_nam,
2007 rrd->rra_def[rra_idx].pdp_cnt,
2008 rrd->ds_def[ds_idx].ds_nam),
2009 RD_I_VAL, iv);
2010 }
2011 errno = 0;
2012 if (rrd_write(rrd_file,
2013 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2014 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2015 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2016 return -1;
2017 }
2018 }
2019 return 0;
2020 }
2022 /*
2023 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2024 *
2025 * Returns 0 on success, -1 otherwise
2026 */
2027 static int smooth_all_rras(
2028 rrd_t *rrd,
2029 rrd_file_t *rrd_file,
2030 unsigned long rra_begin)
2031 {
2032 unsigned long rra_start = rra_begin;
2033 unsigned long rra_idx;
2035 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2036 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2037 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2038 #ifdef DEBUG
2039 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2040 #endif
2041 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2042 if (rrd_test_error())
2043 return -1;
2044 }
2045 rra_start += rrd->rra_def[rra_idx].row_cnt
2046 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2047 }
2048 return 0;
2049 }
2051 #ifndef HAVE_MMAP
2052 /*
2053 * Flush changes to disk (unless we're using mmap)
2054 *
2055 * Returns 0 on success, -1 otherwise
2056 */
2057 static int write_changes_to_disk(
2058 rrd_t *rrd,
2059 rrd_file_t *rrd_file,
2060 int version)
2061 {
2062 /* we just need to write back the live header portion now */
2063 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2064 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2065 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2066 SEEK_SET) != 0) {
2067 rrd_set_error("seek rrd for live header writeback");
2068 return -1;
2069 }
2070 if (version >= 3) {
2071 if (rrd_write(rrd_file, rrd->live_head,
2072 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2073 rrd_set_error("rrd_write live_head to rrd");
2074 return -1;
2075 }
2076 } else {
2077 if (rrd_write(rrd_file, rrd->legacy_last_up,
2078 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2079 rrd_set_error("rrd_write live_head to rrd");
2080 return -1;
2081 }
2082 }
2085 if (rrd_write(rrd_file, rrd->pdp_prep,
2086 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2087 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2088 rrd_set_error("rrd_write pdp_prep to rrd");
2089 return -1;
2090 }
2092 if (rrd_write(rrd_file, rrd->cdp_prep,
2093 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2094 rrd->stat_head->ds_cnt)
2095 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2096 rrd->stat_head->ds_cnt)) {
2098 rrd_set_error("rrd_write cdp_prep to rrd");
2099 return -1;
2100 }
2102 if (rrd_write(rrd_file, rrd->rra_ptr,
2103 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2104 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2105 rrd_set_error("rrd_write rra_ptr to rrd");
2106 return -1;
2107 }
2108 return 0;
2109 }
2110 #endif