2 /*****************************************************************************
3 * RRDtool 1.3.3 Copyright by Tobi Oetiker, 1997-2008
4 *****************************************************************************
5 * rrd_update.c RRD Update Function
6 *****************************************************************************
7 * $Id$
8 *****************************************************************************/
10 #include "rrd_tool.h"
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
18 #include <locale.h>
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
26 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
27 /*
28 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
29 * replacement.
30 */
31 #include <sys/timeb.h>
33 #ifndef __MINGW32__
34 struct timeval {
35 time_t tv_sec; /* seconds */
36 long tv_usec; /* microseconds */
37 };
38 #endif
40 struct __timezone {
41 int tz_minuteswest; /* minutes W of Greenwich */
42 int tz_dsttime; /* type of dst correction */
43 };
45 static int gettimeofday(
46 struct timeval *t,
47 struct __timezone *tz)
48 {
50 struct _timeb current_time;
52 _ftime(¤t_time);
54 t->tv_sec = current_time.time;
55 t->tv_usec = current_time.millitm * 1000;
57 return 0;
58 }
60 #endif
62 /* FUNCTION PROTOTYPES */
64 int rrd_update_r(
65 const char *filename,
66 const char *tmplt,
67 int argc,
68 const char **argv);
69 int _rrd_update(
70 const char *filename,
71 const char *tmplt,
72 int argc,
73 const char **argv,
74 rrd_info_t *);
76 static int allocate_data_structures(
77 rrd_t *rrd,
78 char ***updvals,
79 rrd_value_t **pdp_temp,
80 const char *tmplt,
81 long **tmpl_idx,
82 unsigned long *tmpl_cnt,
83 unsigned long **rra_step_cnt,
84 unsigned long **skip_update,
85 rrd_value_t **pdp_new);
87 static int parse_template(
88 rrd_t *rrd,
89 const char *tmplt,
90 unsigned long *tmpl_cnt,
91 long *tmpl_idx);
93 static int process_arg(
94 char *step_start,
95 rrd_t *rrd,
96 rrd_file_t *rrd_file,
97 unsigned long rra_begin,
98 time_t *current_time,
99 unsigned long *current_time_usec,
100 rrd_value_t *pdp_temp,
101 rrd_value_t *pdp_new,
102 unsigned long *rra_step_cnt,
103 char **updvals,
104 long *tmpl_idx,
105 unsigned long tmpl_cnt,
106 rrd_info_t ** pcdp_summary,
107 int version,
108 unsigned long *skip_update,
109 int *schedule_smooth);
111 static int parse_ds(
112 rrd_t *rrd,
113 char **updvals,
114 long *tmpl_idx,
115 char *input,
116 unsigned long tmpl_cnt,
117 time_t *current_time,
118 unsigned long *current_time_usec,
119 int version);
121 static int get_time_from_reading(
122 rrd_t *rrd,
123 char timesyntax,
124 char **updvals,
125 time_t *current_time,
126 unsigned long *current_time_usec,
127 int version);
129 static int update_pdp_prep(
130 rrd_t *rrd,
131 char **updvals,
132 rrd_value_t *pdp_new,
133 double interval);
135 static int calculate_elapsed_steps(
136 rrd_t *rrd,
137 unsigned long current_time,
138 unsigned long current_time_usec,
139 double interval,
140 double *pre_int,
141 double *post_int,
142 unsigned long *proc_pdp_cnt);
144 static void simple_update(
145 rrd_t *rrd,
146 double interval,
147 rrd_value_t *pdp_new);
149 static int process_all_pdp_st(
150 rrd_t *rrd,
151 double interval,
152 double pre_int,
153 double post_int,
154 unsigned long elapsed_pdp_st,
155 rrd_value_t *pdp_new,
156 rrd_value_t *pdp_temp);
158 static int process_pdp_st(
159 rrd_t *rrd,
160 unsigned long ds_idx,
161 double interval,
162 double pre_int,
163 double post_int,
164 long diff_pdp_st,
165 rrd_value_t *pdp_new,
166 rrd_value_t *pdp_temp);
168 static int update_all_cdp_prep(
169 rrd_t *rrd,
170 unsigned long *rra_step_cnt,
171 unsigned long rra_begin,
172 rrd_file_t *rrd_file,
173 unsigned long elapsed_pdp_st,
174 unsigned long proc_pdp_cnt,
175 rrd_value_t **last_seasonal_coef,
176 rrd_value_t **seasonal_coef,
177 rrd_value_t *pdp_temp,
178 unsigned long *skip_update,
179 int *schedule_smooth);
181 static int do_schedule_smooth(
182 rrd_t *rrd,
183 unsigned long rra_idx,
184 unsigned long elapsed_pdp_st);
186 static int update_cdp_prep(
187 rrd_t *rrd,
188 unsigned long elapsed_pdp_st,
189 unsigned long start_pdp_offset,
190 unsigned long *rra_step_cnt,
191 int rra_idx,
192 rrd_value_t *pdp_temp,
193 rrd_value_t *last_seasonal_coef,
194 rrd_value_t *seasonal_coef,
195 int current_cf);
197 static void update_cdp(
198 unival *scratch,
199 int current_cf,
200 rrd_value_t pdp_temp_val,
201 unsigned long rra_step_cnt,
202 unsigned long elapsed_pdp_st,
203 unsigned long start_pdp_offset,
204 unsigned long pdp_cnt,
205 rrd_value_t xff,
206 int i,
207 int ii);
209 static void initialize_cdp_val(
210 unival *scratch,
211 int current_cf,
212 rrd_value_t pdp_temp_val,
213 unsigned long elapsed_pdp_st,
214 unsigned long start_pdp_offset,
215 unsigned long pdp_cnt);
217 static void reset_cdp(
218 rrd_t *rrd,
219 unsigned long elapsed_pdp_st,
220 rrd_value_t *pdp_temp,
221 rrd_value_t *last_seasonal_coef,
222 rrd_value_t *seasonal_coef,
223 int rra_idx,
224 int ds_idx,
225 int cdp_idx,
226 enum cf_en current_cf);
228 static rrd_value_t initialize_average_carry_over(
229 rrd_value_t pdp_temp_val,
230 unsigned long elapsed_pdp_st,
231 unsigned long start_pdp_offset,
232 unsigned long pdp_cnt);
234 static rrd_value_t calculate_cdp_val(
235 rrd_value_t cdp_val,
236 rrd_value_t pdp_temp_val,
237 unsigned long elapsed_pdp_st,
238 int current_cf,
239 int i,
240 int ii);
242 static int update_aberrant_cdps(
243 rrd_t *rrd,
244 rrd_file_t *rrd_file,
245 unsigned long rra_begin,
246 unsigned long elapsed_pdp_st,
247 rrd_value_t *pdp_temp,
248 rrd_value_t **seasonal_coef);
250 static int write_to_rras(
251 rrd_t *rrd,
252 rrd_file_t *rrd_file,
253 unsigned long *rra_step_cnt,
254 unsigned long rra_begin,
255 time_t current_time,
256 unsigned long *skip_update,
257 rrd_info_t ** pcdp_summary);
259 static int write_RRA_row(
260 rrd_file_t *rrd_file,
261 rrd_t *rrd,
262 unsigned long rra_idx,
263 unsigned short CDP_scratch_idx,
264 rrd_info_t ** pcdp_summary,
265 time_t rra_time);
267 static int smooth_all_rras(
268 rrd_t *rrd,
269 rrd_file_t *rrd_file,
270 unsigned long rra_begin);
272 #ifndef HAVE_MMAP
273 static int write_changes_to_disk(
274 rrd_t *rrd,
275 rrd_file_t *rrd_file,
276 int version);
277 #endif
/*
 * Normalize a time value as returned by gettimeofday: when the
 * microsecond field is negative, borrow one second from tv_sec and
 * add one million microseconds to tv_usec.
 */
static inline void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec >= 0) {
        return;         /* already in canonical form */
    }
    t->tv_sec -= 1;
    t->tv_usec += 1000000L;
}
/*
 * Read the wall clock and store the seconds in *current_time and the
 * microseconds in *current_time_usec.  The microsecond part is only
 * kept for version 3 and later; for older versions it is set to 0.
 */
static inline void initialize_time(
    time_t *current_time,
    unsigned long *current_time_usec,
    int version)
{
    struct timeval now;

    gettimeofday(&now, 0);
    normalize_time(&now);

    *current_time = now.tv_sec;
    *current_time_usec = (version >= 3) ? (unsigned long) now.tv_usec : 0UL;
}
/* Yields Y when X is NaN and X otherwise (X may be evaluated twice).
 * Deliberately has no trailing semicolon so that it behaves like an
 * ordinary expression inside larger expressions. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
315 rrd_info_t *rrd_update_v(
316 int argc,
317 char **argv)
318 {
319 char *tmplt = NULL;
320 rrd_info_t *result = NULL;
321 rrd_infoval_t rc;
322 struct option long_options[] = {
323 {"template", required_argument, 0, 't'},
324 {0, 0, 0, 0}
325 };
327 rc.u_int = -1;
328 optind = 0;
329 opterr = 0; /* initialize getopt */
331 while (1) {
332 int option_index = 0;
333 int opt;
335 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
337 if (opt == EOF)
338 break;
340 switch (opt) {
341 case 't':
342 tmplt = optarg;
343 break;
345 case '?':
346 rrd_set_error("unknown option '%s'", argv[optind - 1]);
347 goto end_tag;
348 }
349 }
351 /* need at least 2 arguments: filename, data. */
352 if (argc - optind < 2) {
353 rrd_set_error("Not enough arguments");
354 goto end_tag;
355 }
356 rc.u_int = 0;
357 result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
358 rc.u_int = _rrd_update(argv[optind], tmplt,
359 argc - optind - 1,
360 (const char **) (argv + optind + 1), result);
361 result->value.u_int = rc.u_int;
362 end_tag:
363 return result;
364 }
366 int rrd_update(
367 int argc,
368 char **argv)
369 {
370 struct option long_options[] = {
371 {"template", required_argument, 0, 't'},
372 {0, 0, 0, 0}
373 };
374 int option_index = 0;
375 int opt;
376 char *tmplt = NULL;
377 int rc = -1;
379 optind = 0;
380 opterr = 0; /* initialize getopt */
382 while (1) {
383 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
385 if (opt == EOF)
386 break;
388 switch (opt) {
389 case 't':
390 tmplt = strdup(optarg);
391 break;
393 case '?':
394 rrd_set_error("unknown option '%s'", argv[optind - 1]);
395 goto out;
396 }
397 }
399 /* need at least 2 arguments: filename, data. */
400 if (argc - optind < 2) {
401 rrd_set_error("Not enough arguments");
402 goto out;
403 }
405 rc = rrd_update_r(argv[optind], tmplt,
406 argc - optind - 1, (const char **) (argv + optind + 1));
407 out:
408 free(tmplt);
409 return rc;
410 }
412 int rrd_update_r(
413 const char *filename,
414 const char *tmplt,
415 int argc,
416 const char **argv)
417 {
418 return _rrd_update(filename, tmplt, argc, argv, NULL);
419 }
421 int _rrd_update(
422 const char *filename,
423 const char *tmplt,
424 int argc,
425 const char **argv,
426 rrd_info_t * pcdp_summary)
427 {
429 int arg_i = 2;
431 unsigned long rra_begin; /* byte pointer to the rra
432 * area in the rrd file. this
433 * pointer never changes value */
434 rrd_value_t *pdp_new; /* prepare the incoming data to be added
435 * to the existing entry */
436 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
437 * to the cdp values */
439 long *tmpl_idx; /* index representing the settings
440 * transported by the tmplt index */
441 unsigned long tmpl_cnt = 2; /* time and data */
442 rrd_t rrd;
443 time_t current_time = 0;
444 unsigned long current_time_usec = 0; /* microseconds part of current time */
445 char **updvals;
446 int schedule_smooth = 0;
448 /* number of elapsed PDP steps since last update */
449 unsigned long *rra_step_cnt = NULL;
451 int version; /* rrd version */
452 rrd_file_t *rrd_file;
453 char *arg_copy; /* for processing the argv */
454 unsigned long *skip_update; /* RRAs to advance but not write */
456 /* need at least 1 arguments: data. */
457 if (argc < 1) {
458 rrd_set_error("Not enough arguments");
459 goto err_out;
460 }
462 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
463 goto err_free;
464 }
465 /* We are now at the beginning of the rra's */
466 rra_begin = rrd_file->header_len;
468 version = atoi(rrd.stat_head->version);
470 initialize_time(¤t_time, ¤t_time_usec, version);
472 /* get exclusive lock to whole file.
473 * lock gets removed when we close the file.
474 */
475 if (rrd_lock(rrd_file) != 0) {
476 rrd_set_error("could not lock RRD");
477 goto err_close;
478 }
480 if (allocate_data_structures(&rrd, &updvals,
481 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
482 &rra_step_cnt, &skip_update,
483 &pdp_new) == -1) {
484 goto err_close;
485 }
487 /* loop through the arguments. */
488 for (arg_i = 0; arg_i < argc; arg_i++) {
489 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
490 rrd_set_error("failed duplication argv entry");
491 break;
492 }
493 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
494 ¤t_time, ¤t_time_usec, pdp_temp, pdp_new,
495 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
496 &pcdp_summary, version, skip_update,
497 &schedule_smooth) == -1) {
498 if (rrd_test_error()) { /* Should have error string always here */
499 char *save_error;
501 /* Prepend file name to error message */
502 if ((save_error = strdup(rrd_get_error())) != NULL) {
503 rrd_set_error("%s: %s", filename, save_error);
504 free(save_error);
505 }
506 }
507 free(arg_copy);
508 break;
509 }
510 free(arg_copy);
511 }
513 free(rra_step_cnt);
515 /* if we got here and if there is an error and if the file has not been
516 * written to, then close things up and return. */
517 if (rrd_test_error()) {
518 goto err_free_structures;
519 }
520 #ifndef HAVE_MMAP
521 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
522 goto err_free_structures;
523 }
524 #endif
526 /* calling the smoothing code here guarantees at most one smoothing
527 * operation per rrd_update call. Unfortunately, it is possible with bulk
528 * updates, or a long-delayed update for smoothing to occur off-schedule.
529 * This really isn't critical except during the burn-in cycles. */
530 if (schedule_smooth) {
531 smooth_all_rras(&rrd, rrd_file, rra_begin);
532 }
534 /* rrd_dontneed(rrd_file,&rrd); */
535 rrd_free(&rrd);
536 rrd_close(rrd_file);
538 free(pdp_new);
539 free(tmpl_idx);
540 free(pdp_temp);
541 free(skip_update);
542 free(updvals);
543 return 0;
545 err_free_structures:
546 free(pdp_new);
547 free(tmpl_idx);
548 free(pdp_temp);
549 free(skip_update);
550 free(updvals);
551 err_close:
552 rrd_close(rrd_file);
553 err_free:
554 rrd_free(&rrd);
555 err_out:
556 return -1;
557 }
559 /*
560 * get exclusive lock to whole file.
561 * lock gets removed when we close the file
562 *
563 * returns 0 on success
564 */
565 int rrd_lock(
566 rrd_file_t *file)
567 {
568 int rcstat;
570 {
571 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
572 struct _stat st;
574 if (_fstat(file->fd, &st) == 0) {
575 rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
576 } else {
577 rcstat = -1;
578 }
579 #else
580 struct flock lock;
582 lock.l_type = F_WRLCK; /* exclusive write lock */
583 lock.l_len = 0; /* whole file */
584 lock.l_start = 0; /* start of file */
585 lock.l_whence = SEEK_SET; /* end of file */
587 rcstat = fcntl(file->fd, F_SETLK, &lock);
588 #endif
589 }
591 return (rcstat);
592 }
594 /*
595 * Allocate some important arrays used, and initialize the template.
596 *
597 * When it returns, either all of the structures are allocated
598 * or none of them are.
599 *
600 * Returns 0 on success, -1 on error.
601 */
602 static int allocate_data_structures(
603 rrd_t *rrd,
604 char ***updvals,
605 rrd_value_t **pdp_temp,
606 const char *tmplt,
607 long **tmpl_idx,
608 unsigned long *tmpl_cnt,
609 unsigned long **rra_step_cnt,
610 unsigned long **skip_update,
611 rrd_value_t **pdp_new)
612 {
613 unsigned i, ii;
614 if ((*updvals = (char **) malloc(sizeof(char *)
615 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
616 rrd_set_error("allocating updvals pointer array.");
617 return -1;
618 }
619 if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
620 * rrd->stat_head->ds_cnt)) ==
621 NULL) {
622 rrd_set_error("allocating pdp_temp.");
623 goto err_free_updvals;
624 }
625 if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
626 *
627 rrd->stat_head->rra_cnt)) ==
628 NULL) {
629 rrd_set_error("allocating skip_update.");
630 goto err_free_pdp_temp;
631 }
632 if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
633 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
634 rrd_set_error("allocating tmpl_idx.");
635 goto err_free_skip_update;
636 }
637 if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
638 *
639 (rrd->stat_head->
640 rra_cnt))) == NULL) {
641 rrd_set_error("allocating rra_step_cnt.");
642 goto err_free_tmpl_idx;
643 }
645 /* initialize tmplt redirector */
646 /* default config example (assume DS 1 is a CDEF DS)
647 tmpl_idx[0] -> 0; (time)
648 tmpl_idx[1] -> 1; (DS 0)
649 tmpl_idx[2] -> 3; (DS 2)
650 tmpl_idx[3] -> 4; (DS 3) */
651 (*tmpl_idx)[0] = 0; /* time */
652 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
653 if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
654 (*tmpl_idx)[ii++] = i;
655 }
656 *tmpl_cnt = ii;
658 if (tmplt != NULL) {
659 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
660 goto err_free_rra_step_cnt;
661 }
662 }
664 if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
665 * rrd->stat_head->ds_cnt)) == NULL) {
666 rrd_set_error("allocating pdp_new.");
667 goto err_free_rra_step_cnt;
668 }
670 return 0;
672 err_free_rra_step_cnt:
673 free(*rra_step_cnt);
674 err_free_tmpl_idx:
675 free(*tmpl_idx);
676 err_free_skip_update:
677 free(*skip_update);
678 err_free_pdp_temp:
679 free(*pdp_temp);
680 err_free_updvals:
681 free(*updvals);
682 return -1;
683 }
685 /*
686 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
687 *
688 * Returns 0 on success.
689 */
690 static int parse_template(
691 rrd_t *rrd,
692 const char *tmplt,
693 unsigned long *tmpl_cnt,
694 long *tmpl_idx)
695 {
696 char *dsname, *tmplt_copy;
697 unsigned int tmpl_len, i;
698 int ret = 0;
700 *tmpl_cnt = 1; /* the first entry is the time */
702 /* we should work on a writeable copy here */
703 if ((tmplt_copy = strdup(tmplt)) == NULL) {
704 rrd_set_error("error copying tmplt '%s'", tmplt);
705 ret = -1;
706 goto out;
707 }
709 dsname = tmplt_copy;
710 tmpl_len = strlen(tmplt_copy);
711 for (i = 0; i <= tmpl_len; i++) {
712 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
713 tmplt_copy[i] = '\0';
714 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
715 rrd_set_error("tmplt contains more DS definitions than RRD");
716 ret = -1;
717 goto out_free_tmpl_copy;
718 }
719 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
720 rrd_set_error("unknown DS name '%s'", dsname);
721 ret = -1;
722 goto out_free_tmpl_copy;
723 }
724 /* go to the next entry on the tmplt_copy */
725 if (i < tmpl_len)
726 dsname = &tmplt_copy[i + 1];
727 }
728 }
729 out_free_tmpl_copy:
730 free(tmplt_copy);
731 out:
732 return ret;
733 }
735 /*
736 * Parse an update string, updates the primary data points (PDPs)
737 * and consolidated data points (CDPs), and writes changes to the RRAs.
738 *
739 * Returns 0 on success, -1 on error.
740 */
741 static int process_arg(
742 char *step_start,
743 rrd_t *rrd,
744 rrd_file_t *rrd_file,
745 unsigned long rra_begin,
746 time_t *current_time,
747 unsigned long *current_time_usec,
748 rrd_value_t *pdp_temp,
749 rrd_value_t *pdp_new,
750 unsigned long *rra_step_cnt,
751 char **updvals,
752 long *tmpl_idx,
753 unsigned long tmpl_cnt,
754 rrd_info_t ** pcdp_summary,
755 int version,
756 unsigned long *skip_update,
757 int *schedule_smooth)
758 {
759 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
761 /* a vector of future Holt-Winters seasonal coefs */
762 unsigned long elapsed_pdp_st;
764 double interval, pre_int, post_int; /* interval between this and
765 * the last run */
766 unsigned long proc_pdp_cnt;
768 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
769 current_time, current_time_usec, version) == -1) {
770 return -1;
771 }
773 interval = (double) (*current_time - rrd->live_head->last_up)
774 + (double) ((long) *current_time_usec -
775 (long) rrd->live_head->last_up_usec) / 1e6f;
777 /* process the data sources and update the pdp_prep
778 * area accordingly */
779 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
780 return -1;
781 }
783 elapsed_pdp_st = calculate_elapsed_steps(rrd,
784 *current_time,
785 *current_time_usec, interval,
786 &pre_int, &post_int,
787 &proc_pdp_cnt);
789 /* has a pdp_st moment occurred since the last run ? */
790 if (elapsed_pdp_st == 0) {
791 /* no we have not passed a pdp_st moment. therefore update is simple */
792 simple_update(rrd, interval, pdp_new);
793 } else {
794 /* an pdp_st has occurred. */
795 if (process_all_pdp_st(rrd, interval,
796 pre_int, post_int,
797 elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
798 return -1;
799 }
800 if (update_all_cdp_prep(rrd, rra_step_cnt,
801 rra_begin, rrd_file,
802 elapsed_pdp_st,
803 proc_pdp_cnt,
804 &last_seasonal_coef,
805 &seasonal_coef,
806 pdp_temp,
807 skip_update, schedule_smooth) == -1) {
808 goto err_free_coefficients;
809 }
810 if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
811 elapsed_pdp_st, pdp_temp,
812 &seasonal_coef) == -1) {
813 goto err_free_coefficients;
814 }
815 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
816 *current_time, skip_update,
817 pcdp_summary) == -1) {
818 goto err_free_coefficients;
819 }
820 } /* endif a pdp_st has occurred */
821 rrd->live_head->last_up = *current_time;
822 rrd->live_head->last_up_usec = *current_time_usec;
824 if (version < 3) {
825 *rrd->legacy_last_up = rrd->live_head->last_up;
826 }
827 free(seasonal_coef);
828 free(last_seasonal_coef);
829 return 0;
831 err_free_coefficients:
832 free(seasonal_coef);
833 free(last_seasonal_coef);
834 return -1;
835 }
837 /*
838 * Parse a DS string (time + colon-separated values), storing the
839 * results in current_time, current_time_usec, and updvals.
840 *
841 * Returns 0 on success, -1 on error.
842 */
843 static int parse_ds(
844 rrd_t *rrd,
845 char **updvals,
846 long *tmpl_idx,
847 char *input,
848 unsigned long tmpl_cnt,
849 time_t *current_time,
850 unsigned long *current_time_usec,
851 int version)
852 {
853 char *p;
854 unsigned long i;
855 char timesyntax;
857 updvals[0] = input;
858 /* initialize all ds input to unknown except the first one
859 which has always got to be set */
860 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
861 updvals[i] = "U";
863 /* separate all ds elements; first must be examined separately
864 due to alternate time syntax */
865 if ((p = strchr(input, '@')) != NULL) {
866 timesyntax = '@';
867 } else if ((p = strchr(input, ':')) != NULL) {
868 timesyntax = ':';
869 } else {
870 rrd_set_error("expected timestamp not found in data source from %s",
871 input);
872 return -1;
873 }
874 *p = '\0';
875 i = 1;
876 updvals[tmpl_idx[i++]] = p + 1;
877 while (*(++p)) {
878 if (*p == ':') {
879 *p = '\0';
880 if (i < tmpl_cnt) {
881 updvals[tmpl_idx[i++]] = p + 1;
882 }
883 }
884 }
886 if (i != tmpl_cnt) {
887 rrd_set_error("expected %lu data source readings (got %lu) from %s",
888 tmpl_cnt - 1, i, input);
889 return -1;
890 }
892 if (get_time_from_reading(rrd, timesyntax, updvals,
893 current_time, current_time_usec,
894 version) == -1) {
895 return -1;
896 }
897 return 0;
898 }
900 /*
901 * Parse the time in a DS string, store it in current_time and
902 * current_time_usec and verify that it's later than the last
903 * update for this DS.
904 *
905 * Returns 0 on success, -1 on error.
906 */
907 static int get_time_from_reading(
908 rrd_t *rrd,
909 char timesyntax,
910 char **updvals,
911 time_t *current_time,
912 unsigned long *current_time_usec,
913 int version)
914 {
915 double tmp;
916 char *parsetime_error = NULL;
917 char *old_locale;
918 rrd_time_value_t ds_tv;
919 struct timeval tmp_time; /* used for time conversion */
921 /* get the time from the reading ... handle N */
922 if (timesyntax == '@') { /* at-style */
923 if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
924 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
925 return -1;
926 }
927 if (ds_tv.type == RELATIVE_TO_END_TIME ||
928 ds_tv.type == RELATIVE_TO_START_TIME) {
929 rrd_set_error("specifying time relative to the 'start' "
930 "or 'end' makes no sense here: %s", updvals[0]);
931 return -1;
932 }
933 *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
934 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
935 } else if (strcmp(updvals[0], "N") == 0) {
936 gettimeofday(&tmp_time, 0);
937 normalize_time(&tmp_time);
938 *current_time = tmp_time.tv_sec;
939 *current_time_usec = tmp_time.tv_usec;
940 } else {
941 old_locale = setlocale(LC_NUMERIC, "C");
942 tmp = strtod(updvals[0], 0);
943 setlocale(LC_NUMERIC, old_locale);
944 *current_time = floor(tmp);
945 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
946 }
947 /* dont do any correction for old version RRDs */
948 if (version < 3)
949 *current_time_usec = 0;
951 if (*current_time < rrd->live_head->last_up ||
952 (*current_time == rrd->live_head->last_up &&
953 (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
954 rrd_set_error("illegal attempt to update using time %ld when "
955 "last update time is %ld (minimum one second step)",
956 *current_time, rrd->live_head->last_up);
957 return -1;
958 }
959 return 0;
960 }
962 /*
963 * Update pdp_new by interpreting the updvals according to the DS type
964 * (COUNTER, GAUGE, etc.).
965 *
966 * Returns 0 on success, -1 on error.
967 */
968 static int update_pdp_prep(
969 rrd_t *rrd,
970 char **updvals,
971 rrd_value_t *pdp_new,
972 double interval)
973 {
974 unsigned long ds_idx;
975 int ii;
976 char *endptr; /* used in the conversion */
977 double rate;
978 char *old_locale;
979 enum dst_en dst_idx;
981 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
982 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
984 /* make sure we do not build diffs with old last_ds values */
985 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
986 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
987 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
988 }
990 /* NOTE: DST_CDEF should never enter this if block, because
991 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
992 * accidently specified a value for the DST_CDEF. To handle this case,
993 * an extra check is required. */
995 if ((updvals[ds_idx + 1][0] != 'U') &&
996 (dst_idx != DST_CDEF) &&
997 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
998 rate = DNAN;
1000 /* pdp_new contains rate * time ... eg the bytes transferred during
1001 * the interval. Doing it this way saves a lot of math operations
1002 */
1003 switch (dst_idx) {
1004 case DST_COUNTER:
1005 case DST_DERIVE:
1006 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1007 if ((updvals[ds_idx + 1][ii] < '0'
1008 || updvals[ds_idx + 1][ii] > '9')
1009 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1010 rrd_set_error("not a simple integer: '%s'",
1011 updvals[ds_idx + 1]);
1012 return -1;
1013 }
1014 }
1015 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1016 pdp_new[ds_idx] =
1017 rrd_diff(updvals[ds_idx + 1],
1018 rrd->pdp_prep[ds_idx].last_ds);
1019 if (dst_idx == DST_COUNTER) {
1020 /* simple overflow catcher. This will fail
1021 * terribly for non 32 or 64 bit counters
1022 * ... are there any others in SNMP land?
1023 */
1024 if (pdp_new[ds_idx] < (double) 0.0)
1025 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
1026 if (pdp_new[ds_idx] < (double) 0.0)
1027 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1028 }
1029 rate = pdp_new[ds_idx] / interval;
1030 } else {
1031 pdp_new[ds_idx] = DNAN;
1032 }
1033 break;
1034 case DST_ABSOLUTE:
1035 old_locale = setlocale(LC_NUMERIC, "C");
1036 errno = 0;
1037 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1038 setlocale(LC_NUMERIC, old_locale);
1039 if (errno > 0) {
1040 rrd_set_error("converting '%s' to float: %s",
1041 updvals[ds_idx + 1], rrd_strerror(errno));
1042 return -1;
1043 };
1044 if (endptr[0] != '\0') {
1045 rrd_set_error
1046 ("conversion of '%s' to float not complete: tail '%s'",
1047 updvals[ds_idx + 1], endptr);
1048 return -1;
1049 }
1050 rate = pdp_new[ds_idx] / interval;
1051 break;
1052 case DST_GAUGE:
1053 errno = 0;
1054 old_locale = setlocale(LC_NUMERIC, "C");
1055 pdp_new[ds_idx] =
1056 strtod(updvals[ds_idx + 1], &endptr) * interval;
1057 setlocale(LC_NUMERIC, old_locale);
1058 if (errno) {
1059 rrd_set_error("converting '%s' to float: %s",
1060 updvals[ds_idx + 1], rrd_strerror(errno));
1061 return -1;
1062 };
1063 if (endptr[0] != '\0') {
1064 rrd_set_error
1065 ("conversion of '%s' to float not complete: tail '%s'",
1066 updvals[ds_idx + 1], endptr);
1067 return -1;
1068 }
1069 rate = pdp_new[ds_idx] / interval;
1070 break;
1071 default:
1072 rrd_set_error("rrd contains unknown DS type : '%s'",
1073 rrd->ds_def[ds_idx].dst);
1074 return -1;
1075 }
1076 /* break out of this for loop if the error string is set */
1077 if (rrd_test_error()) {
1078 return -1;
1079 }
1080 /* make sure pdp_temp is neither too large or too small
1081 * if any of these occur it becomes unknown ...
1082 * sorry folks ... */
1083 if (!isnan(rate) &&
1084 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1085 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1086 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1087 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1088 pdp_new[ds_idx] = DNAN;
1089 }
1090 } else {
1091 /* no news is news all the same */
1092 pdp_new[ds_idx] = DNAN;
1093 }
1096 /* make a copy of the command line argument for the next run */
1097 #ifdef DEBUG
1098 fprintf(stderr, "prep ds[%lu]\t"
1099 "last_arg '%s'\t"
1100 "this_arg '%s'\t"
1101 "pdp_new %10.2f\n",
1102 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1103 pdp_new[ds_idx]);
1104 #endif
1105 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1106 LAST_DS_LEN - 1);
1107 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1108 }
1109 return 0;
1110 }
1112 /*
1113 * How many PDP steps have elapsed since the last update? Returns the answer,
1114 * and stores the time between the last update and the last PDP in pre_time,
1115 * and the time between the last PDP and the current time in post_int.
1116 */
1117 static int calculate_elapsed_steps(
1118 rrd_t *rrd,
1119 unsigned long current_time,
1120 unsigned long current_time_usec,
1121 double interval,
1122 double *pre_int,
1123 double *post_int,
1124 unsigned long *proc_pdp_cnt)
1125 {
1126 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1127 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1128 * time */
1129 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1130 * when it was last updated */
1131 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1133 /* when was the current pdp started */
1134 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1135 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1137 /* when did the last pdp_st occur */
1138 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1139 occu_pdp_st = current_time - occu_pdp_age;
1141 if (occu_pdp_st > proc_pdp_st) {
1142 /* OK we passed the pdp_st moment */
1143 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1144 * occurred before the latest
1145 * pdp_st moment*/
1146 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1147 *post_int = occu_pdp_age; /* how much after it */
1148 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1149 } else {
1150 *pre_int = interval;
1151 *post_int = 0;
1152 }
1154 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1156 #ifdef DEBUG
1157 printf("proc_pdp_age %lu\t"
1158 "proc_pdp_st %lu\t"
1159 "occu_pfp_age %lu\t"
1160 "occu_pdp_st %lu\t"
1161 "int %lf\t"
1162 "pre_int %lf\t"
1163 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1164 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1165 #endif
1167 /* compute the number of elapsed pdp_st moments */
1168 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1169 }
1171 /*
1172 * Increment the PDP values by the values in pdp_new, or else initialize them.
1173 */
1174 static void simple_update(
1175 rrd_t *rrd,
1176 double interval,
1177 rrd_value_t *pdp_new)
1178 {
1179 int i;
1181 for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1182 if (isnan(pdp_new[i])) {
1183 /* this is not really accurate if we use subsecond data arrival time
1184 should have thought of it when going subsecond resolution ...
1185 sorry next format change we will have it! */
1186 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1187 floor(interval);
1188 } else {
1189 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1190 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1191 } else {
1192 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1193 }
1194 }
1195 #ifdef DEBUG
1196 fprintf(stderr,
1197 "NO PDP ds[%i]\t"
1198 "value %10.2f\t"
1199 "unkn_sec %5lu\n",
1200 i,
1201 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1202 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1203 #endif
1204 }
1205 }
1207 /*
1208 * Call process_pdp_st for each DS.
1209 *
1210 * Returns 0 on success, -1 on error.
1211 */
1212 static int process_all_pdp_st(
1213 rrd_t *rrd,
1214 double interval,
1215 double pre_int,
1216 double post_int,
1217 unsigned long elapsed_pdp_st,
1218 rrd_value_t *pdp_new,
1219 rrd_value_t *pdp_temp)
1220 {
1221 unsigned long ds_idx;
1223 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1224 rate*seconds which occurred up to the last run.
1225 pdp_new[] contains rate*seconds from the latest run.
1226 pdp_temp[] will contain the rate for cdp */
1228 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1229 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1230 elapsed_pdp_st * rrd->stat_head->pdp_step,
1231 pdp_new, pdp_temp) == -1) {
1232 return -1;
1233 }
1234 #ifdef DEBUG
1235 fprintf(stderr, "PDP UPD ds[%lu]\t"
1236 "elapsed_pdp_st %lu\t"
1237 "pdp_temp %10.2f\t"
1238 "new_prep %10.2f\t"
1239 "new_unkn_sec %5lu\n",
1240 ds_idx,
1241 elapsed_pdp_st,
1242 pdp_temp[ds_idx],
1243 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1244 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1245 #endif
1246 }
1247 return 0;
1248 }
1250 /*
1251 * Process an update that occurs after one of the PDP moments.
1252 * Increments the PDP value, sets NAN if time greater than the
1253 * heartbeats have elapsed, processes CDEFs.
1254 *
1255 * Returns 0 on success, -1 on error.
1256 */
1257 static int process_pdp_st(
1258 rrd_t *rrd,
1259 unsigned long ds_idx,
1260 double interval,
1261 double pre_int,
1262 double post_int,
1263 long diff_pdp_st, /* number of seconds in full steps passed since last update */
1264 rrd_value_t *pdp_new,
1265 rrd_value_t *pdp_temp)
1266 {
1267 int i;
1269 /* update pdp_prep to the current pdp_st. */
1270 double pre_unknown = 0.0;
1271 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1272 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1274 rpnstack_t rpnstack; /* used for COMPUTE DS */
1276 rpnstack_init(&rpnstack);
1279 if (isnan(pdp_new[ds_idx])) {
1280 /* a final bit of unknown to be added before calculation
1281 we use a temporary variable for this so that we
1282 don't have to turn integer lines before using the value */
1283 pre_unknown = pre_int;
1284 } else {
1285 if (isnan(scratch[PDP_val].u_val)) {
1286 scratch[PDP_val].u_val = 0;
1287 }
1288 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1289 }
1291 /* if too much of the pdp_prep is unknown we dump it */
1292 /* if the interval is larger thatn mrhb we get NAN */
1293 if ((interval > mrhb) ||
1294 (rrd->stat_head->pdp_step / 2.0 <
1295 (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1296 pdp_temp[ds_idx] = DNAN;
1297 } else {
1298 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1299 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1300 pre_unknown);
1301 }
1303 /* process CDEF data sources; remember each CDEF DS can
1304 * only reference other DS with a lower index number */
1305 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1306 rpnp_t *rpnp;
1308 rpnp =
1309 rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1310 /* substitute data values for OP_VARIABLE nodes */
1311 for (i = 0; rpnp[i].op != OP_END; i++) {
1312 if (rpnp[i].op == OP_VARIABLE) {
1313 rpnp[i].op = OP_NUMBER;
1314 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1315 }
1316 }
1317 /* run the rpn calculator */
1318 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1319 free(rpnp);
1320 rpnstack_free(&rpnstack);
1321 return -1;
1322 }
1323 }
1325 /* make pdp_prep ready for the next run */
1326 if (isnan(pdp_new[ds_idx])) {
1327 /* this is not realy accurate if we use subsecond data arival time
1328 should have thought of it when going subsecond resolution ...
1329 sorry next format change we will have it! */
1330 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1331 scratch[PDP_val].u_val = DNAN;
1332 } else {
1333 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1334 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1335 }
1336 rpnstack_free(&rpnstack);
1337 return 0;
1338 }
1340 /*
1341 * Iterate over all the RRAs for a given DS and:
1342 * 1. Decide whether to schedule a smooth later
1343 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1344 * 3. Update the CDP
1345 *
1346 * Returns 0 on success, -1 on error
1347 */
1348 static int update_all_cdp_prep(
1349 rrd_t *rrd,
1350 unsigned long *rra_step_cnt,
1351 unsigned long rra_begin,
1352 rrd_file_t *rrd_file,
1353 unsigned long elapsed_pdp_st,
1354 unsigned long proc_pdp_cnt,
1355 rrd_value_t **last_seasonal_coef,
1356 rrd_value_t **seasonal_coef,
1357 rrd_value_t *pdp_temp,
1358 unsigned long *skip_update,
1359 int *schedule_smooth)
1360 {
1361 unsigned long rra_idx;
1363 /* index into the CDP scratch array */
1364 enum cf_en current_cf;
1365 unsigned long rra_start;
1367 /* number of rows to be updated in an RRA for a data value. */
1368 unsigned long start_pdp_offset;
1370 rra_start = rra_begin;
1371 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1372 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1373 start_pdp_offset =
1374 rrd->rra_def[rra_idx].pdp_cnt -
1375 proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1376 skip_update[rra_idx] = 0;
1377 if (start_pdp_offset <= elapsed_pdp_st) {
1378 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1379 rrd->rra_def[rra_idx].pdp_cnt + 1;
1380 } else {
1381 rra_step_cnt[rra_idx] = 0;
1382 }
1384 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1385 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1386 * so that they will be correct for the next observed value; note that for
1387 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1388 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1389 if (rra_step_cnt[rra_idx] > 1) {
1390 skip_update[rra_idx] = 1;
1391 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1392 elapsed_pdp_st, last_seasonal_coef);
1393 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1394 elapsed_pdp_st + 1, seasonal_coef);
1395 }
1396 /* periodically run a smoother for seasonal effects */
1397 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1398 #ifdef DEBUG
1399 fprintf(stderr,
1400 "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1401 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1402 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1403 u_cnt);
1404 #endif
1405 *schedule_smooth = 1;
1406 }
1407 }
1408 if (rrd_test_error())
1409 return -1;
1411 if (update_cdp_prep
1412 (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1413 pdp_temp, *last_seasonal_coef, *seasonal_coef,
1414 current_cf) == -1) {
1415 return -1;
1416 }
1417 rra_start +=
1418 rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1419 sizeof(rrd_value_t);
1420 }
1421 return 0;
1422 }
1424 /*
1425 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1426 */
1427 static int do_schedule_smooth(
1428 rrd_t *rrd,
1429 unsigned long rra_idx,
1430 unsigned long elapsed_pdp_st)
1431 {
1432 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1433 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1434 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1435 unsigned long seasonal_smooth_idx =
1436 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1437 unsigned long *init_seasonal =
1438 &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1440 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1441 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1442 * really an RRA level, not a data source within RRA level parameter, but
1443 * the rra_def is read only for rrd_update (not flushed to disk). */
1444 if (*init_seasonal > BURNIN_CYCLES) {
1445 /* someone has no doubt invented a trick to deal with this wrap around,
1446 * but at least this code is clear. */
1447 if (seasonal_smooth_idx > cur_row) {
1448 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1449 * between PDP and CDP */
1450 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1451 }
1452 /* can't rely on negative numbers because we are working with
1453 * unsigned values */
1454 return (cur_row + elapsed_pdp_st >= row_cnt
1455 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1456 }
1457 /* mark off one of the burn-in cycles */
1458 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1459 }
1461 /*
1462 * For a given RRA, iterate over the data sources and call the appropriate
1463 * consolidation function.
1464 *
1465 * Returns 0 on success, -1 on error.
1466 */
1467 static int update_cdp_prep(
1468 rrd_t *rrd,
1469 unsigned long elapsed_pdp_st,
1470 unsigned long start_pdp_offset,
1471 unsigned long *rra_step_cnt,
1472 int rra_idx,
1473 rrd_value_t *pdp_temp,
1474 rrd_value_t *last_seasonal_coef,
1475 rrd_value_t *seasonal_coef,
1476 int current_cf)
1477 {
1478 unsigned long ds_idx, cdp_idx;
1480 /* update CDP_PREP areas */
1481 /* loop over data soures within each RRA */
1482 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1484 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1486 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1487 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1488 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1489 elapsed_pdp_st, start_pdp_offset,
1490 rrd->rra_def[rra_idx].pdp_cnt,
1491 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1492 rra_idx, ds_idx);
1493 } else {
1494 /* Nothing to consolidate if there's one PDP per CDP. However, if
1495 * we've missed some PDPs, let's update null counters etc. */
1496 if (elapsed_pdp_st > 2) {
1497 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1498 seasonal_coef, rra_idx, ds_idx, cdp_idx,
1499 current_cf);
1500 }
1501 }
1503 if (rrd_test_error())
1504 return -1;
1505 } /* endif data sources loop */
1506 return 0;
1507 }
1509 /*
1510 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1511 * primary value, secondary value, and # of unknowns.
1512 */
1513 static void update_cdp(
1514 unival *scratch,
1515 int current_cf,
1516 rrd_value_t pdp_temp_val,
1517 unsigned long rra_step_cnt,
1518 unsigned long elapsed_pdp_st,
1519 unsigned long start_pdp_offset,
1520 unsigned long pdp_cnt,
1521 rrd_value_t xff,
1522 int i,
1523 int ii)
1524 {
1525 /* shorthand variables */
1526 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1527 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1528 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1529 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1531 if (rra_step_cnt) {
1532 /* If we are in this block, as least 1 CDP value will be written to
1533 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1534 * to be written, then the "fill in" value is the CDP_secondary_val
1535 * entry. */
1536 if (isnan(pdp_temp_val)) {
1537 *cdp_unkn_pdp_cnt += start_pdp_offset;
1538 *cdp_secondary_val = DNAN;
1539 } else {
1540 /* CDP_secondary value is the RRA "fill in" value for intermediary
1541 * CDP data entries. No matter the CF, the value is the same because
1542 * the average, max, min, and last of a list of identical values is
1543 * the same, namely, the value itself. */
1544 *cdp_secondary_val = pdp_temp_val;
1545 }
1547 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1548 *cdp_primary_val = DNAN;
1549 if (current_cf == CF_AVERAGE) {
1550 *cdp_val =
1551 initialize_average_carry_over(pdp_temp_val,
1552 elapsed_pdp_st,
1553 start_pdp_offset, pdp_cnt);
1554 } else {
1555 *cdp_val = pdp_temp_val;
1556 }
1557 } else {
1558 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1559 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1560 } /* endif meets xff value requirement for a valid value */
1561 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1562 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1563 if (isnan(pdp_temp_val))
1564 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1565 else
1566 *cdp_unkn_pdp_cnt = 0;
1567 } else { /* rra_step_cnt[i] == 0 */
1569 #ifdef DEBUG
1570 if (isnan(*cdp_val)) {
1571 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1572 i, ii);
1573 } else {
1574 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1575 i, ii, *cdp_val);
1576 }
1577 #endif
1578 if (isnan(pdp_temp_val)) {
1579 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1580 } else {
1581 *cdp_val =
1582 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1583 current_cf, i, ii);
1584 }
1585 }
1586 }
1588 /*
1589 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1590 * on the type of consolidation function.
1591 */
1592 static void initialize_cdp_val(
1593 unival *scratch,
1594 int current_cf,
1595 rrd_value_t pdp_temp_val,
1596 unsigned long elapsed_pdp_st,
1597 unsigned long start_pdp_offset,
1598 unsigned long pdp_cnt)
1599 {
1600 rrd_value_t cum_val, cur_val;
1602 switch (current_cf) {
1603 case CF_AVERAGE:
1604 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1605 cur_val = IFDNAN(pdp_temp_val, 0.0);
1606 scratch[CDP_primary_val].u_val =
1607 (cum_val + cur_val * start_pdp_offset) /
1608 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1609 scratch[CDP_val].u_val =
1610 initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1611 start_pdp_offset, pdp_cnt);
1612 break;
1613 case CF_MAXIMUM:
1614 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1615 cur_val = IFDNAN(pdp_temp_val, -DINF);
1616 #if 0
1617 #ifdef DEBUG
1618 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1619 fprintf(stderr,
1620 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1621 i, ii);
1622 exit(-1);
1623 }
1624 #endif
1625 #endif
1626 if (cur_val > cum_val)
1627 scratch[CDP_primary_val].u_val = cur_val;
1628 else
1629 scratch[CDP_primary_val].u_val = cum_val;
1630 /* initialize carry over value */
1631 scratch[CDP_val].u_val = pdp_temp_val;
1632 break;
1633 case CF_MINIMUM:
1634 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1635 cur_val = IFDNAN(pdp_temp_val, DINF);
1636 #if 0
1637 #ifdef DEBUG
1638 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1639 fprintf(stderr,
1640 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1641 ii);
1642 exit(-1);
1643 }
1644 #endif
1645 #endif
1646 if (cur_val < cum_val)
1647 scratch[CDP_primary_val].u_val = cur_val;
1648 else
1649 scratch[CDP_primary_val].u_val = cum_val;
1650 /* initialize carry over value */
1651 scratch[CDP_val].u_val = pdp_temp_val;
1652 break;
1653 case CF_LAST:
1654 default:
1655 scratch[CDP_primary_val].u_val = pdp_temp_val;
1656 /* initialize carry over value */
1657 scratch[CDP_val].u_val = pdp_temp_val;
1658 break;
1659 }
1660 }
1662 /*
1663 * Update the consolidation function for Holt-Winters functions as
1664 * well as other functions that don't actually consolidate multiple
1665 * PDPs.
1666 */
1667 static void reset_cdp(
1668 rrd_t *rrd,
1669 unsigned long elapsed_pdp_st,
1670 rrd_value_t *pdp_temp,
1671 rrd_value_t *last_seasonal_coef,
1672 rrd_value_t *seasonal_coef,
1673 int rra_idx,
1674 int ds_idx,
1675 int cdp_idx,
1676 enum cf_en current_cf)
1677 {
1678 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1680 switch (current_cf) {
1681 case CF_AVERAGE:
1682 default:
1683 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1684 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1685 break;
1686 case CF_SEASONAL:
1687 case CF_DEVSEASONAL:
1688 /* need to update cached seasonal values, so they are consistent
1689 * with the bulk update */
1690 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1691 * CDP_last_deviation are the same. */
1692 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1693 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1694 break;
1695 case CF_HWPREDICT:
1696 case CF_MHWPREDICT:
1697 /* need to update the null_count and last_null_count.
1698 * even do this for non-DNAN pdp_temp because the
1699 * algorithm is not learning from batch updates. */
1700 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1701 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1702 /* fall through */
1703 case CF_DEVPREDICT:
1704 scratch[CDP_primary_val].u_val = DNAN;
1705 scratch[CDP_secondary_val].u_val = DNAN;
1706 break;
1707 case CF_FAILURES:
1708 /* do not count missed bulk values as failures */
1709 scratch[CDP_primary_val].u_val = 0;
1710 scratch[CDP_secondary_val].u_val = 0;
1711 /* need to reset violations buffer.
1712 * could do this more carefully, but for now, just
1713 * assume a bulk update wipes away all violations. */
1714 erase_violations(rrd, cdp_idx, rra_idx);
1715 break;
1716 }
1717 }
1719 static rrd_value_t initialize_average_carry_over(
1720 rrd_value_t pdp_temp_val,
1721 unsigned long elapsed_pdp_st,
1722 unsigned long start_pdp_offset,
1723 unsigned long pdp_cnt)
1724 {
1725 /* initialize carry over value */
1726 if (isnan(pdp_temp_val)) {
1727 return DNAN;
1728 }
1729 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1730 }
1732 /*
1733 * Update or initialize a CDP value based on the consolidation
1734 * function.
1735 *
1736 * Returns the new value.
1737 */
1738 static rrd_value_t calculate_cdp_val(
1739 rrd_value_t cdp_val,
1740 rrd_value_t pdp_temp_val,
1741 unsigned long elapsed_pdp_st,
1742 int current_cf,
1743 #ifdef DEBUG
1744 int i,
1745 int ii
1746 #else
1747 int UNUSED(i),
1748 int UNUSED(ii)
1749 #endif
1750 )
1751 {
1752 if (isnan(cdp_val)) {
1753 if (current_cf == CF_AVERAGE) {
1754 pdp_temp_val *= elapsed_pdp_st;
1755 }
1756 #ifdef DEBUG
1757 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1758 i, ii, pdp_temp_val);
1759 #endif
1760 return pdp_temp_val;
1761 }
1762 if (current_cf == CF_AVERAGE)
1763 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1764 if (current_cf == CF_MINIMUM)
1765 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1766 if (current_cf == CF_MAXIMUM)
1767 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1769 return pdp_temp_val;
1770 }
1772 /*
1773 * For each RRA, update the seasonal values and then call update_aberrant_CF
1774 * for each data source.
1775 *
1776 * Return 0 on success, -1 on error.
1777 */
1778 static int update_aberrant_cdps(
1779 rrd_t *rrd,
1780 rrd_file_t *rrd_file,
1781 unsigned long rra_begin,
1782 unsigned long elapsed_pdp_st,
1783 rrd_value_t *pdp_temp,
1784 rrd_value_t **seasonal_coef)
1785 {
1786 unsigned long rra_idx, ds_idx, j;
1788 /* number of PDP steps since the last update that
1789 * are assigned to the first CDP to be generated
1790 * since the last update. */
1791 unsigned short scratch_idx;
1792 unsigned long rra_start;
1793 enum cf_en current_cf;
1795 /* this loop is only entered if elapsed_pdp_st < 3 */
1796 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1797 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1798 rra_start = rra_begin;
1799 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1800 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1801 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1802 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1803 if (scratch_idx == CDP_primary_val) {
1804 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1805 elapsed_pdp_st + 1, seasonal_coef);
1806 } else {
1807 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1808 elapsed_pdp_st + 2, seasonal_coef);
1809 }
1810 }
1811 if (rrd_test_error())
1812 return -1;
1813 /* loop over data soures within each RRA */
1814 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1815 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1816 rra_idx * (rrd->stat_head->ds_cnt) +
1817 ds_idx, rra_idx, ds_idx, scratch_idx,
1818 *seasonal_coef);
1819 }
1820 }
1821 rra_start += rrd->rra_def[rra_idx].row_cnt
1822 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1823 }
1824 }
1825 return 0;
1826 }
1828 /*
1829 * Move sequentially through the file, writing one RRA at a time. Note this
1830 * architecture divorces the computation of CDP with flushing updated RRA
1831 * entries to disk.
1832 *
1833 * Return 0 on success, -1 on error.
1834 */
1835 static int write_to_rras(
1836 rrd_t *rrd,
1837 rrd_file_t *rrd_file,
1838 unsigned long *rra_step_cnt,
1839 unsigned long rra_begin,
1840 time_t current_time,
1841 unsigned long *skip_update,
1842 rrd_info_t ** pcdp_summary)
1843 {
1844 unsigned long rra_idx;
1845 unsigned long rra_start;
1846 time_t rra_time = 0; /* time of update for a RRA */
1848 unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1850 /* Ready to write to disk */
1851 rra_start = rra_begin;
1853 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1854 rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1855 rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1857 /* for cdp_prep */
1858 unsigned short scratch_idx;
1859 unsigned long step_subtract;
1861 for (scratch_idx = CDP_primary_val,
1862 step_subtract = 1;
1863 rra_step_cnt[rra_idx] > 0;
1864 rra_step_cnt[rra_idx]--,
1865 scratch_idx = CDP_secondary_val,
1866 step_subtract = 2) {
1868 off_t rra_pos_new;
1869 #ifdef DEBUG
1870 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1871 #endif
1872 /* increment, with wrap-around */
1873 if (++rra_ptr->cur_row >= rra_def->row_cnt)
1874 rra_ptr->cur_row = 0;
1876 /* we know what our position should be */
1877 rra_pos_new = rra_start
1878 + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1880 /* re-seek if the position is wrong or we wrapped around */
1881 if (rra_pos_new != rrd_file->pos) {
1882 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1883 rrd_set_error("seek error in rrd");
1884 return -1;
1885 }
1886 }
1887 #ifdef DEBUG
1888 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1889 #endif
1891 if (skip_update[rra_idx])
1892 continue;
1894 if (*pcdp_summary != NULL) {
1895 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1897 rra_time = (current_time - current_time % step_time)
1898 - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1899 }
1901 if (write_RRA_row
1902 (rrd_file, rrd, rra_idx, scratch_idx,
1903 pcdp_summary, rra_time) == -1)
1904 return -1;
1905 }
1907 rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1908 } /* RRA LOOP */
1910 return 0;
1911 }
1913 /*
1914 * Write out one row of values (one value per DS) to the archive.
1915 *
1916 * Returns 0 on success, -1 on error.
1917 */
1918 static int write_RRA_row(
1919 rrd_file_t *rrd_file,
1920 rrd_t *rrd,
1921 unsigned long rra_idx,
1922 unsigned short CDP_scratch_idx,
1923 rrd_info_t ** pcdp_summary,
1924 time_t rra_time)
1925 {
1926 unsigned long ds_idx, cdp_idx;
1927 rrd_infoval_t iv;
1929 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1930 /* compute the cdp index */
1931 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1932 #ifdef DEBUG
1933 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1934 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1935 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1936 #endif
1937 if (*pcdp_summary != NULL) {
1938 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1939 /* append info to the return hash */
1940 *pcdp_summary = rrd_info_push(*pcdp_summary,
1941 sprintf_alloc
1942 ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
1943 rrd->rra_def[rra_idx].cf_nam,
1944 rrd->rra_def[rra_idx].pdp_cnt,
1945 rrd->ds_def[ds_idx].ds_nam),
1946 RD_I_VAL, iv);
1947 }
1948 if (rrd_write(rrd_file,
1949 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
1950 u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1951 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1952 return -1;
1953 }
1954 }
1955 return 0;
1956 }
1958 /*
1959 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
1960 *
1961 * Returns 0 on success, -1 otherwise
1962 */
1963 static int smooth_all_rras(
1964 rrd_t *rrd,
1965 rrd_file_t *rrd_file,
1966 unsigned long rra_begin)
1967 {
1968 unsigned long rra_start = rra_begin;
1969 unsigned long rra_idx;
1971 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
1972 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
1973 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
1974 #ifdef DEBUG
1975 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
1976 #endif
1977 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
1978 if (rrd_test_error())
1979 return -1;
1980 }
1981 rra_start += rrd->rra_def[rra_idx].row_cnt
1982 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1983 }
1984 return 0;
1985 }
1987 #ifndef HAVE_MMAP
1988 /*
1989 * Flush changes to disk (unless we're using mmap)
1990 *
1991 * Returns 0 on success, -1 otherwise
1992 */
1993 static int write_changes_to_disk(
1994 rrd_t *rrd,
1995 rrd_file_t *rrd_file,
1996 int version)
1997 {
1998 /* we just need to write back the live header portion now */
1999 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2000 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2001 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2002 SEEK_SET) != 0) {
2003 rrd_set_error("seek rrd for live header writeback");
2004 return -1;
2005 }
2006 if (version >= 3) {
2007 if (rrd_write(rrd_file, rrd->live_head,
2008 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2009 rrd_set_error("rrd_write live_head to rrd");
2010 return -1;
2011 }
2012 } else {
2013 if (rrd_write(rrd_file, rrd->legacy_last_up,
2014 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2015 rrd_set_error("rrd_write live_head to rrd");
2016 return -1;
2017 }
2018 }
2021 if (rrd_write(rrd_file, rrd->pdp_prep,
2022 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2023 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2024 rrd_set_error("rrd_write pdp_prep to rrd");
2025 return -1;
2026 }
2028 if (rrd_write(rrd_file, rrd->cdp_prep,
2029 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2030 rrd->stat_head->ds_cnt)
2031 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2032 rrd->stat_head->ds_cnt)) {
2034 rrd_set_error("rrd_write cdp_prep to rrd");
2035 return -1;
2036 }
2038 if (rrd_write(rrd_file, rrd->rra_ptr,
2039 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2040 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2041 rrd_set_error("rrd_write rra_ptr to rrd");
2042 return -1;
2043 }
2044 return 0;
2045 }
2046 #endif