2b4a29300844208026abfca9331bc6222c78cb8a
1 /*****************************************************************************
2 * RRDtool 1.3.2 Copyright by Tobi Oetiker, 1997-2008
3 * Copyright by Florian Forster, 2008
4 *****************************************************************************
5 * rrd_update.c RRD Update Function
6 *****************************************************************************
7 * $Id$
8 *****************************************************************************/
10 #include "rrd_tool.h"
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
18 #include <locale.h>
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
26 #include "rrd_client.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
31 * replacement.
32 */
33 #include <sys/timeb.h>
35 #ifndef __MINGW32__
36 struct timeval {
37 time_t tv_sec; /* seconds */
38 long tv_usec; /* microseconds */
39 };
40 #endif
42 struct __timezone {
43 int tz_minuteswest; /* minutes W of Greenwich */
44 int tz_dsttime; /* type of dst correction */
45 };
47 static int gettimeofday(
48 struct timeval *t,
49 struct __timezone *tz)
50 {
52 struct _timeb current_time;
54 _ftime(¤t_time);
56 t->tv_sec = current_time.time;
57 t->tv_usec = current_time.millitm * 1000;
59 return 0;
60 }
62 #endif
64 /* FUNCTION PROTOTYPES */
66 int rrd_update_r(
67 const char *filename,
68 const char *tmplt,
69 int argc,
70 const char **argv);
71 int _rrd_update(
72 const char *filename,
73 const char *tmplt,
74 int argc,
75 const char **argv,
76 rrd_info_t *);
78 static int allocate_data_structures(
79 rrd_t *rrd,
80 char ***updvals,
81 rrd_value_t **pdp_temp,
82 const char *tmplt,
83 long **tmpl_idx,
84 unsigned long *tmpl_cnt,
85 unsigned long **rra_step_cnt,
86 unsigned long **skip_update,
87 rrd_value_t **pdp_new);
89 static int parse_template(
90 rrd_t *rrd,
91 const char *tmplt,
92 unsigned long *tmpl_cnt,
93 long *tmpl_idx);
95 static int process_arg(
96 char *step_start,
97 rrd_t *rrd,
98 rrd_file_t *rrd_file,
99 unsigned long rra_begin,
100 time_t *current_time,
101 unsigned long *current_time_usec,
102 rrd_value_t *pdp_temp,
103 rrd_value_t *pdp_new,
104 unsigned long *rra_step_cnt,
105 char **updvals,
106 long *tmpl_idx,
107 unsigned long tmpl_cnt,
108 rrd_info_t ** pcdp_summary,
109 int version,
110 unsigned long *skip_update,
111 int *schedule_smooth);
113 static int parse_ds(
114 rrd_t *rrd,
115 char **updvals,
116 long *tmpl_idx,
117 char *input,
118 unsigned long tmpl_cnt,
119 time_t *current_time,
120 unsigned long *current_time_usec,
121 int version);
123 static int get_time_from_reading(
124 rrd_t *rrd,
125 char timesyntax,
126 char **updvals,
127 time_t *current_time,
128 unsigned long *current_time_usec,
129 int version);
131 static int update_pdp_prep(
132 rrd_t *rrd,
133 char **updvals,
134 rrd_value_t *pdp_new,
135 double interval);
137 static int calculate_elapsed_steps(
138 rrd_t *rrd,
139 unsigned long current_time,
140 unsigned long current_time_usec,
141 double interval,
142 double *pre_int,
143 double *post_int,
144 unsigned long *proc_pdp_cnt);
146 static void simple_update(
147 rrd_t *rrd,
148 double interval,
149 rrd_value_t *pdp_new);
151 static int process_all_pdp_st(
152 rrd_t *rrd,
153 double interval,
154 double pre_int,
155 double post_int,
156 unsigned long elapsed_pdp_st,
157 rrd_value_t *pdp_new,
158 rrd_value_t *pdp_temp);
160 static int process_pdp_st(
161 rrd_t *rrd,
162 unsigned long ds_idx,
163 double interval,
164 double pre_int,
165 double post_int,
166 long diff_pdp_st,
167 rrd_value_t *pdp_new,
168 rrd_value_t *pdp_temp);
170 static int update_all_cdp_prep(
171 rrd_t *rrd,
172 unsigned long *rra_step_cnt,
173 unsigned long rra_begin,
174 rrd_file_t *rrd_file,
175 unsigned long elapsed_pdp_st,
176 unsigned long proc_pdp_cnt,
177 rrd_value_t **last_seasonal_coef,
178 rrd_value_t **seasonal_coef,
179 rrd_value_t *pdp_temp,
180 unsigned long *skip_update,
181 int *schedule_smooth);
183 static int do_schedule_smooth(
184 rrd_t *rrd,
185 unsigned long rra_idx,
186 unsigned long elapsed_pdp_st);
188 static int update_cdp_prep(
189 rrd_t *rrd,
190 unsigned long elapsed_pdp_st,
191 unsigned long start_pdp_offset,
192 unsigned long *rra_step_cnt,
193 int rra_idx,
194 rrd_value_t *pdp_temp,
195 rrd_value_t *last_seasonal_coef,
196 rrd_value_t *seasonal_coef,
197 int current_cf);
199 static void update_cdp(
200 unival *scratch,
201 int current_cf,
202 rrd_value_t pdp_temp_val,
203 unsigned long rra_step_cnt,
204 unsigned long elapsed_pdp_st,
205 unsigned long start_pdp_offset,
206 unsigned long pdp_cnt,
207 rrd_value_t xff,
208 int i,
209 int ii);
211 static void initialize_cdp_val(
212 unival *scratch,
213 int current_cf,
214 rrd_value_t pdp_temp_val,
215 unsigned long elapsed_pdp_st,
216 unsigned long start_pdp_offset,
217 unsigned long pdp_cnt);
219 static void reset_cdp(
220 rrd_t *rrd,
221 unsigned long elapsed_pdp_st,
222 rrd_value_t *pdp_temp,
223 rrd_value_t *last_seasonal_coef,
224 rrd_value_t *seasonal_coef,
225 int rra_idx,
226 int ds_idx,
227 int cdp_idx,
228 enum cf_en current_cf);
230 static rrd_value_t initialize_average_carry_over(
231 rrd_value_t pdp_temp_val,
232 unsigned long elapsed_pdp_st,
233 unsigned long start_pdp_offset,
234 unsigned long pdp_cnt);
236 static rrd_value_t calculate_cdp_val(
237 rrd_value_t cdp_val,
238 rrd_value_t pdp_temp_val,
239 unsigned long elapsed_pdp_st,
240 int current_cf,
241 int i,
242 int ii);
244 static int update_aberrant_cdps(
245 rrd_t *rrd,
246 rrd_file_t *rrd_file,
247 unsigned long rra_begin,
248 unsigned long elapsed_pdp_st,
249 rrd_value_t *pdp_temp,
250 rrd_value_t **seasonal_coef);
252 static int write_to_rras(
253 rrd_t *rrd,
254 rrd_file_t *rrd_file,
255 unsigned long *rra_step_cnt,
256 unsigned long rra_begin,
257 time_t current_time,
258 unsigned long *skip_update,
259 rrd_info_t ** pcdp_summary);
261 static int write_RRA_row(
262 rrd_file_t *rrd_file,
263 rrd_t *rrd,
264 unsigned long rra_idx,
265 unsigned short CDP_scratch_idx,
266 rrd_info_t ** pcdp_summary,
267 time_t rra_time);
269 static int smooth_all_rras(
270 rrd_t *rrd,
271 rrd_file_t *rrd_file,
272 unsigned long rra_begin);
274 #ifndef HAVE_MMAP
275 static int write_changes_to_disk(
276 rrd_t *rrd,
277 rrd_file_t *rrd_file,
278 int version);
279 #endif
281 /*
282 * normalize time as returned by gettimeofday. usec part must
283 * be always >= 0
284 */
285 static inline void normalize_time(
286 struct timeval *t)
287 {
288 if (t->tv_usec < 0) {
289 t->tv_sec--;
290 t->tv_usec += 1e6L;
291 }
292 }
294 /*
295 * Sets current_time and current_time_usec based on the current time.
296 * current_time_usec is set to 0 if the version number is 1 or 2.
297 */
298 static inline void initialize_time(
299 time_t *current_time,
300 unsigned long *current_time_usec,
301 int version)
302 {
303 struct timeval tmp_time; /* used for time conversion */
305 gettimeofday(&tmp_time, 0);
306 normalize_time(&tmp_time);
307 *current_time = tmp_time.tv_sec;
308 if (version >= 3) {
309 *current_time_usec = tmp_time.tv_usec;
310 } else {
311 *current_time_usec = 0;
312 }
313 }
315 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
317 rrd_info_t *rrd_update_v(
318 int argc,
319 char **argv)
320 {
321 char *tmplt = NULL;
322 rrd_info_t *result = NULL;
323 rrd_infoval_t rc;
324 struct option long_options[] = {
325 {"template", required_argument, 0, 't'},
326 {0, 0, 0, 0}
327 };
329 rc.u_int = -1;
330 optind = 0;
331 opterr = 0; /* initialize getopt */
333 while (1) {
334 int option_index = 0;
335 int opt;
337 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
339 if (opt == EOF)
340 break;
342 switch (opt) {
343 case 't':
344 tmplt = optarg;
345 break;
347 case '?':
348 rrd_set_error("unknown option '%s'", argv[optind - 1]);
349 goto end_tag;
350 }
351 }
353 /* need at least 2 arguments: filename, data. */
354 if (argc - optind < 2) {
355 rrd_set_error("Not enough arguments");
356 goto end_tag;
357 }
358 rc.u_int = 0;
359 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
360 rc.u_int = _rrd_update(argv[optind], tmplt,
361 argc - optind - 1,
362 (const char **) (argv + optind + 1), result);
363 result->value.u_int = rc.u_int;
364 end_tag:
365 return result;
366 }
368 int rrd_update(
369 int argc,
370 char **argv)
371 {
372 struct option long_options[] = {
373 {"template", required_argument, 0, 't'},
374 {"daemon", required_argument, 0, 'd'},
375 {0, 0, 0, 0}
376 };
377 int option_index = 0;
378 int opt;
379 char *tmplt = NULL;
380 int rc = -1;
381 char *opt_daemon = NULL;
383 optind = 0;
384 opterr = 0; /* initialize getopt */
386 while (1) {
387 opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
389 if (opt == EOF)
390 break;
392 switch (opt) {
393 case 't':
394 tmplt = strdup(optarg);
395 break;
397 case 'd':
398 if (opt_daemon != NULL)
399 free (opt_daemon);
400 opt_daemon = strdup (optarg);
401 if (opt_daemon == NULL)
402 {
403 rrd_set_error("strdup failed.");
404 goto out;
405 }
406 break;
408 case '?':
409 rrd_set_error("unknown option '%s'", argv[optind - 1]);
410 goto out;
411 }
412 }
414 /* need at least 2 arguments: filename, data. */
415 if (argc - optind < 2) {
416 rrd_set_error("Not enough arguments");
417 goto out;
418 }
420 if ((tmplt != NULL) && (opt_daemon != NULL))
421 {
422 rrd_set_error("The caching opt_daemon cannot be used together with "
423 "templates yet.");
424 goto out;
425 }
427 if ((tmplt == NULL) && (opt_daemon == NULL))
428 {
429 char *temp;
431 temp = getenv (ENV_RRDCACHED_ADDRESS);
432 if (temp != NULL)
433 {
434 opt_daemon = strdup (temp);
435 if (opt_daemon == NULL)
436 {
437 rrd_set_error("strdup failed.");
438 goto out;
439 }
440 }
441 }
443 if (opt_daemon != NULL)
444 {
445 int status;
447 status = rrdc_connect (opt_daemon);
448 if (status != 0)
449 {
450 rrd_set_error("Unable to connect to opt_daemon: %s",
451 (status < 0)
452 ? "Internal error"
453 : rrd_strerror (status));
454 goto out;
455 }
457 status = rrdc_update (/* file = */ argv[optind],
458 /* values_num = */ argc - optind - 1,
459 /* values = */ (void *) (argv + optind + 1));
460 if (status != 0)
461 {
462 rrd_set_error("Failed sending the values to the opt_daemon: %s",
463 (status < 0)
464 ? "Internal error"
465 : rrd_strerror (status));
466 }
467 else
468 {
469 rc = 0;
470 }
472 rrdc_disconnect ();
473 goto out;
474 } /* if (opt_daemon != NULL) */
476 rc = rrd_update_r(argv[optind], tmplt,
477 argc - optind - 1, (const char **) (argv + optind + 1));
478 out:
479 if (tmplt != NULL)
480 {
481 free(tmplt);
482 tmplt = NULL;
483 }
484 if (opt_daemon != NULL)
485 {
486 free (opt_daemon);
487 opt_daemon = NULL;
488 }
489 return rc;
490 }
492 int rrd_update_r(
493 const char *filename,
494 const char *tmplt,
495 int argc,
496 const char **argv)
497 {
498 return _rrd_update(filename, tmplt, argc, argv, NULL);
499 }
501 int _rrd_update(
502 const char *filename,
503 const char *tmplt,
504 int argc,
505 const char **argv,
506 rrd_info_t * pcdp_summary)
507 {
509 int arg_i = 2;
511 unsigned long rra_begin; /* byte pointer to the rra
512 * area in the rrd file. this
513 * pointer never changes value */
514 rrd_value_t *pdp_new; /* prepare the incoming data to be added
515 * to the existing entry */
516 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
517 * to the cdp values */
519 long *tmpl_idx; /* index representing the settings
520 * transported by the tmplt index */
521 unsigned long tmpl_cnt = 2; /* time and data */
522 rrd_t rrd;
523 time_t current_time = 0;
524 unsigned long current_time_usec = 0; /* microseconds part of current time */
525 char **updvals;
526 int schedule_smooth = 0;
528 /* number of elapsed PDP steps since last update */
529 unsigned long *rra_step_cnt = NULL;
531 int version; /* rrd version */
532 rrd_file_t *rrd_file;
533 char *arg_copy; /* for processing the argv */
534 unsigned long *skip_update; /* RRAs to advance but not write */
536 /* need at least 1 arguments: data. */
537 if (argc < 1) {
538 rrd_set_error("Not enough arguments");
539 goto err_out;
540 }
542 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
543 goto err_free;
544 }
545 /* We are now at the beginning of the rra's */
546 rra_begin = rrd_file->header_len;
548 version = atoi(rrd.stat_head->version);
550 initialize_time(¤t_time, ¤t_time_usec, version);
552 /* get exclusive lock to whole file.
553 * lock gets removed when we close the file.
554 */
555 if (rrd_lock(rrd_file) != 0) {
556 rrd_set_error("could not lock RRD");
557 goto err_close;
558 }
560 if (allocate_data_structures(&rrd, &updvals,
561 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
562 &rra_step_cnt, &skip_update,
563 &pdp_new) == -1) {
564 goto err_close;
565 }
567 /* loop through the arguments. */
568 for (arg_i = 0; arg_i < argc; arg_i++) {
569 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
570 rrd_set_error("failed duplication argv entry");
571 break;
572 }
573 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
574 ¤t_time, ¤t_time_usec, pdp_temp, pdp_new,
575 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
576 &pcdp_summary, version, skip_update,
577 &schedule_smooth) == -1) {
578 if (rrd_test_error()) { /* Should have error string always here */
579 char *save_error;
581 /* Prepend file name to error message */
582 if ((save_error = strdup(rrd_get_error())) != NULL) {
583 rrd_set_error("%s: %s", filename, save_error);
584 free(save_error);
585 }
586 }
587 free(arg_copy);
588 break;
589 }
590 free(arg_copy);
591 }
593 free(rra_step_cnt);
595 /* if we got here and if there is an error and if the file has not been
596 * written to, then close things up and return. */
597 if (rrd_test_error()) {
598 goto err_free_structures;
599 }
600 #ifndef HAVE_MMAP
601 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
602 goto err_free_structures;
603 }
604 #endif
606 /* calling the smoothing code here guarantees at most one smoothing
607 * operation per rrd_update call. Unfortunately, it is possible with bulk
608 * updates, or a long-delayed update for smoothing to occur off-schedule.
609 * This really isn't critical except during the burn-in cycles. */
610 if (schedule_smooth) {
611 smooth_all_rras(&rrd, rrd_file, rra_begin);
612 }
614 /* rrd_dontneed(rrd_file,&rrd); */
615 rrd_free(&rrd);
616 rrd_close(rrd_file);
618 free(pdp_new);
619 free(tmpl_idx);
620 free(pdp_temp);
621 free(skip_update);
622 free(updvals);
623 return 0;
625 err_free_structures:
626 free(pdp_new);
627 free(tmpl_idx);
628 free(pdp_temp);
629 free(skip_update);
630 free(updvals);
631 err_close:
632 rrd_close(rrd_file);
633 err_free:
634 rrd_free(&rrd);
635 err_out:
636 return -1;
637 }
639 /*
640 * get exclusive lock to whole file.
641 * lock gets removed when we close the file
642 *
643 * returns 0 on success
644 */
645 int rrd_lock(
646 rrd_file_t *file)
647 {
648 int rcstat;
650 {
651 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
652 struct _stat st;
654 if (_fstat(file->fd, &st) == 0) {
655 rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
656 } else {
657 rcstat = -1;
658 }
659 #else
660 struct flock lock;
662 lock.l_type = F_WRLCK; /* exclusive write lock */
663 lock.l_len = 0; /* whole file */
664 lock.l_start = 0; /* start of file */
665 lock.l_whence = SEEK_SET; /* end of file */
667 rcstat = fcntl(file->fd, F_SETLK, &lock);
668 #endif
669 }
671 return (rcstat);
672 }
674 /*
675 * Allocate some important arrays used, and initialize the template.
676 *
677 * When it returns, either all of the structures are allocated
678 * or none of them are.
679 *
680 * Returns 0 on success, -1 on error.
681 */
682 static int allocate_data_structures(
683 rrd_t *rrd,
684 char ***updvals,
685 rrd_value_t **pdp_temp,
686 const char *tmplt,
687 long **tmpl_idx,
688 unsigned long *tmpl_cnt,
689 unsigned long **rra_step_cnt,
690 unsigned long **skip_update,
691 rrd_value_t **pdp_new)
692 {
693 unsigned i, ii;
694 if ((*updvals = (char **) malloc(sizeof(char *)
695 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
696 rrd_set_error("allocating updvals pointer array.");
697 return -1;
698 }
699 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
700 * rrd->stat_head->ds_cnt)) ==
701 NULL) {
702 rrd_set_error("allocating pdp_temp.");
703 goto err_free_updvals;
704 }
705 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
706 *
707 rrd->stat_head->rra_cnt)) ==
708 NULL) {
709 rrd_set_error("allocating skip_update.");
710 goto err_free_pdp_temp;
711 }
712 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
713 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
714 rrd_set_error("allocating tmpl_idx.");
715 goto err_free_skip_update;
716 }
717 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
718 *
719 (rrd->stat_head->
720 rra_cnt))) == NULL) {
721 rrd_set_error("allocating rra_step_cnt.");
722 goto err_free_tmpl_idx;
723 }
725 /* initialize tmplt redirector */
726 /* default config example (assume DS 1 is a CDEF DS)
727 tmpl_idx[0] -> 0; (time)
728 tmpl_idx[1] -> 1; (DS 0)
729 tmpl_idx[2] -> 3; (DS 2)
730 tmpl_idx[3] -> 4; (DS 3) */
731 (*tmpl_idx)[0] = 0; /* time */
732 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
733 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
734 (*tmpl_idx)[ii++] = i;
735 }
736 *tmpl_cnt = ii;
738 if (tmplt != NULL) {
739 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
740 goto err_free_rra_step_cnt;
741 }
742 }
744 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
745 * rrd->stat_head->ds_cnt)) == NULL) {
746 rrd_set_error("allocating pdp_new.");
747 goto err_free_rra_step_cnt;
748 }
750 return 0;
752 err_free_rra_step_cnt:
753 free(*rra_step_cnt);
754 err_free_tmpl_idx:
755 free(*tmpl_idx);
756 err_free_skip_update:
757 free(*skip_update);
758 err_free_pdp_temp:
759 free(*pdp_temp);
760 err_free_updvals:
761 free(*updvals);
762 return -1;
763 }
765 /*
766 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
767 *
768 * Returns 0 on success.
769 */
770 static int parse_template(
771 rrd_t *rrd,
772 const char *tmplt,
773 unsigned long *tmpl_cnt,
774 long *tmpl_idx)
775 {
776 char *dsname, *tmplt_copy;
777 unsigned int tmpl_len, i;
778 int ret = 0;
780 *tmpl_cnt = 1; /* the first entry is the time */
782 /* we should work on a writeable copy here */
783 if ((tmplt_copy = strdup(tmplt)) == NULL) {
784 rrd_set_error("error copying tmplt '%s'", tmplt);
785 ret = -1;
786 goto out;
787 }
789 dsname = tmplt_copy;
790 tmpl_len = strlen(tmplt_copy);
791 for (i = 0; i <= tmpl_len; i++) {
792 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
793 tmplt_copy[i] = '\0';
794 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
795 rrd_set_error("tmplt contains more DS definitions than RRD");
796 ret = -1;
797 goto out_free_tmpl_copy;
798 }
799 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
800 rrd_set_error("unknown DS name '%s'", dsname);
801 ret = -1;
802 goto out_free_tmpl_copy;
803 }
804 /* go to the next entry on the tmplt_copy */
805 if (i < tmpl_len)
806 dsname = &tmplt_copy[i + 1];
807 }
808 }
809 out_free_tmpl_copy:
810 free(tmplt_copy);
811 out:
812 return ret;
813 }
815 /*
816 * Parse an update string, updates the primary data points (PDPs)
817 * and consolidated data points (CDPs), and writes changes to the RRAs.
818 *
819 * Returns 0 on success, -1 on error.
820 */
821 static int process_arg(
822 char *step_start,
823 rrd_t *rrd,
824 rrd_file_t *rrd_file,
825 unsigned long rra_begin,
826 time_t *current_time,
827 unsigned long *current_time_usec,
828 rrd_value_t *pdp_temp,
829 rrd_value_t *pdp_new,
830 unsigned long *rra_step_cnt,
831 char **updvals,
832 long *tmpl_idx,
833 unsigned long tmpl_cnt,
834 rrd_info_t ** pcdp_summary,
835 int version,
836 unsigned long *skip_update,
837 int *schedule_smooth)
838 {
839 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
841 /* a vector of future Holt-Winters seasonal coefs */
842 unsigned long elapsed_pdp_st;
844 double interval, pre_int, post_int; /* interval between this and
845 * the last run */
846 unsigned long proc_pdp_cnt;
848 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
849 current_time, current_time_usec, version) == -1) {
850 return -1;
851 }
853 interval = (double) (*current_time - rrd->live_head->last_up)
854 + (double) ((long) *current_time_usec -
855 (long) rrd->live_head->last_up_usec) / 1e6f;
857 /* process the data sources and update the pdp_prep
858 * area accordingly */
859 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
860 return -1;
861 }
863 elapsed_pdp_st = calculate_elapsed_steps(rrd,
864 *current_time,
865 *current_time_usec, interval,
866 &pre_int, &post_int,
867 &proc_pdp_cnt);
869 /* has a pdp_st moment occurred since the last run ? */
870 if (elapsed_pdp_st == 0) {
871 /* no we have not passed a pdp_st moment. therefore update is simple */
872 simple_update(rrd, interval, pdp_new);
873 } else {
874 /* an pdp_st has occurred. */
875 if (process_all_pdp_st(rrd, interval,
876 pre_int, post_int,
877 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
878 return -1;
879 }
880 if (update_all_cdp_prep(rrd, rra_step_cnt,
881 rra_begin, rrd_file,
882 elapsed_pdp_st,
883 proc_pdp_cnt,
884 &last_seasonal_coef,
885 &seasonal_coef,
886 pdp_temp,
887 skip_update, schedule_smooth) == -1) {
888 goto err_free_coefficients;
889 }
890 if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
891 elapsed_pdp_st, pdp_temp,
892 &seasonal_coef) == -1) {
893 goto err_free_coefficients;
894 }
895 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
896 *current_time, skip_update,
897 pcdp_summary) == -1) {
898 goto err_free_coefficients;
899 }
900 } /* endif a pdp_st has occurred */
901 rrd->live_head->last_up = *current_time;
902 rrd->live_head->last_up_usec = *current_time_usec;
904 if (version < 3) {
905 *rrd->legacy_last_up = rrd->live_head->last_up;
906 }
907 free(seasonal_coef);
908 free(last_seasonal_coef);
909 return 0;
911 err_free_coefficients:
912 free(seasonal_coef);
913 free(last_seasonal_coef);
914 return -1;
915 }
917 /*
918 * Parse a DS string (time + colon-separated values), storing the
919 * results in current_time, current_time_usec, and updvals.
920 *
921 * Returns 0 on success, -1 on error.
922 */
923 static int parse_ds(
924 rrd_t *rrd,
925 char **updvals,
926 long *tmpl_idx,
927 char *input,
928 unsigned long tmpl_cnt,
929 time_t *current_time,
930 unsigned long *current_time_usec,
931 int version)
932 {
933 char *p;
934 unsigned long i;
935 char timesyntax;
937 updvals[0] = input;
938 /* initialize all ds input to unknown except the first one
939 which has always got to be set */
940 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
941 updvals[i] = "U";
943 /* separate all ds elements; first must be examined separately
944 due to alternate time syntax */
945 if ((p = strchr(input, '@')) != NULL) {
946 timesyntax = '@';
947 } else if ((p = strchr(input, ':')) != NULL) {
948 timesyntax = ':';
949 } else {
950 rrd_set_error("expected timestamp not found in data source from %s",
951 input);
952 return -1;
953 }
954 *p = '\0';
955 i = 1;
956 updvals[tmpl_idx[i++]] = p + 1;
957 while (*(++p)) {
958 if (*p == ':') {
959 *p = '\0';
960 if (i < tmpl_cnt) {
961 updvals[tmpl_idx[i++]] = p + 1;
962 }
963 }
964 }
966 if (i != tmpl_cnt) {
967 rrd_set_error("expected %lu data source readings (got %lu) from %s",
968 tmpl_cnt - 1, i, input);
969 return -1;
970 }
972 if (get_time_from_reading(rrd, timesyntax, updvals,
973 current_time, current_time_usec,
974 version) == -1) {
975 return -1;
976 }
977 return 0;
978 }
980 /*
981 * Parse the time in a DS string, store it in current_time and
982 * current_time_usec and verify that it's later than the last
983 * update for this DS.
984 *
985 * Returns 0 on success, -1 on error.
986 */
987 static int get_time_from_reading(
988 rrd_t *rrd,
989 char timesyntax,
990 char **updvals,
991 time_t *current_time,
992 unsigned long *current_time_usec,
993 int version)
994 {
995 double tmp;
996 char *parsetime_error = NULL;
997 char *old_locale;
998 rrd_time_value_t ds_tv;
999 struct timeval tmp_time; /* used for time conversion */
1001 /* get the time from the reading ... handle N */
1002 if (timesyntax == '@') { /* at-style */
1003 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
1004 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
1005 return -1;
1006 }
1007 if (ds_tv.type == RELATIVE_TO_END_TIME ||
1008 ds_tv.type == RELATIVE_TO_START_TIME) {
1009 rrd_set_error("specifying time relative to the 'start' "
1010 "or 'end' makes no sense here: %s", updvals[0]);
1011 return -1;
1012 }
1013 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
1014 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
1015 } else if (strcmp(updvals[0], "N") == 0) {
1016 gettimeofday(&tmp_time, 0);
1017 normalize_time(&tmp_time);
1018 *current_time = tmp_time.tv_sec;
1019 *current_time_usec = tmp_time.tv_usec;
1020 } else {
1021 old_locale = setlocale(LC_NUMERIC, "C");
1022 tmp = strtod(updvals[0], 0);
1023 setlocale(LC_NUMERIC, old_locale);
1024 *current_time = floor(tmp);
1025 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1026 }
1027 /* dont do any correction for old version RRDs */
1028 if (version < 3)
1029 *current_time_usec = 0;
1031 if (*current_time < rrd->live_head->last_up ||
1032 (*current_time == rrd->live_head->last_up &&
1033 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1034 rrd_set_error("illegal attempt to update using time %ld when "
1035 "last update time is %ld (minimum one second step)",
1036 *current_time, rrd->live_head->last_up);
1037 return -1;
1038 }
1039 return 0;
1040 }
1042 /*
1043 * Update pdp_new by interpreting the updvals according to the DS type
1044 * (COUNTER, GAUGE, etc.).
1045 *
1046 * Returns 0 on success, -1 on error.
1047 */
1048 static int update_pdp_prep(
1049 rrd_t *rrd,
1050 char **updvals,
1051 rrd_value_t *pdp_new,
1052 double interval)
1053 {
1054 unsigned long ds_idx;
1055 int ii;
1056 char *endptr; /* used in the conversion */
1057 double rate;
1058 char *old_locale;
1059 enum dst_en dst_idx;
1061 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1062 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1064 /* make sure we do not build diffs with old last_ds values */
1065 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1066 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1067 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1068 }
1070 /* NOTE: DST_CDEF should never enter this if block, because
1071 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1072 * accidently specified a value for the DST_CDEF. To handle this case,
1073 * an extra check is required. */
1075 if ((updvals[ds_idx + 1][0] != 'U') &&
1076 (dst_idx != DST_CDEF) &&
1077 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1078 rate = DNAN;
1080 /* pdp_new contains rate * time ... eg the bytes transferred during
1081 * the interval. Doing it this way saves a lot of math operations
1082 */
1083 switch (dst_idx) {
1084 case DST_COUNTER:
1085 case DST_DERIVE:
1086 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1087 if ((updvals[ds_idx + 1][ii] < '0'
1088 || updvals[ds_idx + 1][ii] > '9')
1089 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1090 rrd_set_error("not a simple integer: '%s'",
1091 updvals[ds_idx + 1]);
1092 return -1;
1093 }
1094 }
1095 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1096 pdp_new[ds_idx] =
1097 rrd_diff(updvals[ds_idx + 1],
1098 rrd->pdp_prep[ds_idx].last_ds);
1099 if (dst_idx == DST_COUNTER) {
1100 /* simple overflow catcher. This will fail
1101 * terribly for non 32 or 64 bit counters
1102 * ... are there any others in SNMP land?
1103 */
1104 if (pdp_new[ds_idx] < (double) 0.0)
1105 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1106 if (pdp_new[ds_idx] < (double) 0.0)
1107 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1108 }
1109 rate = pdp_new[ds_idx] / interval;
1110 } else {
1111 pdp_new[ds_idx] = DNAN;
1112 }
1113 break;
1114 case DST_ABSOLUTE:
1115 old_locale = setlocale(LC_NUMERIC, "C");
1116 errno = 0;
1117 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1118 setlocale(LC_NUMERIC, old_locale);
1119 if (errno > 0) {
1120 rrd_set_error("converting '%s' to float: %s",
1121 updvals[ds_idx + 1], rrd_strerror(errno));
1122 return -1;
1123 };
1124 if (endptr[0] != '\0') {
1125 rrd_set_error
1126 ("conversion of '%s' to float not complete: tail '%s'",
1127 updvals[ds_idx + 1], endptr);
1128 return -1;
1129 }
1130 rate = pdp_new[ds_idx] / interval;
1131 break;
1132 case DST_GAUGE:
1133 errno = 0;
1134 old_locale = setlocale(LC_NUMERIC, "C");
1135 pdp_new[ds_idx] =
1136 strtod(updvals[ds_idx + 1], &endptr) * interval;
1137 setlocale(LC_NUMERIC, old_locale);
1138 if (errno) {
1139 rrd_set_error("converting '%s' to float: %s",
1140 updvals[ds_idx + 1], rrd_strerror(errno));
1141 return -1;
1142 };
1143 if (endptr[0] != '\0') {
1144 rrd_set_error
1145 ("conversion of '%s' to float not complete: tail '%s'",
1146 updvals[ds_idx + 1], endptr);
1147 return -1;
1148 }
1149 rate = pdp_new[ds_idx] / interval;
1150 break;
1151 default:
1152 rrd_set_error("rrd contains unknown DS type : '%s'",
1153 rrd->ds_def[ds_idx].dst);
1154 return -1;
1155 }
1156 /* break out of this for loop if the error string is set */
1157 if (rrd_test_error()) {
1158 return -1;
1159 }
1160 /* make sure pdp_temp is neither too large or too small
1161 * if any of these occur it becomes unknown ...
1162 * sorry folks ... */
1163 if (!isnan(rate) &&
1164 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1165 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1166 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1167 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1168 pdp_new[ds_idx] = DNAN;
1169 }
1170 } else {
1171 /* no news is news all the same */
1172 pdp_new[ds_idx] = DNAN;
1173 }
1176 /* make a copy of the command line argument for the next run */
1177 #ifdef DEBUG
1178 fprintf(stderr, "prep ds[%lu]\t"
1179 "last_arg '%s'\t"
1180 "this_arg '%s'\t"
1181 "pdp_new %10.2f\n",
1182 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1183 pdp_new[ds_idx]);
1184 #endif
1185 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1186 LAST_DS_LEN - 1);
1187 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1188 }
1189 return 0;
1190 }
1192 /*
1193 * How many PDP steps have elapsed since the last update? Returns the answer,
1194 * and stores the time between the last update and the last PDP in pre_time,
1195 * and the time between the last PDP and the current time in post_int.
1196 */
1197 static int calculate_elapsed_steps(
1198 rrd_t *rrd,
1199 unsigned long current_time,
1200 unsigned long current_time_usec,
1201 double interval,
1202 double *pre_int,
1203 double *post_int,
1204 unsigned long *proc_pdp_cnt)
1205 {
1206 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1207 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1208 * time */
1209 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1210 * when it was last updated */
1211 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1213 /* when was the current pdp started */
1214 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1215 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1217 /* when did the last pdp_st occur */
1218 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1219 occu_pdp_st = current_time - occu_pdp_age;
1221 if (occu_pdp_st > proc_pdp_st) {
1222 /* OK we passed the pdp_st moment */
1223 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1224 * occurred before the latest
1225 * pdp_st moment*/
1226 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1227 *post_int = occu_pdp_age; /* how much after it */
1228 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1229 } else {
1230 *pre_int = interval;
1231 *post_int = 0;
1232 }
1234 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1236 #ifdef DEBUG
1237 printf("proc_pdp_age %lu\t"
1238 "proc_pdp_st %lu\t"
1239 "occu_pfp_age %lu\t"
1240 "occu_pdp_st %lu\t"
1241 "int %lf\t"
1242 "pre_int %lf\t"
1243 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1244 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1245 #endif
1247 /* compute the number of elapsed pdp_st moments */
1248 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1249 }
1251 /*
1252 * Increment the PDP values by the values in pdp_new, or else initialize them.
1253 */
1254 static void simple_update(
1255 rrd_t *rrd,
1256 double interval,
1257 rrd_value_t *pdp_new)
1258 {
1259 int i;
1261 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1262 if (isnan(pdp_new[i])) {
1263 /* this is not really accurate if we use subsecond data arrival time
1264 should have thought of it when going subsecond resolution ...
1265 sorry next format change we will have it! */
1266 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1267 floor(interval);
1268 } else {
1269 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1270 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1271 } else {
1272 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1273 }
1274 }
1275 #ifdef DEBUG
1276 fprintf(stderr,
1277 "NO PDP ds[%i]\t"
1278 "value %10.2f\t"
1279 "unkn_sec %5lu\n",
1280 i,
1281 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1282 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1283 #endif
1284 }
1285 }
1287 /*
1288 * Call process_pdp_st for each DS.
1289 *
1290 * Returns 0 on success, -1 on error.
1291 */
1292 static int process_all_pdp_st(
1293 rrd_t *rrd,
1294 double interval,
1295 double pre_int,
1296 double post_int,
1297 unsigned long elapsed_pdp_st,
1298 rrd_value_t *pdp_new,
1299 rrd_value_t *pdp_temp)
1300 {
1301 unsigned long ds_idx;
1303 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1304 rate*seconds which occurred up to the last run.
1305 pdp_new[] contains rate*seconds from the latest run.
1306 pdp_temp[] will contain the rate for cdp */
1308 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1309 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1310 elapsed_pdp_st * rrd->stat_head->pdp_step,
1311 pdp_new, pdp_temp) == -1) {
1312 return -1;
1313 }
1314 #ifdef DEBUG
1315 fprintf(stderr, "PDP UPD ds[%lu]\t"
1316 "elapsed_pdp_st %lu\t"
1317 "pdp_temp %10.2f\t"
1318 "new_prep %10.2f\t"
1319 "new_unkn_sec %5lu\n",
1320 ds_idx,
1321 elapsed_pdp_st,
1322 pdp_temp[ds_idx],
1323 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1324 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1325 #endif
1326 }
1327 return 0;
1328 }
1330 /*
1331 * Process an update that occurs after one of the PDP moments.
1332 * Increments the PDP value, sets NAN if time greater than the
1333 * heartbeats have elapsed, processes CDEFs.
1334 *
1335 * Returns 0 on success, -1 on error.
1336 */
1337 static int process_pdp_st(
1338 rrd_t *rrd,
1339 unsigned long ds_idx,
1340 double interval,
1341 double pre_int,
1342 double post_int,
1343 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1344 rrd_value_t *pdp_new,
1345 rrd_value_t *pdp_temp)
1346 {
1347 int i;
1349 /* update pdp_prep to the current pdp_st. */
1350 double pre_unknown = 0.0;
1351 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1352 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1354 rpnstack_t rpnstack; /* used for COMPUTE DS */
1356 rpnstack_init(&rpnstack);
1359 if (isnan(pdp_new[ds_idx])) {
1360 /* a final bit of unknown to be added before calculation
1361 we use a temporary variable for this so that we
1362 don't have to turn integer lines before using the value */
1363 pre_unknown = pre_int;
1364 } else {
1365 if (isnan(scratch[PDP_val].u_val)) {
1366 scratch[PDP_val].u_val = 0;
1367 }
1368 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1369 }
1371 /* if too much of the pdp_prep is unknown we dump it */
1372 /* if the interval is larger thatn mrhb we get NAN */
1373 if ((interval > mrhb) ||
1374 (rrd->stat_head->pdp_step / 2.0 <
1375 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1376 pdp_temp[ds_idx] = DNAN;
1377 } else {
1378 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1379 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1380 pre_unknown);
1381 }
1383 /* process CDEF data sources; remember each CDEF DS can
1384 * only reference other DS with a lower index number */
1385 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1386 rpnp_t *rpnp;
1388 rpnp =
1389 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1390 /* substitute data values for OP_VARIABLE nodes */
1391 for (i = 0; rpnp[i].op != OP_END; i++) {
1392 if (rpnp[i].op == OP_VARIABLE) {
1393 rpnp[i].op = OP_NUMBER;
1394 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1395 }
1396 }
1397 /* run the rpn calculator */
1398 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1399 free(rpnp);
1400 rpnstack_free(&rpnstack);
1401 return -1;
1402 }
1403 }
1405 /* make pdp_prep ready for the next run */
1406 if (isnan(pdp_new[ds_idx])) {
1407 /* this is not realy accurate if we use subsecond data arival time
1408 should have thought of it when going subsecond resolution ...
1409 sorry next format change we will have it! */
1410 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1411 scratch[PDP_val].u_val = DNAN;
1412 } else {
1413 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1414 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1415 }
1416 rpnstack_free(&rpnstack);
1417 return 0;
1418 }
1420 /*
1421 * Iterate over all the RRAs for a given DS and:
1422 * 1. Decide whether to schedule a smooth later
1423 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1424 * 3. Update the CDP
1425 *
1426 * Returns 0 on success, -1 on error
1427 */
1428 static int update_all_cdp_prep(
1429 rrd_t *rrd,
1430 unsigned long *rra_step_cnt,
1431 unsigned long rra_begin,
1432 rrd_file_t *rrd_file,
1433 unsigned long elapsed_pdp_st,
1434 unsigned long proc_pdp_cnt,
1435 rrd_value_t **last_seasonal_coef,
1436 rrd_value_t **seasonal_coef,
1437 rrd_value_t *pdp_temp,
1438 unsigned long *skip_update,
1439 int *schedule_smooth)
1440 {
1441 unsigned long rra_idx;
1443 /* index into the CDP scratch array */
1444 enum cf_en current_cf;
1445 unsigned long rra_start;
1447 /* number of rows to be updated in an RRA for a data value. */
1448 unsigned long start_pdp_offset;
1450 rra_start = rra_begin;
1451 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1452 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1453 start_pdp_offset =
1454 rrd->rra_def[rra_idx].pdp_cnt -
1455 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1456 skip_update[rra_idx] = 0;
1457 if (start_pdp_offset <= elapsed_pdp_st) {
1458 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1459 rrd->rra_def[rra_idx].pdp_cnt + 1;
1460 } else {
1461 rra_step_cnt[rra_idx] = 0;
1462 }
1464 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1465 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1466 * so that they will be correct for the next observed value; note that for
1467 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1468 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1469 if (rra_step_cnt[rra_idx] > 1) {
1470 skip_update[rra_idx] = 1;
1471 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1472 elapsed_pdp_st, last_seasonal_coef);
1473 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1474 elapsed_pdp_st + 1, seasonal_coef);
1475 }
1476 /* periodically run a smoother for seasonal effects */
1477 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1478 #ifdef DEBUG
1479 fprintf(stderr,
1480 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1481 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1482 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1483 u_cnt);
1484 #endif
1485 *schedule_smooth = 1;
1486 }
1487 }
1488 if (rrd_test_error())
1489 return -1;
1491 if (update_cdp_prep
1492 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1493 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1494 current_cf) == -1) {
1495 return -1;
1496 }
1497 rra_start +=
1498 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1499 sizeof(rrd_value_t);
1500 }
1501 return 0;
1502 }
1504 /*
1505 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1506 */
1507 static int do_schedule_smooth(
1508 rrd_t *rrd,
1509 unsigned long rra_idx,
1510 unsigned long elapsed_pdp_st)
1511 {
1512 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1513 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1514 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1515 unsigned long seasonal_smooth_idx =
1516 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1517 unsigned long *init_seasonal =
1518 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1520 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1521 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1522 * really an RRA level, not a data source within RRA level parameter, but
1523 * the rra_def is read only for rrd_update (not flushed to disk). */
1524 if (*init_seasonal > BURNIN_CYCLES) {
1525 /* someone has no doubt invented a trick to deal with this wrap around,
1526 * but at least this code is clear. */
1527 if (seasonal_smooth_idx > cur_row) {
1528 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1529 * between PDP and CDP */
1530 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1531 }
1532 /* can't rely on negative numbers because we are working with
1533 * unsigned values */
1534 return (cur_row + elapsed_pdp_st >= row_cnt
1535 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1536 }
1537 /* mark off one of the burn-in cycles */
1538 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1539 }
1541 /*
1542 * For a given RRA, iterate over the data sources and call the appropriate
1543 * consolidation function.
1544 *
1545 * Returns 0 on success, -1 on error.
1546 */
1547 static int update_cdp_prep(
1548 rrd_t *rrd,
1549 unsigned long elapsed_pdp_st,
1550 unsigned long start_pdp_offset,
1551 unsigned long *rra_step_cnt,
1552 int rra_idx,
1553 rrd_value_t *pdp_temp,
1554 rrd_value_t *last_seasonal_coef,
1555 rrd_value_t *seasonal_coef,
1556 int current_cf)
1557 {
1558 unsigned long ds_idx, cdp_idx;
1560 /* update CDP_PREP areas */
1561 /* loop over data soures within each RRA */
1562 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1564 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1566 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1567 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1568 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1569 elapsed_pdp_st, start_pdp_offset,
1570 rrd->rra_def[rra_idx].pdp_cnt,
1571 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1572 rra_idx, ds_idx);
1573 } else {
1574 /* Nothing to consolidate if there's one PDP per CDP. However, if
1575 * we've missed some PDPs, let's update null counters etc. */
1576 if (elapsed_pdp_st > 2) {
1577 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1578 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1579 current_cf);
1580 }
1581 }
1583 if (rrd_test_error())
1584 return -1;
1585 } /* endif data sources loop */
1586 return 0;
1587 }
1589 /*
1590 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1591 * primary value, secondary value, and # of unknowns.
1592 */
1593 static void update_cdp(
1594 unival *scratch,
1595 int current_cf,
1596 rrd_value_t pdp_temp_val,
1597 unsigned long rra_step_cnt,
1598 unsigned long elapsed_pdp_st,
1599 unsigned long start_pdp_offset,
1600 unsigned long pdp_cnt,
1601 rrd_value_t xff,
1602 int i,
1603 int ii)
1604 {
1605 /* shorthand variables */
1606 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1607 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1608 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1609 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1611 if (rra_step_cnt) {
1612 /* If we are in this block, as least 1 CDP value will be written to
1613 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1614 * to be written, then the "fill in" value is the CDP_secondary_val
1615 * entry. */
1616 if (isnan(pdp_temp_val)) {
1617 *cdp_unkn_pdp_cnt += start_pdp_offset;
1618 *cdp_secondary_val = DNAN;
1619 } else {
1620 /* CDP_secondary value is the RRA "fill in" value for intermediary
1621 * CDP data entries. No matter the CF, the value is the same because
1622 * the average, max, min, and last of a list of identical values is
1623 * the same, namely, the value itself. */
1624 *cdp_secondary_val = pdp_temp_val;
1625 }
1627 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1628 *cdp_primary_val = DNAN;
1629 if (current_cf == CF_AVERAGE) {
1630 *cdp_val =
1631 initialize_average_carry_over(pdp_temp_val,
1632 elapsed_pdp_st,
1633 start_pdp_offset, pdp_cnt);
1634 } else {
1635 *cdp_val = pdp_temp_val;
1636 }
1637 } else {
1638 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1639 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1640 } /* endif meets xff value requirement for a valid value */
1641 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1642 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1643 if (isnan(pdp_temp_val))
1644 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1645 else
1646 *cdp_unkn_pdp_cnt = 0;
1647 } else { /* rra_step_cnt[i] == 0 */
1649 #ifdef DEBUG
1650 if (isnan(*cdp_val)) {
1651 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1652 i, ii);
1653 } else {
1654 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1655 i, ii, *cdp_val);
1656 }
1657 #endif
1658 if (isnan(pdp_temp_val)) {
1659 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1660 } else {
1661 *cdp_val =
1662 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1663 current_cf, i, ii);
1664 }
1665 }
1666 }
1668 /*
1669 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1670 * on the type of consolidation function.
1671 */
1672 static void initialize_cdp_val(
1673 unival *scratch,
1674 int current_cf,
1675 rrd_value_t pdp_temp_val,
1676 unsigned long elapsed_pdp_st,
1677 unsigned long start_pdp_offset,
1678 unsigned long pdp_cnt)
1679 {
1680 rrd_value_t cum_val, cur_val;
1682 switch (current_cf) {
1683 case CF_AVERAGE:
1684 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1685 cur_val = IFDNAN(pdp_temp_val, 0.0);
1686 scratch[CDP_primary_val].u_val =
1687 (cum_val + cur_val * start_pdp_offset) /
1688 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1689 scratch[CDP_val].u_val =
1690 initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1691 start_pdp_offset, pdp_cnt);
1692 break;
1693 case CF_MAXIMUM:
1694 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1695 cur_val = IFDNAN(pdp_temp_val, -DINF);
1696 #if 0
1697 #ifdef DEBUG
1698 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1699 fprintf(stderr,
1700 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1701 i, ii);
1702 exit(-1);
1703 }
1704 #endif
1705 #endif
1706 if (cur_val > cum_val)
1707 scratch[CDP_primary_val].u_val = cur_val;
1708 else
1709 scratch[CDP_primary_val].u_val = cum_val;
1710 /* initialize carry over value */
1711 scratch[CDP_val].u_val = pdp_temp_val;
1712 break;
1713 case CF_MINIMUM:
1714 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1715 cur_val = IFDNAN(pdp_temp_val, DINF);
1716 #if 0
1717 #ifdef DEBUG
1718 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1719 fprintf(stderr,
1720 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1721 ii);
1722 exit(-1);
1723 }
1724 #endif
1725 #endif
1726 if (cur_val < cum_val)
1727 scratch[CDP_primary_val].u_val = cur_val;
1728 else
1729 scratch[CDP_primary_val].u_val = cum_val;
1730 /* initialize carry over value */
1731 scratch[CDP_val].u_val = pdp_temp_val;
1732 break;
1733 case CF_LAST:
1734 default:
1735 scratch[CDP_primary_val].u_val = pdp_temp_val;
1736 /* initialize carry over value */
1737 scratch[CDP_val].u_val = pdp_temp_val;
1738 break;
1739 }
1740 }
1742 /*
1743 * Update the consolidation function for Holt-Winters functions as
1744 * well as other functions that don't actually consolidate multiple
1745 * PDPs.
1746 */
1747 static void reset_cdp(
1748 rrd_t *rrd,
1749 unsigned long elapsed_pdp_st,
1750 rrd_value_t *pdp_temp,
1751 rrd_value_t *last_seasonal_coef,
1752 rrd_value_t *seasonal_coef,
1753 int rra_idx,
1754 int ds_idx,
1755 int cdp_idx,
1756 enum cf_en current_cf)
1757 {
1758 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1760 switch (current_cf) {
1761 case CF_AVERAGE:
1762 default:
1763 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1764 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1765 break;
1766 case CF_SEASONAL:
1767 case CF_DEVSEASONAL:
1768 /* need to update cached seasonal values, so they are consistent
1769 * with the bulk update */
1770 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1771 * CDP_last_deviation are the same. */
1772 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1773 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1774 break;
1775 case CF_HWPREDICT:
1776 case CF_MHWPREDICT:
1777 /* need to update the null_count and last_null_count.
1778 * even do this for non-DNAN pdp_temp because the
1779 * algorithm is not learning from batch updates. */
1780 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1781 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1782 /* fall through */
1783 case CF_DEVPREDICT:
1784 scratch[CDP_primary_val].u_val = DNAN;
1785 scratch[CDP_secondary_val].u_val = DNAN;
1786 break;
1787 case CF_FAILURES:
1788 /* do not count missed bulk values as failures */
1789 scratch[CDP_primary_val].u_val = 0;
1790 scratch[CDP_secondary_val].u_val = 0;
1791 /* need to reset violations buffer.
1792 * could do this more carefully, but for now, just
1793 * assume a bulk update wipes away all violations. */
1794 erase_violations(rrd, cdp_idx, rra_idx);
1795 break;
1796 }
1797 }
1799 static rrd_value_t initialize_average_carry_over(
1800 rrd_value_t pdp_temp_val,
1801 unsigned long elapsed_pdp_st,
1802 unsigned long start_pdp_offset,
1803 unsigned long pdp_cnt)
1804 {
1805 /* initialize carry over value */
1806 if (isnan(pdp_temp_val)) {
1807 return DNAN;
1808 }
1809 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1810 }
1812 /*
1813 * Update or initialize a CDP value based on the consolidation
1814 * function.
1815 *
1816 * Returns the new value.
1817 */
1818 static rrd_value_t calculate_cdp_val(
1819 rrd_value_t cdp_val,
1820 rrd_value_t pdp_temp_val,
1821 unsigned long elapsed_pdp_st,
1822 int current_cf,
1823 #ifdef DEBUG
1824 int i,
1825 int ii
1826 #else
1827 int UNUSED(i),
1828 int UNUSED(ii)
1829 #endif
1830 )
1831 {
1832 if (isnan(cdp_val)) {
1833 if (current_cf == CF_AVERAGE) {
1834 pdp_temp_val *= elapsed_pdp_st;
1835 }
1836 #ifdef DEBUG
1837 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1838 i, ii, pdp_temp_val);
1839 #endif
1840 return pdp_temp_val;
1841 }
1842 if (current_cf == CF_AVERAGE)
1843 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1844 if (current_cf == CF_MINIMUM)
1845 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1846 if (current_cf == CF_MAXIMUM)
1847 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1849 return pdp_temp_val;
1850 }
1852 /*
1853 * For each RRA, update the seasonal values and then call update_aberrant_CF
1854 * for each data source.
1855 *
1856 * Return 0 on success, -1 on error.
1857 */
1858 static int update_aberrant_cdps(
1859 rrd_t *rrd,
1860 rrd_file_t *rrd_file,
1861 unsigned long rra_begin,
1862 unsigned long elapsed_pdp_st,
1863 rrd_value_t *pdp_temp,
1864 rrd_value_t **seasonal_coef)
1865 {
1866 unsigned long rra_idx, ds_idx, j;
1868 /* number of PDP steps since the last update that
1869 * are assigned to the first CDP to be generated
1870 * since the last update. */
1871 unsigned short scratch_idx;
1872 unsigned long rra_start;
1873 enum cf_en current_cf;
1875 /* this loop is only entered if elapsed_pdp_st < 3 */
1876 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1877 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1878 rra_start = rra_begin;
1879 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1880 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1881 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1882 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1883 if (scratch_idx == CDP_primary_val) {
1884 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1885 elapsed_pdp_st + 1, seasonal_coef);
1886 } else {
1887 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1888 elapsed_pdp_st + 2, seasonal_coef);
1889 }
1890 }
1891 if (rrd_test_error())
1892 return -1;
1893 /* loop over data soures within each RRA */
1894 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1895 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1896 rra_idx * (rrd->stat_head->ds_cnt) +
1897 ds_idx, rra_idx, ds_idx, scratch_idx,
1898 *seasonal_coef);
1899 }
1900 }
1901 rra_start += rrd->rra_def[rra_idx].row_cnt
1902 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1903 }
1904 }
1905 return 0;
1906 }
1908 /*
1909 * Move sequentially through the file, writing one RRA at a time. Note this
1910 * architecture divorces the computation of CDP with flushing updated RRA
1911 * entries to disk.
1912 *
1913 * Return 0 on success, -1 on error.
1914 */
1915 static int write_to_rras(
1916 rrd_t *rrd,
1917 rrd_file_t *rrd_file,
1918 unsigned long *rra_step_cnt,
1919 unsigned long rra_begin,
1920 time_t current_time,
1921 unsigned long *skip_update,
1922 rrd_info_t ** pcdp_summary)
1923 {
1924 unsigned long rra_idx;
1925 unsigned long rra_start;
1926 time_t rra_time = 0; /* time of update for a RRA */
1928 unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1930 /* Ready to write to disk */
1931 rra_start = rra_begin;
1933 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1934 rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1935 rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1937 /* for cdp_prep */
1938 unsigned short scratch_idx;
1939 unsigned long step_subtract;
1941 for (scratch_idx = CDP_primary_val,
1942 step_subtract = 1;
1943 rra_step_cnt[rra_idx] > 0;
1944 rra_step_cnt[rra_idx]--,
1945 scratch_idx = CDP_secondary_val,
1946 step_subtract = 2) {
1948 off_t rra_pos_new;
1949 #ifdef DEBUG
1950 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1951 #endif
1952 /* increment, with wrap-around */
1953 if (++rra_ptr->cur_row >= rra_def->row_cnt)
1954 rra_ptr->cur_row = 0;
1956 /* we know what our position should be */
1957 rra_pos_new = rra_start
1958 + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1960 /* re-seek if the position is wrong or we wrapped around */
1961 if (rra_pos_new != rrd_file->pos) {
1962 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1963 rrd_set_error("seek error in rrd");
1964 return -1;
1965 }
1966 }
1967 #ifdef DEBUG
1968 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1969 #endif
1971 if (skip_update[rra_idx])
1972 continue;
1974 if (*pcdp_summary != NULL) {
1975 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1977 rra_time = (current_time - current_time % step_time)
1978 - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1979 }
1981 if (write_RRA_row
1982 (rrd_file, rrd, rra_idx, scratch_idx,
1983 pcdp_summary, rra_time) == -1)
1984 return -1;
1985 }
1987 rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1988 } /* RRA LOOP */
1990 return 0;
1991 }
1993 /*
1994 * Write out one row of values (one value per DS) to the archive.
1995 *
1996 * Returns 0 on success, -1 on error.
1997 */
1998 static int write_RRA_row(
1999 rrd_file_t *rrd_file,
2000 rrd_t *rrd,
2001 unsigned long rra_idx,
2002 unsigned short CDP_scratch_idx,
2003 rrd_info_t ** pcdp_summary,
2004 time_t rra_time)
2005 {
2006 unsigned long ds_idx, cdp_idx;
2007 rrd_infoval_t iv;
2009 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
2010 /* compute the cdp index */
2011 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2012 #ifdef DEBUG
2013 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2014 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2015 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2016 #endif
2017 if (*pcdp_summary != NULL) {
2018 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2019 /* append info to the return hash */
2020 *pcdp_summary = rrd_info_push(*pcdp_summary,
2021 sprintf_alloc
2022 ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
2023 rrd->rra_def[rra_idx].cf_nam,
2024 rrd->rra_def[rra_idx].pdp_cnt,
2025 rrd->ds_def[ds_idx].ds_nam),
2026 RD_I_VAL, iv);
2027 }
2028 if (rrd_write(rrd_file,
2029 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2030 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2031 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2032 return -1;
2033 }
2034 }
2035 return 0;
2036 }
2038 /*
2039 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2040 *
2041 * Returns 0 on success, -1 otherwise
2042 */
2043 static int smooth_all_rras(
2044 rrd_t *rrd,
2045 rrd_file_t *rrd_file,
2046 unsigned long rra_begin)
2047 {
2048 unsigned long rra_start = rra_begin;
2049 unsigned long rra_idx;
2051 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2052 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2053 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2054 #ifdef DEBUG
2055 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2056 #endif
2057 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2058 if (rrd_test_error())
2059 return -1;
2060 }
2061 rra_start += rrd->rra_def[rra_idx].row_cnt
2062 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2063 }
2064 return 0;
2065 }
2067 #ifndef HAVE_MMAP
2068 /*
2069 * Flush changes to disk (unless we're using mmap)
2070 *
2071 * Returns 0 on success, -1 otherwise
2072 */
2073 static int write_changes_to_disk(
2074 rrd_t *rrd,
2075 rrd_file_t *rrd_file,
2076 int version)
2077 {
2078 /* we just need to write back the live header portion now */
2079 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2080 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2081 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2082 SEEK_SET) != 0) {
2083 rrd_set_error("seek rrd for live header writeback");
2084 return -1;
2085 }
2086 if (version >= 3) {
2087 if (rrd_write(rrd_file, rrd->live_head,
2088 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2089 rrd_set_error("rrd_write live_head to rrd");
2090 return -1;
2091 }
2092 } else {
2093 if (rrd_write(rrd_file, rrd->legacy_last_up,
2094 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2095 rrd_set_error("rrd_write live_head to rrd");
2096 return -1;
2097 }
2098 }
2101 if (rrd_write(rrd_file, rrd->pdp_prep,
2102 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2103 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2104 rrd_set_error("rrd_write pdp_prep to rrd");
2105 return -1;
2106 }
2108 if (rrd_write(rrd_file, rrd->cdp_prep,
2109 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2110 rrd->stat_head->ds_cnt)
2111 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2112 rrd->stat_head->ds_cnt)) {
2114 rrd_set_error("rrd_write cdp_prep to rrd");
2115 return -1;
2116 }
2118 if (rrd_write(rrd_file, rrd->rra_ptr,
2119 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2120 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2121 rrd_set_error("rrd_write rra_ptr to rrd");
2122 return -1;
2123 }
2124 return 0;
2125 }
2126 #endif