1 /*****************************************************************************
2 * RRDtool 1.2.99907080300 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 *****************************************************************************/
9 #include "rrd_tool.h"
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
13 #include <sys/stat.h>
14 #include <io.h>
15 #endif
17 #include <locale.h>
19 #include "rrd_hw.h"
20 #include "rrd_rpncalc.h"
22 #include "rrd_is_thread_safe.h"
23 #include "unused.h"
25 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
26 /*
27 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
28 * replacement.
29 */
30 #include <sys/timeb.h>
32 #ifndef __MINGW32__
/* Minimal stand-in for the POSIX struct timeval, which native WIN32
 * lacks (MinGW ships its own, hence the surrounding __MINGW32__ guard). */
struct timeval {
    time_t tv_sec;              /* seconds */
    long tv_usec;               /* microseconds */
};
37 #endif
/* Stand-in for struct timezone as accepted (and ignored) by the local
 * gettimeofday() replacement below; renamed to avoid clashing with any
 * system-provided definition. */
struct __timezone {
    int tz_minuteswest;         /* minutes W of Greenwich */
    int tz_dsttime;             /* type of dst correction */
};
44 static int gettimeofday(
45 struct timeval *t,
46 struct __timezone *tz)
47 {
49 struct _timeb current_time;
51 _ftime(¤t_time);
53 t->tv_sec = current_time.time;
54 t->tv_usec = current_time.millitm * 1000;
56 return 0;
57 }
59 #endif
61 /* FUNCTION PROTOTYPES */
63 int rrd_update_r(
64 const char *filename,
65 const char *tmplt,
66 int argc,
67 const char **argv);
68 int _rrd_update(
69 const char *filename,
70 const char *tmplt,
71 int argc,
72 const char **argv,
73 info_t *);
75 static int allocate_data_structures(
76 rrd_t *rrd, char ***updvals, rrd_value_t **pdp_temp, const char *tmplt,
77 long **tmpl_idx, unsigned long *tmpl_cnt, unsigned long **rra_step_cnt,
78 unsigned long **skip_update, rrd_value_t **pdp_new);
80 static int parse_template(rrd_t *rrd, const char *tmplt,
81 unsigned long *tmpl_cnt, long *tmpl_idx);
83 static int process_arg(
84 char *step_start,
85 rrd_t *rrd,
86 rrd_file_t *rrd_file,
87 unsigned long rra_begin,
88 unsigned long *rra_current,
89 time_t *current_time,
90 unsigned long *current_time_usec,
91 rrd_value_t *pdp_temp,
92 rrd_value_t *pdp_new,
93 unsigned long *rra_step_cnt,
94 char **updvals,
95 long *tmpl_idx,
96 unsigned long tmpl_cnt,
97 info_t **pcdp_summary,
98 int version,
99 unsigned long *skip_update,
100 int *schedule_smooth);
102 static int parse_ds(rrd_t *rrd, char **updvals, long *tmpl_idx, char *input,
103 unsigned long tmpl_cnt, time_t *current_time, unsigned long *current_time_usec,
104 int version);
106 static int get_time_from_reading(rrd_t *rrd, char timesyntax, char **updvals,
107 time_t *current_time, unsigned long *current_time_usec, int version);
109 static int update_pdp_prep(rrd_t *rrd, char **updvals,
110 rrd_value_t *pdp_new, double interval);
112 static int calculate_elapsed_steps(rrd_t *rrd,
113 unsigned long current_time, unsigned long current_time_usec,
114 double interval, double *pre_int, double *post_int,
115 unsigned long *proc_pdp_cnt);
117 static void simple_update(rrd_t *rrd, double interval, rrd_value_t *pdp_new);
119 static int process_all_pdp_st(rrd_t *rrd, double interval,
120 double pre_int, double post_int, unsigned long elapsed_pdp_st,
121 rrd_value_t *pdp_new, rrd_value_t *pdp_temp);
123 static int process_pdp_st(rrd_t *rrd, unsigned long ds_idx, double interval,
124 double pre_int, double post_int, long diff_pdp_st, rrd_value_t *pdp_new,
125 rrd_value_t *pdp_temp);
127 static int update_all_cdp_prep(
128 rrd_t *rrd, unsigned long *rra_step_cnt, unsigned long rra_begin,
129 rrd_file_t *rrd_file, unsigned long elapsed_pdp_st, unsigned long proc_pdp_cnt,
130 rrd_value_t **last_seasonal_coef, rrd_value_t **seasonal_coef,
131 rrd_value_t *pdp_temp, unsigned long *rra_current,
132 unsigned long *skip_update, int *schedule_smooth);
134 static int do_schedule_smooth(rrd_t *rrd, unsigned long rra_idx,
135 unsigned long elapsed_pdp_st);
137 static int update_cdp_prep(rrd_t *rrd, unsigned long elapsed_pdp_st,
138 unsigned long start_pdp_offset, unsigned long *rra_step_cnt,
139 int rra_idx, rrd_value_t *pdp_temp, rrd_value_t *last_seasonal_coef,
140 rrd_value_t *seasonal_coef, int current_cf);
142 static void update_cdp(unival *scratch, int current_cf,
143 rrd_value_t pdp_temp_val, unsigned long rra_step_cnt,
144 unsigned long elapsed_pdp_st, unsigned long start_pdp_offset,
145 unsigned long pdp_cnt, rrd_value_t xff, int i, int ii);
147 static void initialize_cdp_val(unival *scratch, int current_cf,
148 rrd_value_t pdp_temp_val, unsigned long elapsed_pdp_st,
149 unsigned long start_pdp_offset, unsigned long pdp_cnt);
151 static void reset_cdp(rrd_t *rrd, unsigned long elapsed_pdp_st,
152 rrd_value_t *pdp_temp, rrd_value_t *last_seasonal_coef,
153 rrd_value_t *seasonal_coef,
154 int rra_idx, int ds_idx, int cdp_idx, enum cf_en current_cf);
156 static rrd_value_t initialize_average_carry_over(rrd_value_t pdp_temp_val,
157 unsigned long elapsed_pdp_st, unsigned long start_pdp_offset,
158 unsigned long pdp_cnt);
160 static rrd_value_t calculate_cdp_val(
161 rrd_value_t cdp_val, rrd_value_t pdp_temp_val,
162 unsigned long elapsed_pdp_st, int current_cf, int i, int ii);
164 static int update_aberrant_cdps(rrd_t *rrd, rrd_file_t *rrd_file,
165 unsigned long rra_begin, unsigned long *rra_current,
166 unsigned long elapsed_pdp_st, rrd_value_t *pdp_temp, rrd_value_t **seasonal_coef);
168 static int write_to_rras(rrd_t *rrd, rrd_file_t *rrd_file,
169 unsigned long *rra_step_cnt, unsigned long rra_begin,
170 unsigned long *rra_current, time_t current_time,
171 unsigned long *skip_update, info_t **pcdp_summary);
173 static int write_RRA_row(rrd_file_t *rrd_file, rrd_t *rrd, unsigned long rra_idx,
174 unsigned long *rra_current, unsigned short CDP_scratch_idx, info_t **pcdp_summary,
175 time_t rra_time);
177 static int smooth_all_rras(rrd_t *rrd, rrd_file_t *rrd_file,
178 unsigned long rra_begin);
180 #ifndef HAVE_MMAP
181 static int write_changes_to_disk(rrd_t *rrd, rrd_file_t *rrd_file,
182 int version);
183 #endif
/*
 * normalize time as returned by gettimeofday. usec part must
 * be always >= 0
 */
static inline void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec < 0) {
        /* borrow one full second from tv_sec; use an integer constant —
         * the previous 1e6L was a long-double literal and forced a
         * needless floating-point conversion on tv_usec */
        t->tv_sec--;
        t->tv_usec += 1000000L;
    }
}
/*
 * Sets current_time and current_time_usec based on the current time.
 * current_time_usec is set to 0 if the version number is 1 or 2.
 */
static inline void initialize_time(
    time_t *current_time, unsigned long *current_time_usec,
    int version)
{
    struct timeval now;         /* wall-clock sample being converted */

    gettimeofday(&now, 0);
    normalize_time(&now);

    *current_time = now.tv_sec;
    /* sub-second resolution only exists in RRD format version 3 and up */
    *current_time_usec = (version >= 3) ? (unsigned long) now.tv_usec : 0;
}
218 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
220 info_t *rrd_update_v(
221 int argc,
222 char **argv)
223 {
224 char *tmplt = NULL;
225 info_t *result = NULL;
226 infoval rc;
227 struct option long_options[] = {
228 {"template", required_argument, 0, 't'},
229 {0, 0, 0, 0}
230 };
232 rc.u_int = -1;
233 optind = 0;
234 opterr = 0; /* initialize getopt */
236 while (1) {
237 int option_index = 0;
238 int opt;
240 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
242 if (opt == EOF)
243 break;
245 switch (opt) {
246 case 't':
247 tmplt = optarg;
248 break;
250 case '?':
251 rrd_set_error("unknown option '%s'", argv[optind - 1]);
252 goto end_tag;
253 }
254 }
256 /* need at least 2 arguments: filename, data. */
257 if (argc - optind < 2) {
258 rrd_set_error("Not enough arguments");
259 goto end_tag;
260 }
261 rc.u_int = 0;
262 result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
263 rc.u_int = _rrd_update(argv[optind], tmplt,
264 argc - optind - 1,
265 (const char **) (argv + optind + 1), result);
266 result->value.u_int = rc.u_int;
267 end_tag:
268 return result;
269 }
271 int rrd_update(
272 int argc,
273 char **argv)
274 {
275 struct option long_options[] = {
276 {"template", required_argument, 0, 't'},
277 {0, 0, 0, 0}
278 };
279 int option_index = 0;
280 int opt;
281 char *tmplt = NULL;
282 int rc;
284 optind = 0;
285 opterr = 0; /* initialize getopt */
287 while (1) {
288 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
290 if (opt == EOF)
291 break;
293 switch (opt) {
294 case 't':
295 tmplt = strdup(optarg);
296 break;
298 case '?':
299 rrd_set_error("unknown option '%s'", argv[optind - 1]);
300 return (-1);
301 }
302 }
304 /* need at least 2 arguments: filename, data. */
305 if (argc - optind < 2) {
306 rrd_set_error("Not enough arguments");
308 return -1;
309 }
311 rc = rrd_update_r(argv[optind], tmplt,
312 argc - optind - 1, (const char **) (argv + optind + 1));
313 free(tmplt);
314 return rc;
315 }
/*
 * Re-entrant entry point: apply the argc update strings in argv to
 * filename, optionally remapped through template tmplt.  No info
 * summary is collected (pcdp_summary is passed as NULL).
 *
 * Returns 0 on success, -1 on error (see _rrd_update).
 */
int rrd_update_r(
    const char *filename,
    const char *tmplt,
    int argc,
    const char **argv)
{
    return _rrd_update(filename, tmplt, argc, argv, NULL);
}
326 int _rrd_update(
327 const char *filename,
328 const char *tmplt,
329 int argc,
330 const char **argv,
331 info_t *pcdp_summary)
332 {
334 int arg_i = 2;
336 unsigned long rra_begin; /* byte pointer to the rra
337 * area in the rrd file. this
338 * pointer never changes value */
339 unsigned long rra_current; /* byte pointer to the current write
340 * spot in the rrd file. */
341 rrd_value_t *pdp_new; /* prepare the incoming data to be added
342 * to the existing entry */
343 rrd_value_t *pdp_temp; /* prepare the pdp values to be added
344 * to the cdp values */
346 long *tmpl_idx; /* index representing the settings
347 * transported by the tmplt index */
348 unsigned long tmpl_cnt = 2; /* time and data */
349 rrd_t rrd;
350 time_t current_time = 0;
351 unsigned long current_time_usec = 0; /* microseconds part of current time */
352 char **updvals;
353 int schedule_smooth = 0;
355 /* number of elapsed PDP steps since last update */
356 unsigned long *rra_step_cnt = NULL;
358 int version; /* rrd version */
359 rrd_file_t *rrd_file;
360 char *arg_copy; /* for processing the argv */
361 unsigned long *skip_update; /* RRAs to advance but not write */
363 /* need at least 1 arguments: data. */
364 if (argc < 1) {
365 rrd_set_error("Not enough arguments");
366 goto err_out;
367 }
369 if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
370 goto err_free;
371 }
372 /* We are now at the beginning of the rra's */
373 rra_current = rra_begin = rrd_file->header_len;
375 version = atoi(rrd.stat_head->version);
377 initialize_time(¤t_time, ¤t_time_usec, version);
379 /* get exclusive lock to whole file.
380 * lock gets removed when we close the file.
381 */
382 if (LockRRD(rrd_file->fd) != 0) {
383 rrd_set_error("could not lock RRD");
384 goto err_close;
385 }
387 if (allocate_data_structures(&rrd, &updvals,
388 &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
389 &rra_step_cnt, &skip_update, &pdp_new) == -1) {
390 goto err_close;
391 }
393 /* loop through the arguments. */
394 for (arg_i = 0; arg_i < argc; arg_i++) {
395 if ((arg_copy = strdup(argv[arg_i])) == NULL) {
396 rrd_set_error("failed duplication argv entry");
397 break;
398 }
399 if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
400 ¤t_time, ¤t_time_usec, pdp_temp, pdp_new,
401 rra_step_cnt, updvals, tmpl_idx, tmpl_cnt, &pcdp_summary,
402 version, skip_update, &schedule_smooth) == -1) {
403 free(arg_copy);
404 break;
405 }
406 free(arg_copy);
407 }
409 free(rra_step_cnt);
411 /* if we got here and if there is an error and if the file has not been
412 * written to, then close things up and return. */
413 if (rrd_test_error()) {
414 goto err_free_structures;
415 }
417 #ifndef HAVE_MMAP
418 if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
419 goto err_free_structures;
420 }
421 #endif
423 /* calling the smoothing code here guarantees at most one smoothing
424 * operation per rrd_update call. Unfortunately, it is possible with bulk
425 * updates, or a long-delayed update for smoothing to occur off-schedule.
426 * This really isn't critical except during the burn-in cycles. */
427 if (schedule_smooth) {
428 smooth_all_rras(&rrd, rrd_file, rra_begin);
429 }
431 /* rrd_dontneed(rrd_file,&rrd); */
432 rrd_free(&rrd);
433 rrd_close(rrd_file);
435 free(pdp_new);
436 free(tmpl_idx);
437 free(pdp_temp);
438 free(skip_update);
439 free(updvals);
440 return 0;
442 err_free_structures:
443 free(pdp_new);
444 free(tmpl_idx);
445 free(pdp_temp);
446 free(skip_update);
447 free(updvals);
448 err_close:
449 rrd_close(rrd_file);
450 err_free:
451 rrd_free(&rrd);
452 err_out:
453 return -1;
454 }
456 /*
457 * get exclusive lock to whole file.
458 * lock gets removed when we close the file
459 *
460 * returns 0 on success
461 */
int LockRRD(
    int in_file)
{
    int rcstat;

    {
#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
        struct _stat st;

        if (_fstat(in_file, &st) == 0) {
            /* lock the file's whole current extent, non-blocking */
            rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
        } else {
            rcstat = -1;
        }
#else
        struct flock lock;

        lock.l_type = F_WRLCK;  /* exclusive write lock */
        lock.l_len = 0;         /* whole file */
        lock.l_start = 0;       /* start of file */
        lock.l_whence = SEEK_SET;   /* offsets relative to start of file
                                     * (old comment wrongly said "end") */

        /* F_SETLK: fail immediately instead of blocking if another
         * process holds the lock */
        rcstat = fcntl(in_file, F_SETLK, &lock);
#endif
    }

    return (rcstat);
}
491 /*
492 * Allocate some important arrays used, and initialize the template.
493 *
494 * When it returns, either all of the structures are allocated
495 * or none of them are.
496 *
497 * Returns 0 on success, -1 on error.
498 */
499 static int allocate_data_structures(
500 rrd_t *rrd,
501 char ***updvals,
502 rrd_value_t **pdp_temp,
503 const char *tmplt,
504 long **tmpl_idx,
505 unsigned long *tmpl_cnt,
506 unsigned long **rra_step_cnt,
507 unsigned long **skip_update,
508 rrd_value_t **pdp_new)
509 {
510 unsigned i, ii;
511 if ((*updvals = (char **)malloc(sizeof(char *)
512 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
513 rrd_set_error("allocating updvals pointer array");
514 return -1;
515 }
516 if ((*pdp_temp = (rrd_value_t *)malloc(sizeof(rrd_value_t)
517 * rrd->stat_head->ds_cnt)) == NULL) {
518 rrd_set_error("allocating pdp_temp ...");
519 goto err_free_updvals;
520 }
521 if ((*skip_update = (unsigned long *)malloc(sizeof(unsigned long)
522 * rrd->stat_head->rra_cnt)) == NULL) {
523 rrd_set_error("allocating skip_update...");
524 goto err_free_pdp_temp;
525 }
526 if ((*tmpl_idx = (long *)malloc(sizeof(unsigned long)
527 * (rrd->stat_head->ds_cnt + 1))) == NULL) {
528 rrd_set_error("allocating tmpl_idx ...");
529 goto err_free_skip_update;
530 }
531 if ((*rra_step_cnt = (unsigned long *)malloc(sizeof(unsigned long)
532 * (rrd->stat_head->rra_cnt))) == NULL) {
533 rrd_set_error("allocating rra_step_cnt...");
534 goto err_free_tmpl_idx;
535 }
537 /* initialize tmplt redirector */
538 /* default config example (assume DS 1 is a CDEF DS)
539 tmpl_idx[0] -> 0; (time)
540 tmpl_idx[1] -> 1; (DS 0)
541 tmpl_idx[2] -> 3; (DS 2)
542 tmpl_idx[3] -> 4; (DS 3) */
543 (*tmpl_idx)[0] = 0; /* time */
544 for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
545 if (dst_conv(rrd->ds_def[i-1].dst) != DST_CDEF)
546 (*tmpl_idx)[ii++] = i;
547 }
548 *tmpl_cnt = ii;
550 if (tmplt != NULL) {
551 if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
552 goto err_free_rra_step_cnt;
553 }
554 }
556 if ((*pdp_new = (rrd_value_t *)malloc(sizeof(rrd_value_t)
557 * rrd->stat_head->ds_cnt)) == NULL) {
558 rrd_set_error("allocating pdp_new ...");
559 goto err_free_rra_step_cnt;
560 }
562 return 0;
564 err_free_rra_step_cnt:
565 free(*rra_step_cnt);
566 err_free_tmpl_idx:
567 free(*tmpl_idx);
568 err_free_skip_update:
569 free(*skip_update);
570 err_free_pdp_temp:
571 free(*pdp_temp);
572 err_free_updvals:
573 free(*updvals);
574 return -1;
575 }
577 /*
578 * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
579 *
580 * Returns 0 on success.
581 */
static int parse_template(
    rrd_t *rrd, const char *tmplt,
    unsigned long *tmpl_cnt, long *tmpl_idx)
{
    char *dsname, *tmplt_copy;
    unsigned int tmpl_len, i;

    *tmpl_cnt = 1; /* the first entry is the time */

    /* we should work on a writeable copy here */
    if ((tmplt_copy = strdup(tmplt)) == NULL) {
        rrd_set_error("error copying tmplt '%s'", tmplt);
        return -1;
    }

    dsname = tmplt_copy;
    tmpl_len = strlen(tmplt_copy);
    /* chop the copy into ':'-separated DS names in place; the loop
     * deliberately runs through the final '\0' so the last name is
     * flushed too */
    for (i = 0; i <= tmpl_len; i++) {
        if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
            tmplt_copy[i] = '\0';
            if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
                rrd_set_error("tmplt contains more DS definitions than RRD");
                free(tmplt_copy);
                return -1;
            }
            /* slot 0 is reserved for the timestamp, hence the +1; the
             * == 0 test catches ds_match's not-found result (-1) */
            if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname)+1) == 0) {
                rrd_set_error("unknown DS name '%s'", dsname);
                free(tmplt_copy);
                return -1;
            }
            /* go to the next entry on the tmplt_copy */
            if (i < tmpl_len)
                dsname = &tmplt_copy[i+1];
        }
    }
    free(tmplt_copy);
    return 0;
}
621 /*
622 * Parse an update string, updates the primary data points (PDPs)
623 * and consolidated data points (CDPs), and writes changes to the RRAs.
624 *
625 * Returns 0 on success, -1 on error.
626 */
static int process_arg(
    char *step_start,
    rrd_t *rrd,
    rrd_file_t *rrd_file,
    unsigned long rra_begin,
    unsigned long *rra_current,
    time_t *current_time,
    unsigned long *current_time_usec,
    rrd_value_t *pdp_temp,
    rrd_value_t *pdp_new,
    unsigned long *rra_step_cnt,
    char **updvals,
    long *tmpl_idx,
    unsigned long tmpl_cnt,
    info_t **pcdp_summary,
    int version,
    unsigned long *skip_update,
    int *schedule_smooth)
{
    rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;

    /* a vector of future Holt-Winters seasonal coefs */
    unsigned long elapsed_pdp_st;

    double interval, pre_int, post_int; /* interval between this and
                                         * the last run */
    unsigned long proc_pdp_cnt;
    unsigned long rra_start;    /* NOTE(review): set below but never read
                                 * in this function — possibly vestigial */

    /* split the update string into timestamp + per-DS readings */
    if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
                 current_time, current_time_usec, version) == -1) {
        return -1;
    }
    /* seek to the beginning of the rra's */
    if (*rra_current != rra_begin) {
#ifndef HAVE_MMAP
        if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
            rrd_set_error("seek error in rrd");
            return -1;
        }
#endif
        *rra_current = rra_begin;
    }
    rra_start = rra_begin;

    /* seconds (with sub-second correction) since the previous update */
    interval = (double) (*current_time - rrd->live_head->last_up)
        + (double) ((long) *current_time_usec -
                    (long) rrd->live_head->last_up_usec) / 1e6f;

    /* process the data sources and update the pdp_prep
     * area accordingly */
    if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
        return -1;
    }

    elapsed_pdp_st = calculate_elapsed_steps(rrd,
                                             *current_time, *current_time_usec,
                                             interval, &pre_int, &post_int,
                                             &proc_pdp_cnt);

    /* has a pdp_st moment occurred since the last run ? */
    if (elapsed_pdp_st == 0) {
        /* no we have not passed a pdp_st moment. therefore update is simple */
        simple_update(rrd, interval, pdp_new);
    } else {
        /* an pdp_st has occurred. */
        if (process_all_pdp_st(rrd, interval,
                               pre_int, post_int,
                               elapsed_pdp_st,
                               pdp_new, pdp_temp) == -1)
        {
            return -1;
        }
        if (update_all_cdp_prep(rrd, rra_step_cnt,
                                rra_begin, rrd_file,
                                elapsed_pdp_st,
                                proc_pdp_cnt,
                                &last_seasonal_coef,
                                &seasonal_coef,
                                pdp_temp, rra_current,
                                skip_update, schedule_smooth) == -1)
        {
            goto err_free_coefficients;
        }
        if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
                                 elapsed_pdp_st, pdp_temp, &seasonal_coef) == -1)
        {
            goto err_free_coefficients;
        }
        if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
                          rra_current, *current_time, skip_update, pcdp_summary) == -1)
        {
            goto err_free_coefficients;
        }
    } /* endif a pdp_st has occurred */
    /* this update is now the reference point for the next one */
    rrd->live_head->last_up = *current_time;
    rrd->live_head->last_up_usec = *current_time_usec;

    free(seasonal_coef);
    free(last_seasonal_coef);
    return 0;

  err_free_coefficients:
    free(seasonal_coef);
    free(last_seasonal_coef);
    return -1;
}
735 /*
736 * Parse a DS string (time + colon-separated values), storing the
737 * results in current_time, current_time_usec, and updvals.
738 *
739 * Returns 0 on success, -1 on error.
740 */
static int parse_ds(
    rrd_t *rrd, char **updvals, long *tmpl_idx, char *input,
    unsigned long tmpl_cnt, time_t *current_time,
    unsigned long *current_time_usec, int version)
{
    char *p;
    unsigned long i;
    char timesyntax;

    /* slot 0 always holds the timestamp token */
    updvals[0] = input;
    /* initialize all ds input to unknown except the first one
       which has always got to be set */
    for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
        updvals[i] = "U";

    /* separate all ds elements; first must be examined separately
       due to alternate time syntax */
    if ((p = strchr(input, '@')) != NULL) {
        timesyntax = '@';
    } else if ((p = strchr(input, ':')) != NULL) {
        timesyntax = ':';
    } else {
        rrd_set_error("expected timestamp not found in data source from %s",
                      input);
        return -1;
    }
    *p = '\0';  /* terminate the timestamp token in place */
    i = 1;
    /* the first reading always follows the timestamp separator
     * (assumes tmpl_cnt >= 2, which allocate_data_structures guarantees) */
    updvals[tmpl_idx[i++]] = p+1;
    while (*(++p)) {
        if (*p == ':') {
            *p = '\0';
            /* route each reading to the DS slot the template dictates;
             * extra readings beyond tmpl_cnt are silently skipped here
             * and caught by the count check below */
            if (i < tmpl_cnt) {
                updvals[tmpl_idx[i++]] = p+1;
            }
        }
    }

    /* every template slot must have received exactly one reading.
     * NOTE(review): the message prints i as "got", but i also counts the
     * timestamp slot, so the reported count looks one high — confirm */
    if (i != tmpl_cnt) {
        rrd_set_error("expected %lu data source readings (got %lu) from %s",
                      tmpl_cnt - 1, i, input);
        return -1;
    }

    if (get_time_from_reading(rrd, timesyntax, updvals,
                              current_time, current_time_usec,
                              version) == -1) {
        return -1;
    }
    return 0;
}
793 /*
794 * Parse the time in a DS string, store it in current_time and
795 * current_time_usec and verify that it's later than the last
796 * update for this DS.
797 *
798 * Returns 0 on success, -1 on error.
799 */
static int get_time_from_reading(
    rrd_t *rrd, char timesyntax, char **updvals,
    time_t *current_time, unsigned long *current_time_usec,
    int version)
{
    double tmp;
    char *parsetime_error = NULL;
    char *old_locale;
    struct rrd_time_value ds_tv;
    struct timeval tmp_time; /* used for time conversion */

    /* get the time from the reading ... handle N */
    if (timesyntax == '@') { /* at-style */
        if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
            rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
            return -1;
        }
        if (ds_tv.type == RELATIVE_TO_END_TIME ||
            ds_tv.type == RELATIVE_TO_START_TIME) {
            rrd_set_error("specifying time relative to the 'start' "
                          "or 'end' makes no sense here: %s", updvals[0]);
            return -1;
        }
        *current_time = mktime(&ds_tv.tm) + ds_tv.offset;
        *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
    } else if (strcmp(updvals[0], "N") == 0) {
        /* "N" means now: sample the wall clock */
        gettimeofday(&tmp_time, 0);
        normalize_time(&tmp_time);
        *current_time = tmp_time.tv_sec;
        *current_time_usec = tmp_time.tv_usec;
    } else {
        /* plain numeric timestamp, possibly fractional; parse in the C
         * locale so '.' is always the decimal point */
        old_locale = setlocale(LC_NUMERIC, "C");
        tmp = strtod(updvals[0], 0);
        /* NOTE(review): setlocale may return NULL on failure, which would
         * turn this restore into a query — confirm acceptable */
        setlocale(LC_NUMERIC, old_locale);
        *current_time = floor(tmp);
        *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
    }
    /* dont do any correction for old version RRDs */
    if (version < 3)
        *current_time_usec = 0;

    /* updates must move strictly forward in time */
    if (*current_time < rrd->live_head->last_up ||
        (*current_time == rrd->live_head->last_up &&
         (long) *current_time_usec <=
         (long) rrd->live_head->last_up_usec)) {
        rrd_set_error("illegal attempt to update using time %ld when "
                      "last update time is %ld (minimum one second step)",
                      *current_time, rrd->live_head->last_up);
        return -1;
    }
    return 0;
}
853 /*
854 * Update pdp_new by interpreting the updvals according to the DS type
855 * (COUNTER, GAUGE, etc.).
856 *
857 * Returns 0 on success, -1 on error.
858 */
static int update_pdp_prep(
    rrd_t *rrd, char **updvals,
    rrd_value_t *pdp_new, double interval)
{
    unsigned long ds_idx;
    int ii;
    char *endptr; /* used in the conversion */
    double rate;
    char *old_locale;
    enum dst_en dst_idx;

    for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
        dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);

        /* make sure we do not build diffs with old last_ds values */
        if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
            strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
            rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
        }

        /* NOTE: DST_CDEF should never enter this if block, because
         * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
         * accidently specified a value for the DST_CDEF. To handle this case,
         * an extra check is required. */

        if ((updvals[ds_idx+1][0] != 'U') &&
            (dst_idx != DST_CDEF) &&
            rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
            rate = DNAN;

            /* pdp_new contains rate * time ... eg the bytes transferred during
             * the interval. Doing it this way saves a lot of math operations
             */
            switch (dst_idx) {
            case DST_COUNTER:
            case DST_DERIVE:
                /* NOTE(review): with '&&' this validation accepts ANY
                 * character at position 0 and '-' at any position, not
                 * just a leading minus — confirm whether '||' was meant */
                for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
                    if ((updvals[ds_idx + 1][ii] < '0' || updvals[ds_idx + 1][ii] > '9')
                        && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
                        rrd_set_error("not a simple integer: '%s'", updvals[ds_idx + 1]);
                        return -1;
                    }
                }
                if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
                    /* counter delta since the previous reading */
                    pdp_new[ds_idx] = rrd_diff(updvals[ds_idx+1], rrd->pdp_prep[ds_idx].last_ds);
                    if (dst_idx == DST_COUNTER) {
                        /* simple overflow catcher. This will fail
                         * terribly for non 32 or 64 bit counters
                         * ... are there any others in SNMP land?
                         */
                        if (pdp_new[ds_idx] < (double) 0.0)
                            pdp_new[ds_idx] += (double) 4294967296.0; /* 2^32 */
                        if (pdp_new[ds_idx] < (double) 0.0)
                            pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
                    }
                    rate = pdp_new[ds_idx] / interval;
                } else {
                    /* no previous reading to diff against */
                    pdp_new[ds_idx] = DNAN;
                }
                break;
            case DST_ABSOLUTE:
                /* parse in the C locale so '.' is always the decimal point */
                old_locale = setlocale(LC_NUMERIC, "C");
                errno = 0;
                pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
                setlocale(LC_NUMERIC, old_locale);
                if (errno > 0) {
                    rrd_set_error("converting '%s' to float: %s",
                                  updvals[ds_idx + 1], rrd_strerror(errno));
                    return -1;
                };
                if (endptr[0] != '\0') {
                    rrd_set_error("conversion of '%s' to float not complete: tail '%s'",
                                  updvals[ds_idx + 1], endptr);
                    return -1;
                }
                rate = pdp_new[ds_idx] / interval;
                break;
            case DST_GAUGE:
                errno = 0;
                old_locale = setlocale(LC_NUMERIC, "C");
                /* gauges store rate * interval, so multiply back up */
                pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr) * interval;
                setlocale(LC_NUMERIC, old_locale);
                if (errno) {
                    rrd_set_error("converting '%s' to float: %s",
                                  updvals[ds_idx + 1], rrd_strerror(errno));
                    return -1;
                };
                if (endptr[0] != '\0') {
                    rrd_set_error("conversion of '%s' to float not complete: tail '%s'",
                                  updvals[ds_idx + 1], endptr);
                    return -1;
                }
                rate = pdp_new[ds_idx] / interval;
                break;
            default:
                rrd_set_error("rrd contains unknown DS type : '%s'",
                              rrd->ds_def[ds_idx].dst);
                return -1;
            }
            /* break out of this for loop if the error string is set */
            if (rrd_test_error()) {
                return -1;
            }
            /* make sure pdp_temp is neither too large or too small
             * if any of these occur it becomes unknown ...
             * sorry folks ... */
            if (!isnan(rate) &&
                ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
                  rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
                 (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
                  rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
                pdp_new[ds_idx] = DNAN;
            }
        } else {
            /* no news is news all the same */
            pdp_new[ds_idx] = DNAN;
        }


        /* make a copy of the command line argument for the next run */
#ifdef DEBUG
        fprintf(stderr, "prep ds[%lu]\t"
                "last_arg '%s'\t"
                "this_arg '%s'\t"
                "pdp_new %10.2f\n",
                ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx+1], pdp_new[ds_idx]);
#endif
        strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx+1], LAST_DS_LEN - 1);
        rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN-1] = '\0';
    }
    return 0;
}
992 /*
993 * How many PDP steps have elapsed since the last update? Returns the answer,
994 * and stores the time between the last update and the last PDP in pre_time,
995 * and the time between the last PDP and the current time in post_int.
996 */
997 static int calculate_elapsed_steps(
998 rrd_t *rrd,
999 unsigned long current_time,
1000 unsigned long current_time_usec,
1001 double interval,
1002 double *pre_int,
1003 double *post_int,
1004 unsigned long *proc_pdp_cnt)
1005 {
1006 unsigned long proc_pdp_st; /* which pdp_st was the last to be processed */
1007 unsigned long occu_pdp_st; /* when was the pdp_st before the last update
1008 * time */
1009 unsigned long proc_pdp_age; /* how old was the data in the pdp prep area
1010 * when it was last updated */
1011 unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1013 /* when was the current pdp started */
1014 proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1015 proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1017 /* when did the last pdp_st occur */
1018 occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1019 occu_pdp_st = current_time - occu_pdp_age;
1021 if (occu_pdp_st > proc_pdp_st) {
1022 /* OK we passed the pdp_st moment */
1023 *pre_int = (long) occu_pdp_st - rrd->live_head->last_up; /* how much of the input data
1024 * occurred before the latest
1025 * pdp_st moment*/
1026 *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1027 *post_int = occu_pdp_age; /* how much after it */
1028 *post_int += ((double) current_time_usec) / 1e6f; /* adjust usecs */
1029 } else {
1030 *pre_int = interval;
1031 *post_int = 0;
1032 }
1034 *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1036 #ifdef DEBUG
1037 printf("proc_pdp_age %lu\t"
1038 "proc_pdp_st %lu\t"
1039 "occu_pfp_age %lu\t"
1040 "occu_pdp_st %lu\t"
1041 "int %lf\t"
1042 "pre_int %lf\t"
1043 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1044 occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1045 #endif
1047 /* compute the number of elapsed pdp_st moments */
1048 return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1049 }
1051 /*
1052 * Increment the PDP values by the values in pdp_new, or else initialize them.
1053 */
1054 static void simple_update(
1055 rrd_t *rrd, double interval, rrd_value_t *pdp_new)
1056 {
1057 int i;
1058 for (i = 0; i < (signed)rrd->stat_head->ds_cnt; i++) {
1059 if (isnan(pdp_new[i])) {
1060 /* this is not really accurate if we use subsecond data arrival time
1061 should have thought of it when going subsecond resolution ...
1062 sorry next format change we will have it! */
1063 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);
1064 } else {
1065 if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1066 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1067 } else {
1068 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1069 }
1070 }
1071 #ifdef DEBUG
1072 fprintf(stderr,
1073 "NO PDP ds[%i]\t"
1074 "value %10.2f\t"
1075 "unkn_sec %5lu\n",
1076 i,
1077 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1078 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1079 #endif
1080 }
1081 }
1083 /*
1084 * Call process_pdp_st for each DS.
1085 *
1086 * Returns 0 on success, -1 on error.
1087 */
1088 static int process_all_pdp_st(
1089 rrd_t *rrd, double interval, double pre_int, double post_int,
1090 unsigned long elapsed_pdp_st, rrd_value_t *pdp_new, rrd_value_t *pdp_temp)
1091 {
1092 unsigned long ds_idx;
1093 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1094 rate*seconds which occurred up to the last run.
1095 pdp_new[] contains rate*seconds from the latest run.
1096 pdp_temp[] will contain the rate for cdp */
1098 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1099 if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1100 elapsed_pdp_st * rrd->stat_head->pdp_step,
1101 pdp_new, pdp_temp) == -1) {
1102 return -1;
1103 }
1104 #ifdef DEBUG
1105 fprintf(stderr, "PDP UPD ds[%lu]\t"
1106 "pdp_temp %10.2f\t"
1107 "new_prep %10.2f\t"
1108 "new_unkn_sec %5lu\n",
1109 ds_idx, pdp_temp[ds_idx],
1110 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1111 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1112 #endif
1113 }
1114 return 0;
1115 }
1117 /*
1118 * Process an update that occurs after one of the PDP moments.
1119 * Increments the PDP value, sets NAN if time greater than the
1120 * heartbeats have elapsed, processes CDEFs.
1121 *
1122 * Returns 0 on success, -1 on error.
1123 */
1124 static int process_pdp_st(rrd_t *rrd, unsigned long ds_idx, double interval,
1125 double pre_int, double post_int, long diff_pdp_st,
1126 rrd_value_t *pdp_new, rrd_value_t *pdp_temp)
1127 {
1128 int i;
1129 /* update pdp_prep to the current pdp_st. */
1130 double pre_unknown = 0.0;
1131 unival *scratch = rrd->pdp_prep[ds_idx].scratch;
1132 unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1134 rpnstack_t rpnstack; /* used for COMPUTE DS */
1135 rpnstack_init(&rpnstack);
1138 if (isnan(pdp_new[ds_idx])) {
1139 /* a final bit of unknown to be added bevore calculation
1140 we use a temporary variable for this so that we
1141 don't have to turn integer lines before using the value */
1142 pre_unknown = pre_int;
1143 } else {
1144 if (isnan(scratch[PDP_val].u_val)) {
1145 scratch[PDP_val].u_val = 0;
1146 }
1147 scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1148 }
1150 /* if too much of the pdp_prep is unknown we dump it */
1151 /* if the interval is larger thatn mrhb we get NAN */
1152 if ((interval > mrhb) ||
1153 (diff_pdp_st <= (signed)scratch[PDP_unkn_sec_cnt].u_cnt)) {
1154 pdp_temp[ds_idx] = DNAN;
1155 } else {
1156 pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1157 ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) - pre_unknown);
1158 }
1160 /* process CDEF data sources; remember each CDEF DS can
1161 * only reference other DS with a lower index number */
1162 if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1163 rpnp_t *rpnp;
1165 rpnp = rpn_expand((rpn_cdefds_t *)&(rrd->ds_def[ds_idx].par[DS_cdef]));
1166 /* substitute data values for OP_VARIABLE nodes */
1167 for (i = 0; rpnp[i].op != OP_END; i++) {
1168 if (rpnp[i].op == OP_VARIABLE) {
1169 rpnp[i].op = OP_NUMBER;
1170 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1171 }
1172 }
1173 /* run the rpn calculator */
1174 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1175 free(rpnp);
1176 rpnstack_free(&rpnstack);
1177 return -1;
1178 }
1179 }
1181 /* make pdp_prep ready for the next run */
1182 if (isnan(pdp_new[ds_idx])) {
1183 /* this is not realy accurate if we use subsecond data arival time
1184 should have thought of it when going subsecond resolution ...
1185 sorry next format change we will have it! */
1186 scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1187 scratch[PDP_val].u_val = DNAN;
1188 } else {
1189 scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1190 scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1191 }
1192 rpnstack_free(&rpnstack);
1193 return 0;
1194 }
/*
 * Iterate over all the RRAs for a given DS and:
 * 1. Decide whether to schedule a smooth later
 * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
 * 3. Update the CDP
 *
 * Returns 0 on success, -1 on error
 */
static int update_all_cdp_prep(
    rrd_t *rrd, unsigned long *rra_step_cnt, unsigned long rra_begin,
    rrd_file_t *rrd_file, unsigned long elapsed_pdp_st, unsigned long proc_pdp_cnt,
    rrd_value_t **last_seasonal_coef, rrd_value_t **seasonal_coef,
    rrd_value_t *pdp_temp, unsigned long *rra_current,
    unsigned long *skip_update, int *schedule_smooth)
{
    unsigned long rra_idx;
    /* index into the CDP scratch array */
    enum cf_en current_cf;
    unsigned long rra_start;
    /* number of rows to be updated in an RRA for a data value. */
    unsigned long start_pdp_offset;

    rra_start = rra_begin;
    for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
        current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
        /* PDP steps remaining before this RRA's next CDP boundary */
        start_pdp_offset = rrd->rra_def[rra_idx].pdp_cnt - proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
        skip_update[rra_idx] = 0;
        if (start_pdp_offset <= elapsed_pdp_st) {
            /* at least one boundary crossed: one row for it, plus one per
             * additional full pdp_cnt span covered by this update */
            rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
                rrd->rra_def[rra_idx].pdp_cnt + 1;
        } else {
            rra_step_cnt[rra_idx] = 0;
        }

        if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
            /* If this is a bulk update, we need to skip ahead in the seasonal arrays
             * so that they will be correct for the next observed value; note that for
             * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
             * furthermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
            if (rra_step_cnt[rra_idx] > 1) {
                skip_update[rra_idx] = 1;
                lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
                                elapsed_pdp_st, last_seasonal_coef);
                lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
                                elapsed_pdp_st + 1, seasonal_coef);
            }
            /* periodically run a smoother for seasonal effects */
            if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
#ifdef DEBUG
                fprintf(stderr, "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
                        rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
                        rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt);
#endif
                *schedule_smooth = 1;
            }
            /* lookup_seasonal moved the file pointer; keep our cache of
             * the current position in sync */
            *rra_current = rrd_tell(rrd_file);
        }
        if (rrd_test_error())
            return -1;

        if (update_cdp_prep(rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt,
                            rra_idx, pdp_temp, *last_seasonal_coef, *seasonal_coef,
                            current_cf) == -1) {
            return -1;
        }
        /* advance to the byte offset of the next RRA's data area */
        rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
    }
    return 0;
}
/*
 * Are we due for a smooth? Also increments our position in the burn-in cycle.
 */
static int do_schedule_smooth(
    rrd_t *rrd, unsigned long rra_idx,
    unsigned long elapsed_pdp_st)
{
    /* cdp_prep slot of this RRA's first data source (see comment below) */
    unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
    unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
    unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
    unsigned long seasonal_smooth_idx = rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
    unsigned long *init_seasonal = &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);

    /* Need to use first cdp parameter buffer to track burnin (burnin requires
     * a specific smoothing schedule). The CDP_init_seasonal parameter is
     * really an RRA level, not a data source within RRA level parameter, but
     * the rra_def is read only for rrd_update (not flushed to disk). */
    if (*init_seasonal > BURNIN_CYCLES) {
        /* someone has no doubt invented a trick to deal with this wrap around,
         * but at least this code is clear. */
        if (seasonal_smooth_idx > cur_row) {
            /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
             * between PDP and CDP */
            return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
        }
        /* can't rely on negative numbers because we are working with
         * unsigned values */
        return (cur_row + elapsed_pdp_st >= row_cnt
                && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
    }
    /* mark off one of the burn-in cycles: the increment only runs when a
     * row-count wrap occurs (left side of && is true), and the pre-increment
     * guarantees the returned expression is non-zero in that case */
    return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
}
1300 /*
1301 * For a given RRA, iterate over the data sources and call the appropriate
1302 * consolidation function.
1303 *
1304 * Returns 0 on success, -1 on error.
1305 */
1306 static int update_cdp_prep(
1307 rrd_t *rrd,
1308 unsigned long elapsed_pdp_st,
1309 unsigned long start_pdp_offset,
1310 unsigned long *rra_step_cnt,
1311 int rra_idx,
1312 rrd_value_t *pdp_temp,
1313 rrd_value_t *last_seasonal_coef,
1314 rrd_value_t *seasonal_coef,
1315 int current_cf)
1316 {
1317 unsigned long ds_idx, cdp_idx;
1318 /* update CDP_PREP areas */
1319 /* loop over data soures within each RRA */
1320 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1322 cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1324 if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1325 update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1326 pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1327 elapsed_pdp_st, start_pdp_offset,
1328 rrd->rra_def[rra_idx].pdp_cnt,
1329 rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val, rra_idx, ds_idx);
1330 } else {
1331 /* Nothing to consolidate if there's one PDP per CDP. However, if
1332 * we've missed some PDPs, let's update null counters etc. */
1333 if (elapsed_pdp_st > 2) {
1334 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef, seasonal_coef,
1335 rra_idx, ds_idx, cdp_idx, current_cf);
1336 }
1337 }
1339 if (rrd_test_error())
1340 return -1;
1341 } /* endif data sources loop */
1342 return 0;
1343 }
/*
 * Given the new reading (pdp_temp_val), update or initialize the CDP value,
 * primary value, secondary value, and # of unknowns.
 */
static void update_cdp(
    unival *scratch,
    int current_cf,
    rrd_value_t pdp_temp_val,
    unsigned long rra_step_cnt,
    unsigned long elapsed_pdp_st,
    unsigned long start_pdp_offset,
    unsigned long pdp_cnt,
    rrd_value_t xff,
    int i, int ii)
{
    /* shorthand variables */
    rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
    rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
    rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
    unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;

    if (rra_step_cnt) {
        /* If we are in this block, at least 1 CDP value will be written to
         * disk, this is the CDP_primary_val entry. If more than 1 value needs
         * to be written, then the "fill in" value is the CDP_secondary_val
         * entry. */
        if (isnan(pdp_temp_val)) {
            *cdp_unkn_pdp_cnt += start_pdp_offset;
            *cdp_secondary_val = DNAN;
        } else {
            /* CDP_secondary value is the RRA "fill in" value for intermediary
             * CDP data entries. No matter the CF, the value is the same because
             * the average, max, min, and last of a list of identical values is
             * the same, namely, the value itself. */
            *cdp_secondary_val = pdp_temp_val;
        }

        /* too many unknown PDPs in this CDP window: the CDP itself is unknown */
        if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
            *cdp_primary_val = DNAN;
            if (current_cf == CF_AVERAGE) {
                *cdp_val = initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
                                                         start_pdp_offset, pdp_cnt);
            } else {
                *cdp_val = pdp_temp_val;
            }
        } else {
            initialize_cdp_val(scratch, current_cf, pdp_temp_val,
                               elapsed_pdp_st, start_pdp_offset, pdp_cnt);
        } /* endif meets xff value requirement for a valid value */
        /* initialize carry over CDP_unkn_pdp_cnt; this must happen after
         * CDP_primary_val is set because CDP_unkn_pdp_cnt is required to
         * compute that value. */
        if (isnan(pdp_temp_val))
            *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
        else
            *cdp_unkn_pdp_cnt = 0;
    } else { /* rra_step_cnt[i] == 0 */
        /* no CDP boundary crossed: just accumulate into the carry-over value */
#ifdef DEBUG
        if (isnan(*cdp_val)) {
            fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
                    i, ii);
        } else {
            fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
                    i, ii, *cdp_val);
        }
#endif
        if (isnan(pdp_temp_val)) {
            *cdp_unkn_pdp_cnt += elapsed_pdp_st;
        } else {
            *cdp_val = calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st, current_cf, i, ii);
        }
    }
}
1419 /*
1420 * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1421 * on the type of consolidation function.
1422 */
1423 static void initialize_cdp_val(
1424 unival *scratch,
1425 int current_cf,
1426 rrd_value_t pdp_temp_val,
1427 unsigned long elapsed_pdp_st,
1428 unsigned long start_pdp_offset,
1429 unsigned long pdp_cnt)
1430 {
1431 rrd_value_t cum_val, cur_val;
1433 switch (current_cf) {
1434 case CF_AVERAGE:
1435 cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1436 cur_val = IFDNAN(pdp_temp_val, 0.0);
1437 scratch[CDP_primary_val].u_val =
1438 (cum_val + cur_val * start_pdp_offset) /
1439 (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1440 scratch[CDP_val].u_val = initialize_average_carry_over(
1441 pdp_temp_val, elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1442 break;
1443 case CF_MAXIMUM:
1444 cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1445 cur_val = IFDNAN(pdp_temp_val, -DINF);
1446 #if 0
1447 #ifdef DEBUG
1448 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1449 fprintf(stderr,
1450 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1451 i, ii);
1452 exit(-1);
1453 }
1454 #endif
1455 #endif
1456 if (cur_val > cum_val)
1457 scratch[CDP_primary_val].u_val = cur_val;
1458 else
1459 scratch[CDP_primary_val].u_val = cum_val;
1460 /* initialize carry over value */
1461 scratch[CDP_val].u_val = pdp_temp_val;
1462 break;
1463 case CF_MINIMUM:
1464 cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1465 cur_val = IFDNAN(pdp_temp_val, DINF);
1466 #if 0
1467 #ifdef DEBUG
1468 if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1469 fprintf(stderr, "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1470 i, ii);
1471 exit(-1);
1472 }
1473 #endif
1474 #endif
1475 if (cur_val < cum_val)
1476 scratch[CDP_primary_val].u_val = cur_val;
1477 else
1478 scratch[CDP_primary_val].u_val = cum_val;
1479 /* initialize carry over value */
1480 scratch[CDP_val].u_val = pdp_temp_val;
1481 break;
1482 case CF_LAST:
1483 default:
1484 scratch[CDP_primary_val].u_val = pdp_temp_val;
1485 /* initialize carry over value */
1486 scratch[CDP_val].u_val = pdp_temp_val;
1487 break;
1488 }
1489 }
/*
 * Update the consolidation function for Holt-Winters functions as
 * well as other functions that don't actually consolidate multiple
 * PDPs.
 */
static void reset_cdp(
    rrd_t *rrd,
    unsigned long elapsed_pdp_st,
    rrd_value_t *pdp_temp,
    rrd_value_t *last_seasonal_coef,
    rrd_value_t *seasonal_coef,
    int rra_idx, int ds_idx, int cdp_idx,
    enum cf_en current_cf)
{
    unival *scratch = rrd->cdp_prep[cdp_idx].scratch;

    switch (current_cf) {
    case CF_AVERAGE:
    default:
        /* one PDP per CDP: both the row to write and the bulk "fill in"
         * rows carry the current rate unchanged */
        scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
        scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
        break;
    case CF_SEASONAL:
    case CF_DEVSEASONAL:
        /* need to update cached seasonal values, so they are consistent
         * with the bulk update */
        /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
         * CDP_last_deviation are the same. */
        scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
        scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
        break;
    case CF_HWPREDICT:
    case CF_MHWPREDICT:
        /* need to update the null_count and last_null_count.
         * even do this for non-DNAN pdp_temp because the
         * algorithm is not learning from batch updates. */
        scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
        scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
        /* fall through */
    case CF_DEVPREDICT:
        scratch[CDP_primary_val].u_val = DNAN;
        scratch[CDP_secondary_val].u_val = DNAN;
        break;
    case CF_FAILURES:
        /* do not count missed bulk values as failures */
        scratch[CDP_primary_val].u_val = 0;
        scratch[CDP_secondary_val].u_val = 0;
        /* need to reset violations buffer.
         * could do this more carefully, but for now, just
         * assume a bulk update wipes away all violations. */
        erase_violations(rrd, cdp_idx, rra_idx);
        break;
    }
}
1546 static rrd_value_t initialize_average_carry_over(
1547 rrd_value_t pdp_temp_val,
1548 unsigned long elapsed_pdp_st,
1549 unsigned long start_pdp_offset,
1550 unsigned long pdp_cnt)
1551 {
1552 /* initialize carry over value */
1553 if (isnan(pdp_temp_val)) {
1554 return DNAN;
1555 }
1556 return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1557 }
1559 /*
1560 * Update or initialize a CDP value based on the consolidation
1561 * function.
1562 *
1563 * Returns the new value.
1564 */
1565 static rrd_value_t calculate_cdp_val(
1566 rrd_value_t cdp_val,
1567 rrd_value_t pdp_temp_val,
1568 unsigned long elapsed_pdp_st,
1569 int current_cf, int i, int ii)
1570 {
1571 if (isnan(cdp_val)) {
1572 if (current_cf == CF_AVERAGE) {
1573 pdp_temp_val *= elapsed_pdp_st;
1574 }
1575 #ifdef DEBUG
1576 fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1577 i, ii, pdp_temp_val);
1578 #endif
1579 return pdp_temp_val;
1580 }
1581 if (current_cf == CF_AVERAGE)
1582 return cdp_val + pdp_temp_val * elapsed_pdp_st;
1583 if (current_cf == CF_MINIMUM)
1584 return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1585 if (current_cf == CF_MAXIMUM)
1586 return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1588 return pdp_temp_val;
1589 }
/*
 * For each RRA, update the seasonal values and then call update_aberrant_CF
 * for each data source.
 *
 * Return 0 on success, -1 on error.
 */
static int update_aberrant_cdps(
    rrd_t *rrd, rrd_file_t *rrd_file, unsigned long rra_begin,
    unsigned long *rra_current, unsigned long elapsed_pdp_st,
    rrd_value_t *pdp_temp, rrd_value_t **seasonal_coef)
{
    unsigned long rra_idx, ds_idx, j;

    /* number of PDP steps since the last update that
     * are assigned to the first CDP to be generated
     * since the last update. */
    unsigned short scratch_idx;
    unsigned long rra_start;
    enum cf_en current_cf;

    /* this loop is only entered if elapsed_pdp_st < 3; the first pass
     * fills CDP_primary_val, a possible second pass CDP_secondary_val */
    for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
         j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
        rra_start = rra_begin;
        for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
            /* aberrant-behavior RRAs map one PDP to one CDP */
            if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
                current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
                if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
                    /* fetch the seasonal coefficients matching the row
                     * this pass will write */
                    if (scratch_idx == CDP_primary_val) {
                        lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
                                        elapsed_pdp_st + 1, seasonal_coef);
                    } else {
                        lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
                                        elapsed_pdp_st + 2, seasonal_coef);
                    }
                    /* lookup_seasonal moved the file pointer */
                    *rra_current = rrd_tell(rrd_file);
                }
                if (rrd_test_error())
                    return -1;
                /* loop over data sources within each RRA */
                for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
                    update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
                                       rra_idx * (rrd->stat_head->ds_cnt) + ds_idx,
                                       rra_idx, ds_idx, scratch_idx, *seasonal_coef);
                }
            }
            /* advance to the byte offset of the next RRA's data area */
            rra_start += rrd->rra_def[rra_idx].row_cnt
                * rrd->stat_head->ds_cnt
                * sizeof(rrd_value_t);
        }
    }
    return 0;
}
/*
 * Move sequentially through the file, writing one RRA at a time. Note this
 * architecture divorces the computation of CDP with flushing updated RRA
 * entries to disk.
 *
 * Return 0 on success, -1 on error.
 */
static int write_to_rras(
    rrd_t *rrd,
    rrd_file_t *rrd_file,
    unsigned long *rra_step_cnt,
    unsigned long rra_begin,
    unsigned long *rra_current,
    time_t current_time,
    unsigned long *skip_update,
    info_t **pcdp_summary)
{
    unsigned long rra_idx;
    unsigned long rra_start;
    unsigned long rra_pos_tmp; /* temporary byte pointer. */
    time_t rra_time = 0; /* time of update for a RRA */

    /* Ready to write to disk */
    rra_start = rra_begin;
    for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
        /* skip unless there's something to write */
        if (rra_step_cnt[rra_idx]) {
            /* write the first row */
#ifdef DEBUG
            fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
#endif
            rrd->rra_ptr[rra_idx].cur_row++;
            if (rrd->rra_ptr[rra_idx].cur_row >= rrd->rra_def[rra_idx].row_cnt)
                rrd->rra_ptr[rra_idx].cur_row = 0; /* wrap around */
            /* position on the first row */
            rra_pos_tmp = rra_start +
                (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
                sizeof(rrd_value_t);
            /* only seek when we are not already there */
            if (rra_pos_tmp != *rra_current) {
                if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
                    rrd_set_error("seek error in rrd");
                    return -1;
                }
                *rra_current = rra_pos_tmp;
            }
#ifdef DEBUG
            fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
#endif
            if (!skip_update[rra_idx]) {
                if (*pcdp_summary != NULL) {
                    /* timestamp of the newest row: align current_time down
                     * to this RRA's step, then back up over the older bulk
                     * rows still to be written */
                    rra_time = (current_time - current_time
                                % (rrd->rra_def[rra_idx].pdp_cnt *
                                   rrd->stat_head->pdp_step))
                        - ((rra_step_cnt[rra_idx] - 1) * rrd->rra_def[rra_idx].pdp_cnt *
                           rrd->stat_head->pdp_step);
                }
                if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
                                  pcdp_summary, rra_time) == -1)
                    return -1;
            }

            /* write other rows of the bulk update, if any */
            for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
                if (++rrd->rra_ptr[rra_idx].cur_row == rrd->rra_def[rra_idx].row_cnt) {
#ifdef DEBUG
                    fprintf(stderr,
                            "Wraparound for RRA %s, %lu updates left\n",
                            rrd->rra_def[rra_idx].cf_nam, rra_step_cnt[rra_idx] - 1);
#endif
                    /* wrap */
                    rrd->rra_ptr[rra_idx].cur_row = 0;
                    /* seek back to beginning of current rra */
                    if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
                        rrd_set_error("seek error in rrd");
                        return -1;
                    }
#ifdef DEBUG
                    fprintf(stderr, " -- Wraparound Postseek %ld\n",
                            rrd_file->pos);
#endif
                    *rra_current = rra_start;
                }
                if (!skip_update[rra_idx]) {
                    if (*pcdp_summary != NULL) {
                        /* fill-in rows use the secondary value and are one
                         * step newer per loop iteration */
                        rra_time = (current_time - current_time
                                    % (rrd->rra_def[rra_idx].pdp_cnt *
                                       rrd->stat_head->pdp_step))
                            -
                            ((rra_step_cnt[rra_idx] - 2) * rrd->rra_def[rra_idx].pdp_cnt *
                             rrd->stat_head->pdp_step);
                    }
                    if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
                                      CDP_secondary_val, pcdp_summary, rra_time) == -1)
                        return -1;
                }
            }
        }
        /* advance to the byte offset of the next RRA's data area */
        rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
            sizeof(rrd_value_t);
    } /* RRA LOOP */

    return 0;
}
1749 /*
1750 * Write out one row of values (one value per DS) to the archive.
1751 *
1752 * Returns 0 on success, -1 on error.
1753 */
1754 static int write_RRA_row(
1755 rrd_file_t *rrd_file,
1756 rrd_t *rrd,
1757 unsigned long rra_idx,
1758 unsigned long *rra_current,
1759 unsigned short CDP_scratch_idx,
1760 info_t **pcdp_summary,
1761 time_t rra_time)
1762 {
1763 unsigned long ds_idx, cdp_idx;
1764 infoval iv;
1766 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1767 /* compute the cdp index */
1768 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1769 #ifdef DEBUG
1770 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1771 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1772 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1773 #endif
1774 if (pcdp_summary != NULL) {
1775 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1776 /* append info to the return hash */
1777 *pcdp_summary = info_push(*pcdp_summary,
1778 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]", rra_time,
1779 rrd->rra_def[rra_idx].cf_nam,
1780 rrd->rra_def[rra_idx].pdp_cnt,
1781 rrd->ds_def[ds_idx].ds_nam), RD_I_VAL, iv);
1782 }
1783 if (rrd_write(rrd_file,
1784 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1785 sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1786 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1787 return -1;
1788 }
1789 *rra_current += sizeof(rrd_value_t);
1790 }
1791 return 0;
1792 }
1794 /*
1795 * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
1796 *
1797 * Returns 0 on success, -1 otherwise
1798 */
1799 static int smooth_all_rras(
1800 rrd_t *rrd,
1801 rrd_file_t *rrd_file,
1802 unsigned long rra_begin)
1803 {
1804 unsigned long rra_start = rra_begin;
1805 unsigned long rra_idx;
1806 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
1807 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
1808 cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
1809 #ifdef DEBUG
1810 fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
1811 #endif
1812 apply_smoother(rrd, rra_idx, rra_start, rrd_file);
1813 if (rrd_test_error())
1814 return -1;
1815 }
1816 rra_start += rrd->rra_def[rra_idx].row_cnt
1817 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1818 }
1819 return 0;
1820 }
1822 #ifndef HAVE_MMAP
/*
 * Flush changes to disk (unless we're using mmap)
 *
 * Returns 0 on success, -1 otherwise
 */
static int write_changes_to_disk(
    rrd_t *rrd, rrd_file_t *rrd_file, int version)
{
    /* we just need to write back the live header portion now */
    /* skip past stat_head and the DS/RRA definitions, which an update
     * never modifies */
    if (rrd_seek(rrd_file, (sizeof(stat_head_t)
                            + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
                            + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
                 SEEK_SET) != 0) {
        rrd_set_error("seek rrd for live header writeback");
        return -1;
    }
    if (version >= 3) {
        /* format version 3 and later store the full live_head */
        if (rrd_write(rrd_file, rrd->live_head,
                      sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
            rrd_set_error("rrd_write live_head to rrd");
            return -1;
        }
    } else {
        /* older formats only store the last-update timestamp */
        if (rrd_write(rrd_file, &rrd->live_head->last_up,
                      sizeof(time_t) * 1) != sizeof(time_t) * 1) {
            rrd_set_error("rrd_write live_head to rrd");
            return -1;
        }
    }

    /* one pdp_prep per data source */
    if (rrd_write(rrd_file, rrd->pdp_prep,
                  sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
        != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
        rrd_set_error("rrd_write pdp_prep to rrd");
        return -1;
    }

    /* one cdp_prep per (RRA, data source) pair */
    if (rrd_write(rrd_file, rrd->cdp_prep,
                  sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
                  rrd->stat_head->ds_cnt)
        != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
                      rrd->stat_head->ds_cnt)) {

        rrd_set_error("rrd_write cdp_prep to rrd");
        return -1;
    }

    /* one current-row pointer per RRA */
    if (rrd_write(rrd_file, rrd->rra_ptr,
                  sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
        != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
        rrd_set_error("rrd_write rra_ptr to rrd");
        return -1;
    }
    return 0;
}
1879 #endif