1 /*****************************************************************************
2 * RRDtool 1.2.99907080300 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 *****************************************************************************/
9 #include "rrd_tool.h"
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
13 #include <sys/stat.h>
14 #include <io.h>
15 #endif
17 #include <locale.h>
19 #include "rrd_hw.h"
20 #include "rrd_rpncalc.h"
22 #include "rrd_is_thread_safe.h"
23 #include "unused.h"
25 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
26 /*
27 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
28 * replacement.
29 */
30 #include <sys/timeb.h>
32 #ifndef __MINGW32__
33 struct timeval {
34 time_t tv_sec; /* seconds */
35 long tv_usec; /* microseconds */
36 };
37 #endif
39 struct __timezone {
40 int tz_minuteswest; /* minutes W of Greenwich */
41 int tz_dsttime; /* type of dst correction */
42 };
44 static int gettimeofday(
45 struct timeval *t,
46 struct __timezone *tz)
47 {
49 struct _timeb current_time;
51 _ftime(¤t_time);
53 t->tv_sec = current_time.time;
54 t->tv_usec = current_time.millitm * 1000;
56 return 0;
57 }
59 #endif
61 /* FUNCTION PROTOTYPES */
63 int rrd_update_r(
64 const char *filename,
65 const char *tmplt,
66 int argc,
67 const char **argv);
68 int _rrd_update(
69 const char *filename,
70 const char *tmplt,
71 int argc,
72 const char **argv,
73 info_t *);
75 static int allocate_data_structures(
76 rrd_t *rrd, char ***updvals, rrd_value_t **pdp_temp, const char *tmplt,
77 long **tmpl_idx, unsigned long *tmpl_cnt, unsigned long **rra_step_cnt,
78 unsigned long **skip_update, rrd_value_t **pdp_new);
80 static int parse_template(rrd_t *rrd, const char *tmplt,
81 unsigned long *tmpl_cnt, long *tmpl_idx);
83 static int process_arg(
84 char *step_start,
85 rrd_t *rrd,
86 rrd_file_t *rrd_file,
87 unsigned long rra_begin,
88 unsigned long *rra_current,
89 time_t *current_time,
90 unsigned long *current_time_usec,
91 rrd_value_t *pdp_temp,
92 rrd_value_t *pdp_new,
93 unsigned long *rra_step_cnt,
94 char **updvals,
95 long *tmpl_idx,
96 unsigned long tmpl_cnt,
97 info_t **pcdp_summary,
98 int version,
99 unsigned long *skip_update,
100 int *schedule_smooth);
102 static int parse_ds(rrd_t *rrd, char **updvals, long *tmpl_idx, char *input,
103 unsigned long tmpl_cnt, time_t *current_time, unsigned long *current_time_usec,
104 int version);
106 static int get_time_from_reading(rrd_t *rrd, char timesyntax, char **updvals,
107 time_t *current_time, unsigned long *current_time_usec, int version);
109 static int update_pdp_prep(rrd_t *rrd, char **updvals,
110 rrd_value_t *pdp_new, double interval);
112 static int calculate_elapsed_steps(rrd_t *rrd,
113 unsigned long current_time, unsigned long current_time_usec,
114 double interval, double *pre_int, double *post_int,
115 unsigned long *proc_pdp_cnt);
117 static void simple_update(rrd_t *rrd, double interval, rrd_value_t *pdp_new);
119 static int process_all_pdp_st(rrd_t *rrd, double interval,
120 double pre_int, double post_int, unsigned long elapsed_pdp_st,
121 rrd_value_t *pdp_new, rrd_value_t *pdp_temp);
123 static int process_pdp_st(rrd_t *rrd, unsigned long ds_idx, double interval,
124 double pre_int, double post_int, long diff_pdp_st, rrd_value_t *pdp_new,
125 rrd_value_t *pdp_temp);
127 static int update_all_cdp_prep(
128 rrd_t *rrd, unsigned long *rra_step_cnt, unsigned long rra_begin,
129 rrd_file_t *rrd_file, unsigned long elapsed_pdp_st, unsigned long proc_pdp_cnt,
130 rrd_value_t **last_seasonal_coef, rrd_value_t **seasonal_coef,
131 rrd_value_t *pdp_temp, unsigned long *rra_current,
132 unsigned long *skip_update, int *schedule_smooth);
134 static int do_schedule_smooth(rrd_t *rrd, unsigned long rra_idx,
135 unsigned long elapsed_pdp_st);
137 static int update_cdp_prep(rrd_t *rrd, unsigned long elapsed_pdp_st,
138 unsigned long start_pdp_offset, unsigned long *rra_step_cnt,
139 int rra_idx, rrd_value_t *pdp_temp, rrd_value_t *last_seasonal_coef,
140 rrd_value_t *seasonal_coef, int current_cf);
142 static void update_cdp(unival *scratch, int current_cf,
143 rrd_value_t pdp_temp_val, unsigned long rra_step_cnt,
144 unsigned long elapsed_pdp_st, unsigned long start_pdp_offset,
145 unsigned long pdp_cnt, rrd_value_t xff, int i, int ii);
147 static void initialize_cdp_val(unival *scratch, int current_cf,
148 rrd_value_t pdp_temp_val, unsigned long elapsed_pdp_st,
149 unsigned long start_pdp_offset, unsigned long pdp_cnt);
151 static void reset_cdp(rrd_t *rrd, unsigned long elapsed_pdp_st,
152 rrd_value_t *pdp_temp, rrd_value_t *last_seasonal_coef,
153 rrd_value_t *seasonal_coef,
154 int rra_idx, int ds_idx, int cdp_idx, enum cf_en current_cf);
156 static rrd_value_t initialize_average_carry_over(rrd_value_t pdp_temp_val,
157 unsigned long elapsed_pdp_st, unsigned long start_pdp_offset,
158 unsigned long pdp_cnt);
160 static rrd_value_t calculate_cdp_val(
161 rrd_value_t cdp_val, rrd_value_t pdp_temp_val,
162 unsigned long elapsed_pdp_st, int current_cf, int i, int ii);
164 static int update_aberrant_cdps(rrd_t *rrd, rrd_file_t *rrd_file,
165 unsigned long rra_begin, unsigned long *rra_current,
166 unsigned long elapsed_pdp_st, rrd_value_t *pdp_temp, rrd_value_t **seasonal_coef);
168 static int write_to_rras(rrd_t *rrd, rrd_file_t *rrd_file,
169 unsigned long *rra_step_cnt, unsigned long rra_begin,
170 unsigned long *rra_current, time_t current_time,
171 unsigned long *skip_update, info_t **pcdp_summary);
173 static int write_RRA_row(rrd_file_t *rrd_file, rrd_t *rrd, unsigned long rra_idx,
174 unsigned long *rra_current, unsigned short CDP_scratch_idx, info_t **pcdp_summary,
175 time_t rra_time);
177 static int smooth_all_rras(rrd_t *rrd, rrd_file_t *rrd_file,
178 unsigned long rra_begin);
180 #ifndef HAVE_MMAP
181 static int write_changes_to_disk(rrd_t *rrd, rrd_file_t *rrd_file,
182 int version);
183 #endif
185 /*
186 * normalize time as returned by gettimeofday. usec part must
187 * be always >= 0
188 */
189 static inline void normalize_time(
190 struct timeval *t)
191 {
192 if (t->tv_usec < 0) {
193 t->tv_sec--;
194 t->tv_usec += 1e6L;
195 }
196 }
198 /*
199 * Sets current_time and current_time_usec based on the current time.
200 * current_time_usec is set to 0 if the version number is 1 or 2.
201 */
202 static inline void initialize_time(
203 time_t *current_time, unsigned long *current_time_usec,
204 int version)
205 {
206 struct timeval tmp_time; /* used for time conversion */
208 gettimeofday(&tmp_time, 0);
209 normalize_time(&tmp_time);
210 *current_time = tmp_time.tv_sec;
211 if (version >= 3) {
212 *current_time_usec = tmp_time.tv_usec;
213 } else {
214 *current_time_usec = 0;
215 }
216 }
218 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
220 info_t *rrd_update_v(
221 int argc,
222 char **argv)
223 {
224 char *tmplt = NULL;
225 info_t *result = NULL;
226 infoval rc;
227 struct option long_options[] = {
228 {"template", required_argument, 0, 't'},
229 {0, 0, 0, 0}
230 };
232 rc.u_int = -1;
233 optind = 0;
234 opterr = 0; /* initialize getopt */
236 while (1) {
237 int option_index = 0;
238 int opt;
240 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
242 if (opt == EOF)
243 break;
245 switch (opt) {
246 case 't':
247 tmplt = optarg;
248 break;
250 case '?':
251 rrd_set_error("unknown option '%s'", argv[optind - 1]);
252 goto end_tag;
253 }
254 }
256 /* need at least 2 arguments: filename, data. */
257 if (argc - optind < 2) {
258 rrd_set_error("Not enough arguments");
259 goto end_tag;
260 }
261 rc.u_int = 0;
262 result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
263 rc.u_int = _rrd_update(argv[optind], tmplt,
264 argc - optind - 1,
265 (const char **) (argv + optind + 1), result);
266 result->value.u_int = rc.u_int;
267 end_tag:
268 return result;
269 }
271 int rrd_update(
272 int argc,
273 char **argv)
274 {
275 struct option long_options[] = {
276 {"template", required_argument, 0, 't'},
277 {0, 0, 0, 0}
278 };
279 int option_index = 0;
280 int opt;
281 char *tmplt = NULL;
282 int rc = -1;
284 optind = 0;
285 opterr = 0; /* initialize getopt */
287 while (1) {
288 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
290 if (opt == EOF)
291 break;
293 switch (opt) {
294 case 't':
295 tmplt = strdup(optarg);
296 break;
298 case '?':
299 rrd_set_error("unknown option '%s'", argv[optind - 1]);
300 goto out;
301 }
302 }
304 /* need at least 2 arguments: filename, data. */
305 if (argc - optind < 2) {
306 rrd_set_error("Not enough arguments");
307 goto out;
308 }
310 rc = rrd_update_r(argv[optind], tmplt,
311 argc - optind - 1, (const char **) (argv + optind + 1));
312 out:
313 free(tmplt);
314 return rc;
315 }
317 int rrd_update_r(
318 const char *filename,
319 const char *tmplt,
320 int argc,
321 const char **argv)
322 {
323 return _rrd_update(filename, tmplt, argc, argv, NULL);
324 }
326 int _rrd_update(
327 const char *filename,
328 const char *tmplt,
329 int argc,
330 const char **argv,
331 info_t *pcdp_summary)
332 {
334 int arg_i = 2;
336 unsigned long rra_begin; /* byte pointer to the rra
337 * area in the rrd file. this
338 * pointer never changes value */
339 unsigned long rra_current; /* byte pointer to the current write
340 * spot in the rrd file. */
341 rrd_value_t *pdp_new; /* prepare the incoming data to be added
342 * to the existing entry */
343 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
344 * to the cdp values */
346 long *tmpl_idx; /* index representing the settings
347 * transported by the tmplt index */
348 unsigned long tmpl_cnt = 2; /* time and data */
349 rrd_t rrd;
350 time_t current_time = 0;
351 unsigned long current_time_usec = 0; /* microseconds part of current time */
352 char **updvals;
353 int schedule_smooth = 0;
355 /* number of elapsed PDP steps since last update */
356 unsigned long *rra_step_cnt = NULL;
358 int version; /* rrd version */
359 rrd_file_t *rrd_file;
360 char *arg_copy; /* for processing the argv */
361 unsigned long *skip_update; /* RRAs to advance but not write */
363 /* need at least 1 arguments: data. */
364 if (argc < 1) {
365 rrd_set_error("Not enough arguments");
366 goto err_out;
367 }
369 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
370 goto err_free;
371 }
372 /* We are now at the beginning of the rra's */
373 rra_current = rra_begin = rrd_file->header_len;
375 version = atoi(rrd.stat_head->version);
377 initialize_time(¤t_time, ¤t_time_usec, version);
379 /* get exclusive lock to whole file.
380 * lock gets removed when we close the file.
381 */
382 if (LockRRD(rrd_file->fd) != 0) {
383 rrd_set_error("could not lock RRD");
384 goto err_close;
385 }
387 if (allocate_data_structures(&rrd, &updvals,
388 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
389 &rra_step_cnt, &skip_update, &pdp_new) == -1) {
390 goto err_close;
391 }
393 /* loop through the arguments. */
394 for (arg_i = 0; arg_i < argc; arg_i++) {
395 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
396 rrd_set_error("failed duplication argv entry");
397 break;
398 }
399 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
400 ¤t_time, ¤t_time_usec, pdp_temp, pdp_new,
401 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt, &pcdp_summary,
402 version, skip_update, &schedule_smooth) == -1) {
403 free(arg_copy);
404 break;
405 }
406 free(arg_copy);
407 }
409 free(rra_step_cnt);
411 /* if we got here and if there is an error and if the file has not been
412 * written to, then close things up and return. */
413 if (rrd_test_error()) {
414 goto err_free_structures;
415 }
417 #ifndef HAVE_MMAP
418 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
419 goto err_free_structures;
420 }
421 #endif
423 /* calling the smoothing code here guarantees at most one smoothing
424 * operation per rrd_update call. Unfortunately, it is possible with bulk
425 * updates, or a long-delayed update for smoothing to occur off-schedule.
426 * This really isn't critical except during the burn-in cycles. */
427 if (schedule_smooth) {
428 smooth_all_rras(&rrd, rrd_file, rra_begin);
429 }
431 /* rrd_dontneed(rrd_file,&rrd); */
432 rrd_free(&rrd);
433 rrd_close(rrd_file);
435 free(pdp_new);
436 free(tmpl_idx);
437 free(pdp_temp);
438 free(skip_update);
439 free(updvals);
440 return 0;
442 err_free_structures:
443 free(pdp_new);
444 free(tmpl_idx);
445 free(pdp_temp);
446 free(skip_update);
447 free(updvals);
448 err_close:
449 rrd_close(rrd_file);
450 err_free:
451 rrd_free(&rrd);
452 err_out:
453 return -1;
454 }
456 /*
457 * get exclusive lock to whole file.
458 * lock gets removed when we close the file
459 *
460 * returns 0 on success
461 */
462 int LockRRD(
463 int in_file)
464 {
465 int rcstat;
467 {
468 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
469 struct _stat st;
471 if (_fstat(in_file, &st) == 0) {
472 rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
473 } else {
474 rcstat = -1;
475 }
476 #else
477 struct flock lock;
479 lock.l_type = F_WRLCK; /* exclusive write lock */
480 lock.l_len = 0; /* whole file */
481 lock.l_start = 0; /* start of file */
482 lock.l_whence = SEEK_SET; /* end of file */
484 rcstat = fcntl(in_file, F_SETLK, &lock);
485 #endif
486 }
488 return (rcstat);
489 }
491 /*
492 * Allocate some important arrays used, and initialize the template.
493 *
494 * When it returns, either all of the structures are allocated
495 * or none of them are.
496 *
497 * Returns 0 on success, -1 on error.
498 */
499 static int allocate_data_structures(
500 rrd_t *rrd,
501 char ***updvals,
502 rrd_value_t **pdp_temp,
503 const char *tmplt,
504 long **tmpl_idx,
505 unsigned long *tmpl_cnt,
506 unsigned long **rra_step_cnt,
507 unsigned long **skip_update,
508 rrd_value_t **pdp_new)
509 {
510 unsigned i, ii;
511 if ((*updvals = (char **)malloc(sizeof(char *)
512 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
513 rrd_set_error("allocating updvals pointer array.");
514 return -1;
515 }
516 if ((*pdp_temp = (rrd_value_t *)malloc(sizeof(rrd_value_t)
517 * rrd->stat_head->ds_cnt)) == NULL) {
518 rrd_set_error("allocating pdp_temp.");
519 goto err_free_updvals;
520 }
521 if ((*skip_update = (unsigned long *)malloc(sizeof(unsigned long)
522 * rrd->stat_head->rra_cnt)) == NULL) {
523 rrd_set_error("allocating skip_update.");
524 goto err_free_pdp_temp;
525 }
526 if ((*tmpl_idx = (long *)malloc(sizeof(unsigned long)
527 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
528 rrd_set_error("allocating tmpl_idx.");
529 goto err_free_skip_update;
530 }
531 if ((*rra_step_cnt = (unsigned long *)malloc(sizeof(unsigned long)
532 * (rrd->stat_head->rra_cnt))) == NULL) {
533 rrd_set_error("allocating rra_step_cnt.");
534 goto err_free_tmpl_idx;
535 }
537 /* initialize tmplt redirector */
538 /* default config example (assume DS 1 is a CDEF DS)
539 tmpl_idx[0] -> 0; (time)
540 tmpl_idx[1] -> 1; (DS 0)
541 tmpl_idx[2] -> 3; (DS 2)
542 tmpl_idx[3] -> 4; (DS 3) */
543 (*tmpl_idx)[0] = 0; /* time */
544 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
545 if (dst_conv(rrd->ds_def[i-1].dst) != DST_CDEF)
546 (*tmpl_idx)[ii++] = i;
547 }
548 *tmpl_cnt = ii;
550 if (tmplt != NULL) {
551 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
552 goto err_free_rra_step_cnt;
553 }
554 }
556 if ((*pdp_new = (rrd_value_t *)malloc(sizeof(rrd_value_t)
557 * rrd->stat_head->ds_cnt)) == NULL) {
558 rrd_set_error("allocating pdp_new.");
559 goto err_free_rra_step_cnt;
560 }
562 return 0;
564 err_free_rra_step_cnt:
565 free(*rra_step_cnt);
566 err_free_tmpl_idx:
567 free(*tmpl_idx);
568 err_free_skip_update:
569 free(*skip_update);
570 err_free_pdp_temp:
571 free(*pdp_temp);
572 err_free_updvals:
573 free(*updvals);
574 return -1;
575 }
577 /*
578 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
579 *
580 * Returns 0 on success.
581 */
582 static int parse_template(
583 rrd_t *rrd, const char *tmplt,
584 unsigned long *tmpl_cnt, long *tmpl_idx)
585 {
586 char *dsname, *tmplt_copy;
587 unsigned int tmpl_len, i;
588 int ret = 0;
590 *tmpl_cnt = 1; /* the first entry is the time */
592 /* we should work on a writeable copy here */
593 if ((tmplt_copy = strdup(tmplt)) == NULL) {
594 rrd_set_error("error copying tmplt '%s'", tmplt);
595 ret = -1;
596 goto out;
597 }
599 dsname = tmplt_copy;
600 tmpl_len = strlen(tmplt_copy);
601 for (i = 0; i <= tmpl_len; i++) {
602 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
603 tmplt_copy[i] = '\0';
604 if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
605 rrd_set_error("tmplt contains more DS definitions than RRD");
606 ret = -1;
607 goto out_free_tmpl_copy;
608 }
609 if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname)+1) == 0) {
610 rrd_set_error("unknown DS name '%s'", dsname);
611 ret = -1;
612 goto out_free_tmpl_copy;
613 }
614 /* go to the next entry on the tmplt_copy */
615 if (i < tmpl_len)
616 dsname = &tmplt_copy[i+1];
617 }
618 }
619 out_free_tmpl_copy:
620 free(tmplt_copy);
621 out:
622 return ret;
623 }
625 /*
626 * Parse an update string, updates the primary data points (PDPs)
627 * and consolidated data points (CDPs), and writes changes to the RRAs.
628 *
629 * Returns 0 on success, -1 on error.
630 */
631 static int process_arg(
632 char *step_start,
633 rrd_t *rrd,
634 rrd_file_t *rrd_file,
635 unsigned long rra_begin,
636 unsigned long *rra_current,
637 time_t *current_time,
638 unsigned long *current_time_usec,
639 rrd_value_t *pdp_temp,
640 rrd_value_t *pdp_new,
641 unsigned long *rra_step_cnt,
642 char **updvals,
643 long *tmpl_idx,
644 unsigned long tmpl_cnt,
645 info_t **pcdp_summary,
646 int version,
647 unsigned long *skip_update,
648 int *schedule_smooth)
649 {
650 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
652 /* a vector of future Holt-Winters seasonal coefs */
653 unsigned long elapsed_pdp_st;
655 double interval, pre_int, post_int; /* interval between this and
656 * the last run */
657 unsigned long proc_pdp_cnt;
658 unsigned long rra_start;
660 if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
661 current_time, current_time_usec, version) == -1) {
662 return -1;
663 }
664 /* seek to the beginning of the rra's */
665 if (*rra_current != rra_begin) {
666 #ifndef HAVE_MMAP
667 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
668 rrd_set_error("seek error in rrd");
669 return -1;
670 }
671 #endif
672 *rra_current = rra_begin;
673 }
674 rra_start = rra_begin;
676 interval = (double) (*current_time - rrd->live_head->last_up)
677 + (double) ((long) *current_time_usec -
678 (long) rrd->live_head->last_up_usec) / 1e6f;
680 /* process the data sources and update the pdp_prep
681 * area accordingly */
682 if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
683 return -1;
684 }
686 elapsed_pdp_st = calculate_elapsed_steps(rrd,
687 *current_time, *current_time_usec,
688 interval, &pre_int, &post_int,
689 &proc_pdp_cnt);
691 /* has a pdp_st moment occurred since the last run ? */
692 if (elapsed_pdp_st == 0) {
693 /* no we have not passed a pdp_st moment. therefore update is simple */
694 simple_update(rrd, interval, pdp_new);
695 } else {
696 /* an pdp_st has occurred. */
697 if (process_all_pdp_st(rrd, interval,
698 pre_int, post_int,
699 elapsed_pdp_st,
700 pdp_new, pdp_temp) == -1)
701 {
702 return -1;
703 }
704 if (update_all_cdp_prep(rrd, rra_step_cnt,
705 rra_begin, rrd_file,
706 elapsed_pdp_st,
707 proc_pdp_cnt,
708 &last_seasonal_coef,
709 &seasonal_coef,
710 pdp_temp, rra_current,
711 skip_update, schedule_smooth) == -1)
712 {
713 goto err_free_coefficients;
714 }
715 if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
716 elapsed_pdp_st, pdp_temp, &seasonal_coef) == -1)
717 {
718 goto err_free_coefficients;
719 }
720 if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
721 rra_current, *current_time, skip_update, pcdp_summary) == -1)
722 {
723 goto err_free_coefficients;
724 }
725 } /* endif a pdp_st has occurred */
726 rrd->live_head->last_up = *current_time;
727 rrd->live_head->last_up_usec = *current_time_usec;
729 free(seasonal_coef);
730 free(last_seasonal_coef);
731 return 0;
733 err_free_coefficients:
734 free(seasonal_coef);
735 free(last_seasonal_coef);
736 return -1;
737 }
739 /*
740 * Parse a DS string (time + colon-separated values), storing the
741 * results in current_time, current_time_usec, and updvals.
742 *
743 * Returns 0 on success, -1 on error.
744 */
745 static int parse_ds(
746 rrd_t *rrd, char **updvals, long *tmpl_idx, char *input,
747 unsigned long tmpl_cnt, time_t *current_time,
748 unsigned long *current_time_usec, int version)
749 {
750 char *p;
751 unsigned long i;
752 char timesyntax;
754 updvals[0] = input;
755 /* initialize all ds input to unknown except the first one
756 which has always got to be set */
757 for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
758 updvals[i] = "U";
760 /* separate all ds elements; first must be examined separately
761 due to alternate time syntax */
762 if ((p = strchr(input, '@')) != NULL) {
763 timesyntax = '@';
764 } else if ((p = strchr(input, ':')) != NULL) {
765 timesyntax = ':';
766 } else {
767 rrd_set_error("expected timestamp not found in data source from %s",
768 input);
769 return -1;
770 }
771 *p = '\0';
772 i = 1;
773 updvals[tmpl_idx[i++]] = p+1;
774 while (*(++p)) {
775 if (*p == ':') {
776 *p = '\0';
777 if (i < tmpl_cnt) {
778 updvals[tmpl_idx[i++]] = p+1;
779 }
780 }
781 }
783 if (i != tmpl_cnt) {
784 rrd_set_error("expected %lu data source readings (got %lu) from %s",
785 tmpl_cnt - 1, i, input);
786 return -1;
787 }
789 if (get_time_from_reading(rrd, timesyntax, updvals,
790 current_time, current_time_usec,
791 version) == -1) {
792 return -1;
793 }
794 return 0;
795 }
797 /*
798 * Parse the time in a DS string, store it in current_time and
799 * current_time_usec and verify that it's later than the last
800 * update for this DS.
801 *
802 * Returns 0 on success, -1 on error.
803 */
804 static int get_time_from_reading(
805 rrd_t *rrd, char timesyntax, char **updvals,
806 time_t *current_time, unsigned long *current_time_usec,
807 int version)
808 {
809 double tmp;
810 char *parsetime_error = NULL;
811 char *old_locale;
812 struct rrd_time_value ds_tv;
813 struct timeval tmp_time; /* used for time conversion */
815 /* get the time from the reading ... handle N */
816 if (timesyntax == '@') { /* at-style */
817 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
818 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
819 return -1;
820 }
821 if (ds_tv.type == RELATIVE_TO_END_TIME ||
822 ds_tv.type == RELATIVE_TO_START_TIME) {
823 rrd_set_error("specifying time relative to the 'start' "
824 "or 'end' makes no sense here: %s", updvals[0]);
825 return -1;
826 }
827 *current_time = mktime(&ds_tv.tm) + ds_tv.offset;
828 *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
829 } else if (strcmp(updvals[0], "N") == 0) {
830 gettimeofday(&tmp_time, 0);
831 normalize_time(&tmp_time);
832 *current_time = tmp_time.tv_sec;
833 *current_time_usec = tmp_time.tv_usec;
834 } else {
835 old_locale = setlocale(LC_NUMERIC, "C");
836 tmp = strtod(updvals[0], 0);
837 setlocale(LC_NUMERIC, old_locale);
838 *current_time = floor(tmp);
839 *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
840 }
841 /* dont do any correction for old version RRDs */
842 if (version < 3)
843 *current_time_usec = 0;
845 if (*current_time < rrd->live_head->last_up ||
846 (*current_time == rrd->live_head->last_up &&
847 (long) *current_time_usec <=
848 (long) rrd->live_head->last_up_usec)) {
849 rrd_set_error("illegal attempt to update using time %ld when "
850 "last update time is %ld (minimum one second step)",
851 *current_time, rrd->live_head->last_up);
852 return -1;
853 }
854 return 0;
855 }
857 /*
858 * Update pdp_new by interpreting the updvals according to the DS type
859 * (COUNTER, GAUGE, etc.).
860 *
861 * Returns 0 on success, -1 on error.
862 */
863 static int update_pdp_prep(
864 rrd_t *rrd, char **updvals,
865 rrd_value_t *pdp_new, double interval)
866 {
867 unsigned long ds_idx;
868 int ii;
869 char *endptr; /* used in the conversion */
870 double rate;
871 char *old_locale;
872 enum dst_en dst_idx;
874 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
875 dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
877 /* make sure we do not build diffs with old last_ds values */
878 if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
879 strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
880 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
881 }
883 /* NOTE: DST_CDEF should never enter this if block, because
884 * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
885 * accidently specified a value for the DST_CDEF. To handle this case,
886 * an extra check is required. */
888 if ((updvals[ds_idx+1][0] != 'U') &&
889 (dst_idx != DST_CDEF) &&
890 rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
891 rate = DNAN;
893 /* pdp_new contains rate * time ... eg the bytes transferred during
894 * the interval. Doing it this way saves a lot of math operations
895 */
896 switch (dst_idx) {
897 case DST_COUNTER:
898 case DST_DERIVE:
899 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
900 if ((updvals[ds_idx + 1][ii] < '0' || updvals[ds_idx + 1][ii] > '9')
901 && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
902 rrd_set_error("not a simple integer: '%s'", updvals[ds_idx + 1]);
903 return -1;
904 }
905 }
906 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
907 pdp_new[ds_idx] = rrd_diff(updvals[ds_idx+1], rrd->pdp_prep[ds_idx].last_ds);
908 if (dst_idx == DST_COUNTER) {
909 /* simple overflow catcher. This will fail
910 * terribly for non 32 or 64 bit counters
911 * ... are there any others in SNMP land?
912 */
913 if (pdp_new[ds_idx] < (double) 0.0)
914 pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
915 if (pdp_new[ds_idx] < (double) 0.0)
916 pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
917 }
918 rate = pdp_new[ds_idx] / interval;
919 } else {
920 pdp_new[ds_idx] = DNAN;
921 }
922 break;
923 case DST_ABSOLUTE:
924 old_locale = setlocale(LC_NUMERIC, "C");
925 errno = 0;
926 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
927 setlocale(LC_NUMERIC, old_locale);
928 if (errno > 0) {
929 rrd_set_error("converting '%s' to float: %s",
930 updvals[ds_idx + 1], rrd_strerror(errno));
931 return -1;
932 };
933 if (endptr[0] != '\0') {
934 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",
935 updvals[ds_idx + 1], endptr);
936 return -1;
937 }
938 rate = pdp_new[ds_idx] / interval;
939 break;
940 case DST_GAUGE:
941 errno = 0;
942 old_locale = setlocale(LC_NUMERIC, "C");
943 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr) * interval;
944 setlocale(LC_NUMERIC, old_locale);
945 if (errno) {
946 rrd_set_error("converting '%s' to float: %s",
947 updvals[ds_idx + 1], rrd_strerror(errno));
948 return -1;
949 };
950 if (endptr[0] != '\0') {
951 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",
952 updvals[ds_idx + 1], endptr);
953 return -1;
954 }
955 rate = pdp_new[ds_idx] / interval;
956 break;
957 default:
958 rrd_set_error("rrd contains unknown DS type : '%s'",
959 rrd->ds_def[ds_idx].dst);
960 return -1;
961 }
962 /* break out of this for loop if the error string is set */
963 if (rrd_test_error()) {
964 return -1;
965 }
966 /* make sure pdp_temp is neither too large or too small
967 * if any of these occur it becomes unknown ...
968 * sorry folks ... */
969 if (!isnan(rate) &&
970 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
971 rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
972 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
973 rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
974 pdp_new[ds_idx] = DNAN;
975 }
976 } else {
977 /* no news is news all the same */
978 pdp_new[ds_idx] = DNAN;
979 }
982 /* make a copy of the command line argument for the next run */
983 #ifdef DEBUG
984 fprintf(stderr, "prep ds[%lu]\t"
985 "last_arg '%s'\t"
986 "this_arg '%s'\t"
987 "pdp_new %10.2f\n",
988 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx+1], pdp_new[ds_idx]);
989 #endif
990 strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx+1], LAST_DS_LEN - 1);
991 rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN-1] = '\0';
992 }
993 return 0;
994 }
996 /*
997 * How many PDP steps have elapsed since the last update? Returns the answer,
998 * and stores the time between the last update and the last PDP in pre_time,
999 * and the time between the last PDP and the current time in post_int.
1000 */
1001 static int calculate_elapsed_steps(
1002 rrd_t *rrd,
1003 unsigned long current_time,
1004 unsigned long current_time_usec,
1005 double interval,
1006 double *pre_int,
1007 double *post_int,
1008 unsigned long *proc_pdp_cnt)
1009 {
1010 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1011 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1012 * time */
1013 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1014 * when it was last updated */
1015 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1017 /* when was the current pdp started */
1018 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1019 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1021 /* when did the last pdp_st occur */
1022 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1023 occu_pdp_st = current_time - occu_pdp_age;
1025 if (occu_pdp_st > proc_pdp_st) {
1026 /* OK we passed the pdp_st moment */
1027 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1028 * occurred before the latest
1029 * pdp_st moment*/
1030 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1031 *post_int = occu_pdp_age; /* how much after it */
1032 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1033 } else {
1034 *pre_int = interval;
1035 *post_int = 0;
1036 }
1038 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1040 #ifdef DEBUG
1041 printf("proc_pdp_age %lu\t"
1042 "proc_pdp_st %lu\t"
1043 "occu_pfp_age %lu\t"
1044 "occu_pdp_st %lu\t"
1045 "int %lf\t"
1046 "pre_int %lf\t"
1047 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1048 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1049 #endif
1051 /* compute the number of elapsed pdp_st moments */
1052 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1053 }
1055 /*
1056 * Increment the PDP values by the values in pdp_new, or else initialize them.
1057 */
1058 static void simple_update(
1059 rrd_t *rrd, double interval, rrd_value_t *pdp_new)
1060 {
1061 int i;
1062 for (i = 0; i < (signed)rrd->stat_head->ds_cnt; i++) {
1063 if (isnan(pdp_new[i])) {
1064 /* this is not really accurate if we use subsecond data arrival time
1065 should have thought of it when going subsecond resolution ...
1066 sorry next format change we will have it! */
1067 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);
1068 } else {
1069 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1070 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1071 } else {
1072 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1073 }
1074 }
1075 #ifdef DEBUG
1076 fprintf(stderr,
1077 "NO PDP ds[%i]\t"
1078 "value %10.2f\t"
1079 "unkn_sec %5lu\n",
1080 i,
1081 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1082 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1083 #endif
1084 }
1085 }
1087 /*
1088 * Call process_pdp_st for each DS.
1089 *
1090 * Returns 0 on success, -1 on error.
1091 */
1092 static int process_all_pdp_st(
1093 rrd_t *rrd, double interval, double pre_int, double post_int,
1094 unsigned long elapsed_pdp_st, rrd_value_t *pdp_new, rrd_value_t *pdp_temp)
1095 {
1096 unsigned long ds_idx;
1097 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1098 rate*seconds which occurred up to the last run.
1099 pdp_new[] contains rate*seconds from the latest run.
1100 pdp_temp[] will contain the rate for cdp */
1102 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1103 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1104 elapsed_pdp_st * rrd->stat_head->pdp_step,
1105 pdp_new, pdp_temp) == -1) {
1106 return -1;
1107 }
1108 #ifdef DEBUG
1109 fprintf(stderr, "PDP UPD ds[%lu]\t"
1110 "pdp_temp %10.2f\t"
1111 "new_prep %10.2f\t"
1112 "new_unkn_sec %5lu\n",
1113 ds_idx, pdp_temp[ds_idx],
1114 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1115 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1116 #endif
1117 }
1118 return 0;
1119 }
1121 /*
1122 * Process an update that occurs after one of the PDP moments.
1123 * Increments the PDP value, sets NAN if time greater than the
1124 * heartbeats have elapsed, processes CDEFs.
1125 *
1126 * Returns 0 on success, -1 on error.
1127 */
1128 static int process_pdp_st(rrd_t *rrd, unsigned long ds_idx, double interval,
1129 double pre_int, double post_int, long diff_pdp_st,
1130 rrd_value_t *pdp_new, rrd_value_t *pdp_temp)
1131 {
1132 int i;
1133 /* update pdp_prep to the current pdp_st. */
1134 double pre_unknown = 0.0;
1135 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1136 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1138 rpnstack_t rpnstack; /* used for COMPUTE DS */
1139 rpnstack_init(&rpnstack);
1142 if (isnan(pdp_new[ds_idx])) {
1143 /* a final bit of unknown to be added bevore calculation
1144 we use a temporary variable for this so that we
1145 don't have to turn integer lines before using the value */
1146 pre_unknown = pre_int;
1147 } else {
1148 if (isnan(scratch[PDP_val].u_val)) {
1149 scratch[PDP_val].u_val = 0;
1150 }
1151 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1152 }
1154 /* if too much of the pdp_prep is unknown we dump it */
1155 /* if the interval is larger thatn mrhb we get NAN */
1156 if ((interval > mrhb) ||
1157 (diff_pdp_st <= (signed)scratch[PDP_unkn_sec_cnt].u_cnt)) {
1158 pdp_temp[ds_idx] = DNAN;
1159 } else {
1160 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1161 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) - pre_unknown);
1162 }
1164 /* process CDEF data sources; remember each CDEF DS can
1165 * only reference other DS with a lower index number */
1166 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1167 rpnp_t *rpnp;
1169 rpnp = rpn_expand((rpn_cdefds_t *)&(rrd->ds_def[ds_idx].par[DS_cdef]));
1170 /* substitute data values for OP_VARIABLE nodes */
1171 for (i = 0; rpnp[i].op != OP_END; i++) {
1172 if (rpnp[i].op == OP_VARIABLE) {
1173 rpnp[i].op = OP_NUMBER;
1174 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1175 }
1176 }
1177 /* run the rpn calculator */
1178 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1179 free(rpnp);
1180 rpnstack_free(&rpnstack);
1181 return -1;
1182 }
1183 }
1185 /* make pdp_prep ready for the next run */
1186 if (isnan(pdp_new[ds_idx])) {
1187 /* this is not realy accurate if we use subsecond data arival time
1188 should have thought of it when going subsecond resolution ...
1189 sorry next format change we will have it! */
1190 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1191 scratch[PDP_val].u_val = DNAN;
1192 } else {
1193 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1194 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1195 }
1196 rpnstack_free(&rpnstack);
1197 return 0;
1198 }
1200 /*
1201 * Iterate over all the RRAs for a given DS and:
1202 * 1. Decide whether to schedule a smooth later
1203 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1204 * 3. Update the CDP
1205 *
1206 * Returns 0 on success, -1 on error
1207 */
1208 static int update_all_cdp_prep(
1209 rrd_t *rrd, unsigned long *rra_step_cnt, unsigned long rra_begin,
1210 rrd_file_t *rrd_file, unsigned long elapsed_pdp_st, unsigned long proc_pdp_cnt,
1211 rrd_value_t **last_seasonal_coef, rrd_value_t **seasonal_coef,
1212 rrd_value_t *pdp_temp, unsigned long *rra_current,
1213 unsigned long *skip_update, int *schedule_smooth)
1214 {
1215 unsigned long rra_idx;
1216 /* index into the CDP scratch array */
1217 enum cf_en current_cf;
1218 unsigned long rra_start;
1219 /* number of rows to be updated in an RRA for a data value. */
1220 unsigned long start_pdp_offset;
1222 rra_start = rra_begin;
1223 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1224 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1225 start_pdp_offset = rrd->rra_def[rra_idx].pdp_cnt - proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1226 skip_update[rra_idx] = 0;
1227 if (start_pdp_offset <= elapsed_pdp_st) {
1228 rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1229 rrd->rra_def[rra_idx].pdp_cnt + 1;
1230 } else {
1231 rra_step_cnt[rra_idx] = 0;
1232 }
1234 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1235 /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1236 * so that they will be correct for the next observed value; note that for
1237 * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1238 * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1239 if (rra_step_cnt[rra_idx] > 1) {
1240 skip_update[rra_idx] = 1;
1241 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1242 elapsed_pdp_st, last_seasonal_coef);
1243 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1244 elapsed_pdp_st + 1, seasonal_coef);
1245 }
1246 /* periodically run a smoother for seasonal effects */
1247 if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1248 #ifdef DEBUG
1249 fprintf(stderr, "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1250 rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1251 rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt);
1252 #endif
1253 *schedule_smooth = 1;
1254 }
1255 *rra_current = rrd_tell(rrd_file);
1256 }
1257 if (rrd_test_error())
1258 return -1;
1260 if (update_cdp_prep(rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt,
1261 rra_idx, pdp_temp, *last_seasonal_coef, *seasonal_coef,
1262 current_cf) == -1) {
1263 return -1;
1264 }
1265 rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1266 }
1267 return 0;
1268 }
1270 /*
1271 * Are we due for a smooth? Also increments our position in the burn-in cycle.
1272 */
1273 static int do_schedule_smooth(
1274 rrd_t *rrd, unsigned long rra_idx,
1275 unsigned long elapsed_pdp_st)
1276 {
1277 unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1278 unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1279 unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1280 unsigned long seasonal_smooth_idx = rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1281 unsigned long *init_seasonal = &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1283 /* Need to use first cdp parameter buffer to track burnin (burnin requires
1284 * a specific smoothing schedule). The CDP_init_seasonal parameter is
1285 * really an RRA level, not a data source within RRA level parameter, but
1286 * the rra_def is read only for rrd_update (not flushed to disk). */
1287 if (*init_seasonal > BURNIN_CYCLES) {
1288 /* someone has no doubt invented a trick to deal with this wrap around,
1289 * but at least this code is clear. */
1290 if (seasonal_smooth_idx > cur_row) {
1291 /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1292 * between PDP and CDP */
1293 return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1294 }
1295 /* can't rely on negative numbers because we are working with
1296 * unsigned values */
1297 return (cur_row + elapsed_pdp_st >= row_cnt
1298 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1299 }
1300 /* mark off one of the burn-in cycles */
1301 return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1302 }
1304 /*
1305 * For a given RRA, iterate over the data sources and call the appropriate
1306 * consolidation function.
1307 *
1308 * Returns 0 on success, -1 on error.
1309 */
1310 static int update_cdp_prep(
1311 rrd_t *rrd,
1312 unsigned long elapsed_pdp_st,
1313 unsigned long start_pdp_offset,
1314 unsigned long *rra_step_cnt,
1315 int rra_idx,
1316 rrd_value_t *pdp_temp,
1317 rrd_value_t *last_seasonal_coef,
1318 rrd_value_t *seasonal_coef,
1319 int current_cf)
1320 {
1321 unsigned long ds_idx, cdp_idx;
1322 /* update CDP_PREP areas */
1323 /* loop over data soures within each RRA */
1324 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1326 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1328 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1329 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1330 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1331 elapsed_pdp_st, start_pdp_offset,
1332 rrd->rra_def[rra_idx].pdp_cnt,
1333 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val, rra_idx, ds_idx);
1334 } else {
1335 /* Nothing to consolidate if there's one PDP per CDP. However, if
1336 * we've missed some PDPs, let's update null counters etc. */
1337 if (elapsed_pdp_st > 2) {
1338 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef, seasonal_coef,
1339 rra_idx, ds_idx, cdp_idx, current_cf);
1340 }
1341 }
1343 if (rrd_test_error())
1344 return -1;
1345 } /* endif data sources loop */
1346 return 0;
1347 }
1349 /*
1350 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1351 * primary value, secondary value, and # of unknowns.
1352 */
1353 static void update_cdp(
1354 unival *scratch,
1355 int current_cf,
1356 rrd_value_t pdp_temp_val,
1357 unsigned long rra_step_cnt,
1358 unsigned long elapsed_pdp_st,
1359 unsigned long start_pdp_offset,
1360 unsigned long pdp_cnt,
1361 rrd_value_t xff,
1362 int i, int ii)
1363 {
1364 /* shorthand variables */
1365 rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1366 rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1367 rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1368 unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1370 if (rra_step_cnt) {
1371 /* If we are in this block, as least 1 CDP value will be written to
1372 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1373 * to be written, then the "fill in" value is the CDP_secondary_val
1374 * entry. */
1375 if (isnan(pdp_temp_val)) {
1376 *cdp_unkn_pdp_cnt += start_pdp_offset;
1377 *cdp_secondary_val = DNAN;
1378 } else {
1379 /* CDP_secondary value is the RRA "fill in" value for intermediary
1380 * CDP data entries. No matter the CF, the value is the same because
1381 * the average, max, min, and last of a list of identical values is
1382 * the same, namely, the value itself. */
1383 *cdp_secondary_val = pdp_temp_val;
1384 }
1386 if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1387 *cdp_primary_val = DNAN;
1388 if (current_cf == CF_AVERAGE) {
1389 *cdp_val = initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1390 start_pdp_offset, pdp_cnt);
1391 } else {
1392 *cdp_val = pdp_temp_val;
1393 }
1394 } else {
1395 initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1396 elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1397 } /* endif meets xff value requirement for a valid value */
1398 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1399 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1400 if (isnan(pdp_temp_val))
1401 *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1402 else
1403 *cdp_unkn_pdp_cnt = 0;
1404 } else { /* rra_step_cnt[i] == 0 */
1406 #ifdef DEBUG
1407 if (isnan(*cdp_val)) {
1408 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1409 i, ii);
1410 } else {
1411 fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1412 i, ii, *cdp_val);
1413 }
1414 #endif
1415 if (isnan(pdp_temp_val)) {
1416 *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1417 } else {
1418 *cdp_val = calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st, current_cf, i, ii);
1419 }
1420 }
1421 }
1423 /*
1424 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1425 * on the type of consolidation function.
1426 */
1427 static void initialize_cdp_val(
1428 unival *scratch,
1429 int current_cf,
1430 rrd_value_t pdp_temp_val,
1431 unsigned long elapsed_pdp_st,
1432 unsigned long start_pdp_offset,
1433 unsigned long pdp_cnt)
1434 {
1435 rrd_value_t cum_val, cur_val;
1437 switch (current_cf) {
1438 case CF_AVERAGE:
1439 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1440 cur_val = IFDNAN(pdp_temp_val, 0.0);
1441 scratch[CDP_primary_val].u_val =
1442 (cum_val + cur_val * start_pdp_offset) /
1443 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1444 scratch[CDP_val].u_val = initialize_average_carry_over(
1445 pdp_temp_val, elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1446 break;
1447 case CF_MAXIMUM:
1448 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1449 cur_val = IFDNAN(pdp_temp_val, -DINF);
1450 #if 0
1451 #ifdef DEBUG
1452 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1453 fprintf(stderr,
1454 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1455 i, ii);
1456 exit(-1);
1457 }
1458 #endif
1459 #endif
1460 if (cur_val > cum_val)
1461 scratch[CDP_primary_val].u_val = cur_val;
1462 else
1463 scratch[CDP_primary_val].u_val = cum_val;
1464 /* initialize carry over value */
1465 scratch[CDP_val].u_val = pdp_temp_val;
1466 break;
1467 case CF_MINIMUM:
1468 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1469 cur_val = IFDNAN(pdp_temp_val, DINF);
1470 #if 0
1471 #ifdef DEBUG
1472 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1473 fprintf(stderr, "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1474 i, ii);
1475 exit(-1);
1476 }
1477 #endif
1478 #endif
1479 if (cur_val < cum_val)
1480 scratch[CDP_primary_val].u_val = cur_val;
1481 else
1482 scratch[CDP_primary_val].u_val = cum_val;
1483 /* initialize carry over value */
1484 scratch[CDP_val].u_val = pdp_temp_val;
1485 break;
1486 case CF_LAST:
1487 default:
1488 scratch[CDP_primary_val].u_val = pdp_temp_val;
1489 /* initialize carry over value */
1490 scratch[CDP_val].u_val = pdp_temp_val;
1491 break;
1492 }
1493 }
1495 /*
1496 * Update the consolidation function for Holt-Winters functions as
1497 * well as other functions that don't actually consolidate multiple
1498 * PDPs.
1499 */
1500 static void reset_cdp(
1501 rrd_t *rrd,
1502 unsigned long elapsed_pdp_st,
1503 rrd_value_t *pdp_temp,
1504 rrd_value_t *last_seasonal_coef,
1505 rrd_value_t *seasonal_coef,
1506 int rra_idx, int ds_idx, int cdp_idx,
1507 enum cf_en current_cf)
1508 {
1509 unival *scratch = rrd->cdp_prep[cdp_idx].scratch;
1511 switch (current_cf) {
1512 case CF_AVERAGE:
1513 default:
1514 scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1515 scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1516 break;
1517 case CF_SEASONAL:
1518 case CF_DEVSEASONAL:
1519 /* need to update cached seasonal values, so they are consistent
1520 * with the bulk update */
1521 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1522 * CDP_last_deviation are the same. */
1523 scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1524 scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1525 break;
1526 case CF_HWPREDICT:
1527 case CF_MHWPREDICT:
1528 /* need to update the null_count and last_null_count.
1529 * even do this for non-DNAN pdp_temp because the
1530 * algorithm is not learning from batch updates. */
1531 scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1532 scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1533 /* fall through */
1534 case CF_DEVPREDICT:
1535 scratch[CDP_primary_val].u_val = DNAN;
1536 scratch[CDP_secondary_val].u_val = DNAN;
1537 break;
1538 case CF_FAILURES:
1539 /* do not count missed bulk values as failures */
1540 scratch[CDP_primary_val].u_val = 0;
1541 scratch[CDP_secondary_val].u_val = 0;
1542 /* need to reset violations buffer.
1543 * could do this more carefully, but for now, just
1544 * assume a bulk update wipes away all violations. */
1545 erase_violations(rrd, cdp_idx, rra_idx);
1546 break;
1547 }
1548 }
1550 static rrd_value_t initialize_average_carry_over(
1551 rrd_value_t pdp_temp_val,
1552 unsigned long elapsed_pdp_st,
1553 unsigned long start_pdp_offset,
1554 unsigned long pdp_cnt)
1555 {
1556 /* initialize carry over value */
1557 if (isnan(pdp_temp_val)) {
1558 return DNAN;
1559 }
1560 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1561 }
1563 /*
1564 * Update or initialize a CDP value based on the consolidation
1565 * function.
1566 *
1567 * Returns the new value.
1568 */
1569 static rrd_value_t calculate_cdp_val(
1570 rrd_value_t cdp_val,
1571 rrd_value_t pdp_temp_val,
1572 unsigned long elapsed_pdp_st,
1573 int current_cf, int i, int ii)
1574 {
1575 if (isnan(cdp_val)) {
1576 if (current_cf == CF_AVERAGE) {
1577 pdp_temp_val *= elapsed_pdp_st;
1578 }
1579 #ifdef DEBUG
1580 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1581 i, ii, pdp_temp_val);
1582 #endif
1583 return pdp_temp_val;
1584 }
1585 if (current_cf == CF_AVERAGE)
1586 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1587 if (current_cf == CF_MINIMUM)
1588 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1589 if (current_cf == CF_MAXIMUM)
1590 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1592 return pdp_temp_val;
1593 }
1595 /*
1596 * For each RRA, update the seasonal values and then call update_aberrant_CF
1597 * for each data source.
1598 *
1599 * Return 0 on success, -1 on error.
1600 */
1601 static int update_aberrant_cdps(
1602 rrd_t *rrd, rrd_file_t *rrd_file, unsigned long rra_begin,
1603 unsigned long *rra_current, unsigned long elapsed_pdp_st,
1604 rrd_value_t *pdp_temp, rrd_value_t **seasonal_coef)
1605 {
1606 unsigned long rra_idx, ds_idx, j;
1608 /* number of PDP steps since the last update that
1609 * are assigned to the first CDP to be generated
1610 * since the last update. */
1611 unsigned short scratch_idx;
1612 unsigned long rra_start;
1613 enum cf_en current_cf;
1615 /* this loop is only entered if elapsed_pdp_st < 3 */
1616 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1617 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1618 rra_start = rra_begin;
1619 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1620 if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1621 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1622 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1623 if (scratch_idx == CDP_primary_val) {
1624 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1625 elapsed_pdp_st + 1, seasonal_coef);
1626 } else {
1627 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1628 elapsed_pdp_st + 2, seasonal_coef);
1629 }
1630 *rra_current = rrd_tell(rrd_file);
1631 }
1632 if (rrd_test_error())
1633 return -1;
1634 /* loop over data soures within each RRA */
1635 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1636 update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1637 rra_idx * (rrd->stat_head->ds_cnt) + ds_idx,
1638 rra_idx, ds_idx, scratch_idx, *seasonal_coef);
1639 }
1640 }
1641 rra_start += rrd->rra_def[rra_idx].row_cnt
1642 * rrd->stat_head->ds_cnt
1643 * sizeof(rrd_value_t);
1644 }
1645 }
1646 return 0;
1647 }
1649 /*
1650 * Move sequentially through the file, writing one RRA at a time. Note this
1651 * architecture divorces the computation of CDP with flushing updated RRA
1652 * entries to disk.
1653 *
1654 * Return 0 on success, -1 on error.
1655 */
1656 static int write_to_rras(
1657 rrd_t *rrd,
1658 rrd_file_t *rrd_file,
1659 unsigned long *rra_step_cnt,
1660 unsigned long rra_begin,
1661 unsigned long *rra_current,
1662 time_t current_time,
1663 unsigned long *skip_update,
1664 info_t **pcdp_summary)
1665 {
1666 unsigned long rra_idx;
1667 unsigned long rra_start;
1668 unsigned long rra_pos_tmp; /* temporary byte pointer. */
1669 time_t rra_time = 0; /* time of update for a RRA */
1671 /* Ready to write to disk */
1672 rra_start = rra_begin;
1673 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1674 /* skip unless there's something to write */
1675 if (rra_step_cnt[rra_idx]) {
1676 /* write the first row */
1677 #ifdef DEBUG
1678 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1679 #endif
1680 rrd->rra_ptr[rra_idx].cur_row++;
1681 if (rrd->rra_ptr[rra_idx].cur_row >= rrd->rra_def[rra_idx].row_cnt)
1682 rrd->rra_ptr[rra_idx].cur_row = 0; /* wrap around */
1683 /* position on the first row */
1684 rra_pos_tmp = rra_start +
1685 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1686 sizeof(rrd_value_t);
1687 if (rra_pos_tmp != *rra_current) {
1688 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1689 rrd_set_error("seek error in rrd");
1690 return -1;
1691 }
1692 *rra_current = rra_pos_tmp;
1693 }
1694 #ifdef DEBUG
1695 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1696 #endif
1697 if (!skip_update[rra_idx]) {
1698 if (*pcdp_summary != NULL) {
1699 rra_time = (current_time - current_time
1700 % (rrd->rra_def[rra_idx].pdp_cnt *
1701 rrd->stat_head->pdp_step))
1702 - ((rra_step_cnt[rra_idx] - 1) * rrd->rra_def[rra_idx].pdp_cnt *
1703 rrd->stat_head->pdp_step);
1704 }
1705 if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1706 pcdp_summary, rra_time) == -1)
1707 return -1;
1708 }
1710 /* write other rows of the bulk update, if any */
1711 for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1712 if (++rrd->rra_ptr[rra_idx].cur_row == rrd->rra_def[rra_idx].row_cnt) {
1713 #ifdef DEBUG
1714 fprintf(stderr,
1715 "Wraparound for RRA %s, %lu updates left\n",
1716 rrd->rra_def[rra_idx].cf_nam, rra_step_cnt[rra_idx] - 1);
1717 #endif
1718 /* wrap */
1719 rrd->rra_ptr[rra_idx].cur_row = 0;
1720 /* seek back to beginning of current rra */
1721 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1722 rrd_set_error("seek error in rrd");
1723 return -1;
1724 }
1725 #ifdef DEBUG
1726 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1727 rrd_file->pos);
1728 #endif
1729 *rra_current = rra_start;
1730 }
1731 if (!skip_update[rra_idx]) {
1732 if (*pcdp_summary != NULL) {
1733 rra_time = (current_time - current_time
1734 % (rrd->rra_def[rra_idx].pdp_cnt *
1735 rrd->stat_head->pdp_step))
1736 -
1737 ((rra_step_cnt[rra_idx] - 2) * rrd->rra_def[rra_idx].pdp_cnt *
1738 rrd->stat_head->pdp_step);
1739 }
1740 if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
1741 CDP_secondary_val, pcdp_summary, rra_time) == -1)
1742 return -1;
1743 }
1744 }
1745 }
1746 rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1747 sizeof(rrd_value_t);
1748 } /* RRA LOOP */
1750 return 0;
1751 }
1753 /*
1754 * Write out one row of values (one value per DS) to the archive.
1755 *
1756 * Returns 0 on success, -1 on error.
1757 */
1758 static int write_RRA_row(
1759 rrd_file_t *rrd_file,
1760 rrd_t *rrd,
1761 unsigned long rra_idx,
1762 unsigned long *rra_current,
1763 unsigned short CDP_scratch_idx,
1764 info_t **pcdp_summary,
1765 time_t rra_time)
1766 {
1767 unsigned long ds_idx, cdp_idx;
1768 infoval iv;
1770 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1771 /* compute the cdp index */
1772 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1773 #ifdef DEBUG
1774 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1775 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1776 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1777 #endif
1778 if (pcdp_summary != NULL) {
1779 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1780 /* append info to the return hash */
1781 *pcdp_summary = info_push(*pcdp_summary,
1782 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]", rra_time,
1783 rrd->rra_def[rra_idx].cf_nam,
1784 rrd->rra_def[rra_idx].pdp_cnt,
1785 rrd->ds_def[ds_idx].ds_nam), RD_I_VAL, iv);
1786 }
1787 if (rrd_write(rrd_file,
1788 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1789 sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1790 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1791 return -1;
1792 }
1793 *rra_current += sizeof(rrd_value_t);
1794 }
1795 return 0;
1796 }
1798 /*
1799 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
1800 *
1801 * Returns 0 on success, -1 otherwise
1802 */
1803 static int smooth_all_rras(
1804 rrd_t *rrd,
1805 rrd_file_t *rrd_file,
1806 unsigned long rra_begin)
1807 {
1808 unsigned long rra_start = rra_begin;
1809 unsigned long rra_idx;
1810 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
1811 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
1812 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
1813 #ifdef DEBUG
1814 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
1815 #endif
1816 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
1817 if (rrd_test_error())
1818 return -1;
1819 }
1820 rra_start += rrd->rra_def[rra_idx].row_cnt
1821 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1822 }
1823 return 0;
1824 }
1826 #ifndef HAVE_MMAP
1827 /*
1828 * Flush changes to disk (unless we're using mmap)
1829 *
1830 * Returns 0 on success, -1 otherwise
1831 */
1832 static int write_changes_to_disk(
1833 rrd_t *rrd, rrd_file_t *rrd_file, int version)
1834 {
1835 /* we just need to write back the live header portion now*/
1836 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1837 + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
1838 + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
1839 SEEK_SET) != 0) {
1840 rrd_set_error("seek rrd for live header writeback");
1841 return -1;
1842 }
1843 if (version >= 3) {
1844 if (rrd_write(rrd_file, rrd->live_head,
1845 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1846 rrd_set_error("rrd_write live_head to rrd");
1847 return -1;
1848 }
1849 } else {
1850 if (rrd_write(rrd_file, &rrd->live_head->last_up,
1851 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1852 rrd_set_error("rrd_write live_head to rrd");
1853 return -1;
1854 }
1855 }
1858 if (rrd_write(rrd_file, rrd->pdp_prep,
1859 sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
1860 != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
1861 rrd_set_error("rrd_write pdp_prep to rrd");
1862 return -1;
1863 }
1865 if (rrd_write(rrd_file, rrd->cdp_prep,
1866 sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
1867 rrd->stat_head->ds_cnt)
1868 != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
1869 rrd->stat_head->ds_cnt)) {
1871 rrd_set_error("rrd_write cdp_prep to rrd");
1872 return -1;
1873 }
1875 if (rrd_write(rrd_file, rrd->rra_ptr,
1876 sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
1877 != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
1878 rrd_set_error("rrd_write rra_ptr to rrd");
1879 return -1;
1880 }
1881 return 0;
1882 }
1883 #endif