1 /*****************************************************************************
2 * RRDtool 1.2.23 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 *****************************************************************************/
9 #include "rrd_tool.h"
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
13 #include <sys/stat.h>
14 #include <io.h>
15 #endif
17 #include "rrd_hw.h"
18 #include "rrd_rpncalc.h"
20 #include "rrd_is_thread_safe.h"
21 #include "unused.h"
23 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
24 /*
25 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
26 * replacement.
27 */
28 #include <sys/timeb.h>
30 #ifndef __MINGW32__
/* Fallback definition of the time value filled in by the local
 * gettimeofday() replacement: whole seconds plus a microsecond part. */
struct timeval {
    time_t    tv_sec;   /* whole seconds */
    long      tv_usec;  /* microsecond part */
};
35 #endif
/* Time zone description used as the second argument type of the local
 * gettimeofday() replacement. */
struct __timezone {
    int       tz_minuteswest;   /* minutes west of Greenwich */
    int       tz_dsttime;       /* kind of DST correction */
};
42 static int gettimeofday(
43 struct timeval *t,
44 struct __timezone *tz)
45 {
47 struct _timeb current_time;
49 _ftime(¤t_time);
51 t->tv_sec = current_time.time;
52 t->tv_usec = current_time.millitm * 1000;
54 return 0;
55 }
57 #endif
/*
 * Bring a time value as returned by gettimeofday into canonical form:
 * the microsecond part must never be negative.  A negative tv_usec is
 * fixed by borrowing one second from tv_sec.
 */
static inline void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec >= 0) {
        return;         /* already in canonical form */
    }
    t->tv_usec += 1000000L;
    t->tv_sec -= 1;
}
71 static info_t *write_RRA_row(
72 rrd_file_t *rrd_file,
73 rrd_t *rrd,
74 unsigned long rra_idx,
75 unsigned long *rra_current,
76 unsigned short CDP_scratch_idx,
77 info_t *pcdp_summary,
78 time_t *rra_time)
79 {
80 unsigned long ds_idx, cdp_idx;
81 infoval iv;
83 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
84 /* compute the cdp index */
85 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
86 #ifdef DEBUG
87 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
88 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
89 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
90 #endif
91 if (pcdp_summary != NULL) {
92 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
93 /* append info to the return hash */
94 pcdp_summary = info_push(pcdp_summary,
95 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
96 *rra_time,
97 rrd->rra_def[rra_idx].
98 cf_nam,
99 rrd->rra_def[rra_idx].
100 pdp_cnt,
101 rrd->ds_def[ds_idx].
102 ds_nam), RD_I_VAL, iv);
103 }
104 if (rrd_write
105 (rrd_file,
106 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
107 sizeof(rrd_value_t) * 1) != sizeof(rrd_value_t) * 1) {
108 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
109 return 0;
110 }
111 *rra_current += sizeof(rrd_value_t);
112 }
113 return (pcdp_summary);
114 }
116 int rrd_update_r(
117 const char *filename,
118 const char *tmplt,
119 int argc,
120 const char **argv);
121 int _rrd_update(
122 const char *filename,
123 const char *tmplt,
124 int argc,
125 const char **argv,
126 info_t *);
/* Yield Y when X is NaN, otherwise X.  Note that X is evaluated twice.
 * The expansion deliberately carries no trailing ';' so the macro can be
 * used like any other expression. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
131 info_t *rrd_update_v(
132 int argc,
133 char **argv)
134 {
135 char *tmplt = NULL;
136 info_t *result = NULL;
137 infoval rc;
139 rc.u_int = -1;
140 optind = 0;
141 opterr = 0; /* initialize getopt */
143 while (1) {
144 static struct option long_options[] = {
145 {"template", required_argument, 0, 't'},
146 {0, 0, 0, 0}
147 };
148 int option_index = 0;
149 int opt;
151 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
153 if (opt == EOF)
154 break;
156 switch (opt) {
157 case 't':
158 tmplt = optarg;
159 break;
161 case '?':
162 rrd_set_error("unknown option '%s'", argv[optind - 1]);
163 goto end_tag;
164 }
165 }
167 /* need at least 2 arguments: filename, data. */
168 if (argc - optind < 2) {
169 rrd_set_error("Not enough arguments");
170 goto end_tag;
171 }
172 rc.u_int = 0;
173 result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
174 rc.u_int = _rrd_update(argv[optind], tmplt,
175 argc - optind - 1,
176 (const char **) (argv + optind + 1), result);
177 result->value.u_int = rc.u_int;
178 end_tag:
179 return result;
180 }
182 int rrd_update(
183 int argc,
184 char **argv)
185 {
186 char *tmplt = NULL;
187 int rc;
189 optind = 0;
190 opterr = 0; /* initialize getopt */
192 while (1) {
193 static struct option long_options[] = {
194 {"template", required_argument, 0, 't'},
195 {0, 0, 0, 0}
196 };
197 int option_index = 0;
198 int opt;
200 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
202 if (opt == EOF)
203 break;
205 switch (opt) {
206 case 't':
207 tmplt = optarg;
208 break;
210 case '?':
211 rrd_set_error("unknown option '%s'", argv[optind - 1]);
212 return (-1);
213 }
214 }
216 /* need at least 2 arguments: filename, data. */
217 if (argc - optind < 2) {
218 rrd_set_error("Not enough arguments");
220 return -1;
221 }
223 rc = rrd_update_r(argv[optind], tmplt,
224 argc - optind - 1, (const char **) (argv + optind + 1));
225 return rc;
226 }
228 int rrd_update_r(
229 const char *filename,
230 const char *tmplt,
231 int argc,
232 const char **argv)
233 {
234 return _rrd_update(filename, tmplt, argc, argv, NULL);
235 }
237 int _rrd_update(
238 const char *filename,
239 const char *tmplt,
240 int argc,
241 const char **argv,
242 info_t *pcdp_summary)
243 {
245 int arg_i = 2;
246 short j;
247 unsigned long i, ii, iii = 1;
249 unsigned long rra_begin; /* byte pointer to the rra
250 * area in the rrd file. this
251 * pointer never changes value */
252 unsigned long rra_start; /* byte pointer to the rra
253 * area in the rrd file. this
254 * pointer changes as each rrd is
255 * processed. */
256 unsigned long rra_current; /* byte pointer to the current write
257 * spot in the rrd file. */
258 unsigned long rra_pos_tmp; /* temporary byte pointer. */
259 double interval, pre_int, post_int; /* interval between this and
260 * the last run */
261 unsigned long proc_pdp_st; /* which pdp_st was the last
262 * to be processed */
263 unsigned long occu_pdp_st; /* when was the pdp_st
264 * before the last update
265 * time */
266 unsigned long proc_pdp_age; /* how old was the data in
267 * the pdp prep area when it
268 * was last updated */
269 unsigned long occu_pdp_age; /* how long ago was the last
270 * pdp_step time */
271 rrd_value_t *pdp_new; /* prepare the incoming data
272 * to be added the the
273 * existing entry */
274 rrd_value_t *pdp_temp; /* prepare the pdp values
275 * to be added the the
276 * cdp values */
278 long *tmpl_idx; /* index representing the settings
279 transported by the tmplt index */
280 unsigned long tmpl_cnt = 2; /* time and data */
282 rrd_t rrd;
283 time_t current_time = 0;
284 time_t rra_time = 0; /* time of update for a RRA */
285 unsigned long current_time_usec = 0; /* microseconds part of current time */
286 struct timeval tmp_time; /* used for time conversion */
288 char **updvals;
289 int schedule_smooth = 0;
290 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
292 /* a vector of future Holt-Winters seasonal coefs */
293 unsigned long elapsed_pdp_st;
295 /* number of elapsed PDP steps since last update */
296 unsigned long *rra_step_cnt = NULL;
298 /* number of rows to be updated in an RRA for a data
299 * value. */
300 unsigned long start_pdp_offset;
302 /* number of PDP steps since the last update that
303 * are assigned to the first CDP to be generated
304 * since the last update. */
305 unsigned short scratch_idx;
307 /* index into the CDP scratch array */
308 enum cf_en current_cf;
310 /* numeric id of the current consolidation function */
311 rpnstack_t rpnstack; /* used for COMPUTE DS */
312 int version; /* rrd version */
313 char *endptr; /* used in the conversion */
314 rrd_file_t *rrd_file;
316 rpnstack_init(&rpnstack);
318 /* need at least 1 arguments: data. */
319 if (argc < 1) {
320 rrd_set_error("Not enough arguments");
321 goto err_out;
322 }
324 rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
325 if (rrd_file == NULL) {
326 goto err_free;
327 }
329 /* initialize time */
330 version = atoi(rrd.stat_head->version);
331 gettimeofday(&tmp_time, 0);
332 normalize_time(&tmp_time);
333 current_time = tmp_time.tv_sec;
334 if (version >= 3) {
335 current_time_usec = tmp_time.tv_usec;
336 } else {
337 current_time_usec = 0;
338 }
340 rra_current = rra_start = rra_begin = rrd_file->header_len;
341 /* This is defined in the ANSI C standard, section 7.9.5.3:
343 When a file is opened with udpate mode ('+' as the second
344 or third character in the ... list of mode argument
345 variables), both input and output may be performed on the
346 associated stream. However, ... input may not be directly
347 followed by output without an intervening call to a file
348 positioning function, unless the input operation encounters
349 end-of-file. */
350 #if 0 //def HAVE_MMAP
351 rrd_filesize = rrd_file->file_size;
352 fseek(rrd_file->fd, 0, SEEK_END);
353 rrd_filesize = ftell(rrd_file->fd);
354 fseek(rrd_file->fd, rra_current, SEEK_SET);
355 #else
356 // fseek(rrd_file->fd, 0, SEEK_CUR);
357 #endif
360 /* get exclusive lock to whole file.
361 * lock gets removed when we close the file.
362 */
363 if (LockRRD(rrd_file->fd) != 0) {
364 rrd_set_error("could not lock RRD");
365 goto err_close;
366 }
368 if ((updvals =
369 malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
370 rrd_set_error("allocating updvals pointer array");
371 goto err_close;
372 }
374 if ((pdp_temp = malloc(sizeof(rrd_value_t)
375 * rrd.stat_head->ds_cnt)) == NULL) {
376 rrd_set_error("allocating pdp_temp ...");
377 goto err_free_updvals;
378 }
380 if ((tmpl_idx = malloc(sizeof(unsigned long)
381 * (rrd.stat_head->ds_cnt + 1))) == NULL) {
382 rrd_set_error("allocating tmpl_idx ...");
383 goto err_free_pdp_temp;
384 }
385 /* initialize tmplt redirector */
386 /* default config example (assume DS 1 is a CDEF DS)
387 tmpl_idx[0] -> 0; (time)
388 tmpl_idx[1] -> 1; (DS 0)
389 tmpl_idx[2] -> 3; (DS 2)
390 tmpl_idx[3] -> 4; (DS 3) */
391 tmpl_idx[0] = 0; /* time */
392 for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
393 if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
394 tmpl_idx[ii++] = i;
395 }
396 tmpl_cnt = ii;
398 if (tmplt) {
399 /* we should work on a writeable copy here */
400 char *dsname;
401 unsigned int tmpl_len;
402 char *tmplt_copy = strdup(tmplt);
404 dsname = tmplt_copy;
405 tmpl_cnt = 1; /* the first entry is the time */
406 tmpl_len = strlen(tmplt_copy);
407 for (i = 0; i <= tmpl_len; i++) {
408 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
409 tmplt_copy[i] = '\0';
410 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
411 rrd_set_error
412 ("tmplt contains more DS definitions than RRD");
413 goto err_free_tmpl_idx;
414 }
415 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
416 rrd_set_error("unknown DS name '%s'", dsname);
417 goto err_free_tmpl_idx;
418 } else {
419 /* the first element is always the time */
420 tmpl_idx[tmpl_cnt - 1]++;
421 /* go to the next entry on the tmplt_copy */
422 dsname = &tmplt_copy[i + 1];
423 /* fix the damage we did before */
424 if (i < tmpl_len) {
425 tmplt_copy[i] = ':';
426 }
428 }
429 }
430 }
431 free(tmplt_copy);
432 }
433 if ((pdp_new = malloc(sizeof(rrd_value_t)
434 * rrd.stat_head->ds_cnt)) == NULL) {
435 rrd_set_error("allocating pdp_new ...");
436 goto err_free_tmpl_idx;
437 }
438 #if 0 //def HAVE_MMAP
439 rrd_mmaped_file = mmap(0,
440 rrd_file->file_len,
441 PROT_READ | PROT_WRITE,
442 MAP_SHARED, fileno(in_file), 0);
443 if (rrd_mmaped_file == MAP_FAILED) {
444 rrd_set_error("error mmapping file %s", filename);
445 free(updvals);
446 free(pdp_temp);
447 free(tmpl_idx);
448 rrd_free(&rrd);
449 rrd_close(rrd_file);
450 return (-1);
451 }
452 #ifdef USE_MADVISE
453 /* when we use mmaping we tell the kernel the mmap equivalent
454 of POSIX_FADV_RANDOM */
455 madvise(rrd_mmaped_file, rrd_filesize, POSIX_MADV_RANDOM);
456 #endif
457 #endif
458 /* loop through the arguments. */
459 for (arg_i = 0; arg_i < argc; arg_i++) {
460 char *stepper = strdup(argv[arg_i]);
461 char *step_start = stepper;
462 char *p;
463 char *parsetime_error = NULL;
464 enum { atstyle, normal } timesyntax;
465 struct rrd_time_value ds_tv;
467 if (stepper == NULL) {
468 rrd_set_error("failed duplication argv entry");
469 free(step_start);
470 goto err_free_pdp_new;
471 }
472 /* initialize all ds input to unknown except the first one
473 which has always got to be set */
474 memset(updvals + 1, 'U', rrd.stat_head->ds_cnt);
475 updvals[0] = stepper;
476 /* separate all ds elements; first must be examined separately
477 due to alternate time syntax */
478 if ((p = strchr(stepper, '@')) != NULL) {
479 timesyntax = atstyle;
480 *p = '\0';
481 stepper = p + 1;
482 } else if ((p = strchr(stepper, ':')) != NULL) {
483 timesyntax = normal;
484 *p = '\0';
485 stepper = p + 1;
486 } else {
487 rrd_set_error
488 ("expected timestamp not found in data source from %s",
489 argv[arg_i]);
490 free(step_start);
491 break;
492 }
493 ii = 1;
494 updvals[tmpl_idx[ii]] = stepper;
495 while (*stepper) {
496 if (*stepper == ':') {
497 *stepper = '\0';
498 ii++;
499 if (ii < tmpl_cnt) {
500 updvals[tmpl_idx[ii]] = stepper + 1;
501 }
502 }
503 stepper++;
504 }
506 if (ii != tmpl_cnt - 1) {
507 rrd_set_error
508 ("expected %lu data source readings (got %lu) from %s",
509 tmpl_cnt - 1, ii, argv[arg_i]);
510 free(step_start);
511 break;
512 }
514 /* get the time from the reading ... handle N */
515 if (timesyntax == atstyle) {
516 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
517 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
518 free(step_start);
519 break;
520 }
521 if (ds_tv.type == RELATIVE_TO_END_TIME ||
522 ds_tv.type == RELATIVE_TO_START_TIME) {
523 rrd_set_error("specifying time relative to the 'start' "
524 "or 'end' makes no sense here: %s", updvals[0]);
525 free(step_start);
526 break;
527 }
529 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
531 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
533 } else if (strcmp(updvals[0], "N") == 0) {
534 gettimeofday(&tmp_time, 0);
535 normalize_time(&tmp_time);
536 current_time = tmp_time.tv_sec;
537 current_time_usec = tmp_time.tv_usec;
538 } else {
539 double tmp;
541 tmp = strtod(updvals[0], 0);
542 current_time = floor(tmp);
543 current_time_usec =
544 (long) ((tmp - (double) current_time) * 1000000.0);
545 }
546 /* dont do any correction for old version RRDs */
547 if (version < 3)
548 current_time_usec = 0;
550 if (current_time < rrd.live_head->last_up ||
551 (current_time == rrd.live_head->last_up &&
552 (long) current_time_usec <=
553 (long) rrd.live_head->last_up_usec)) {
554 rrd_set_error("illegal attempt to update using time %ld when "
555 "last update time is %ld (minimum one second step)",
556 current_time, rrd.live_head->last_up);
557 free(step_start);
558 break;
559 }
562 /* seek to the beginning of the rra's */
563 if (rra_current != rra_begin) {
564 #ifndef HAVE_MMAP
565 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
566 rrd_set_error("seek error in rrd");
567 free(step_start);
568 break;
569 }
570 #endif
571 rra_current = rra_begin;
572 }
573 rra_start = rra_begin;
575 /* when was the current pdp started */
576 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
577 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
579 /* when did the last pdp_st occur */
580 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
581 occu_pdp_st = current_time - occu_pdp_age;
583 /* interval = current_time - rrd.live_head->last_up; */
584 interval = (double) (current_time - rrd.live_head->last_up)
585 + (double) ((long) current_time_usec -
586 (long) rrd.live_head->last_up_usec) / 1000000.0;
588 if (occu_pdp_st > proc_pdp_st) {
589 /* OK we passed the pdp_st moment */
590 pre_int = (long) occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
591 * occurred before the latest
592 * pdp_st moment*/
593 pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0; /* adjust usecs */
594 post_int = occu_pdp_age; /* how much after it */
595 post_int += ((double) current_time_usec) / 1000000.0; /* adjust usecs */
596 } else {
597 pre_int = interval;
598 post_int = 0;
599 }
601 #ifdef DEBUG
602 printf("proc_pdp_age %lu\t"
603 "proc_pdp_st %lu\t"
604 "occu_pfp_age %lu\t"
605 "occu_pdp_st %lu\t"
606 "int %lf\t"
607 "pre_int %lf\t"
608 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
609 occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
610 #endif
612 /* process the data sources and update the pdp_prep
613 * area accordingly */
614 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
615 enum dst_en dst_idx;
617 dst_idx = dst_conv(rrd.ds_def[i].dst);
619 /* make sure we do not build diffs with old last_ds values */
620 if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
621 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
622 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
623 }
625 /* NOTE: DST_CDEF should never enter this if block, because
626 * updvals[i+1][0] is initialized to 'U'; unless the caller
627 * accidently specified a value for the DST_CDEF. To handle
628 * this case, an extra check is required. */
630 if ((updvals[i + 1][0] != 'U') &&
631 (dst_idx != DST_CDEF) &&
632 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
633 double rate = DNAN;
635 /* the data source type defines how to process the data */
636 /* pdp_new contains rate * time ... eg the bytes
637 * transferred during the interval. Doing it this way saves
638 * a lot of math operations */
639 switch (dst_idx) {
640 case DST_COUNTER:
641 case DST_DERIVE:
642 if (rrd.pdp_prep[i].last_ds[0] != 'U') {
643 for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
644 if ((updvals[i + 1][ii] < '0'
645 || updvals[i + 1][ii] > '9') && (ii != 0
646 && updvals[i
647 +
648 1]
649 [ii] !=
650 '-')) {
651 rrd_set_error("not a simple integer: '%s'",
652 updvals[i + 1]);
653 break;
654 }
655 }
656 if (rrd_test_error()) {
657 break;
658 }
659 pdp_new[i] =
660 rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
661 if (dst_idx == DST_COUNTER) {
662 /* simple overflow catcher suggested by Andres Kroonmaa */
663 /* this will fail terribly for non 32 or 64 bit counters ... */
664 /* are there any others in SNMP land ? */
665 if (pdp_new[i] < (double) 0.0)
666 pdp_new[i] += (double) 4294967296.0; /* 2^32 */
667 if (pdp_new[i] < (double) 0.0)
668 pdp_new[i] += (double) 18446744069414584320.0;
669 /* 2^64-2^32 */ ;
670 }
671 rate = pdp_new[i] / interval;
672 } else {
673 pdp_new[i] = DNAN;
674 }
675 break;
676 case DST_ABSOLUTE:
677 errno = 0;
678 pdp_new[i] = strtod(updvals[i + 1], &endptr);
679 if (errno > 0) {
680 rrd_set_error("converting '%s' to float: %s",
681 updvals[i + 1], rrd_strerror(errno));
682 break;
683 };
684 if (endptr[0] != '\0') {
685 rrd_set_error
686 ("conversion of '%s' to float not complete: tail '%s'",
687 updvals[i + 1], endptr);
688 break;
689 }
690 rate = pdp_new[i] / interval;
691 break;
692 case DST_GAUGE:
693 errno = 0;
694 pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
695 if (errno > 0) {
696 rrd_set_error("converting '%s' to float: %s",
697 updvals[i + 1], rrd_strerror(errno));
698 break;
699 };
700 if (endptr[0] != '\0') {
701 rrd_set_error
702 ("conversion of '%s' to float not complete: tail '%s'",
703 updvals[i + 1], endptr);
704 break;
705 }
706 rate = pdp_new[i] / interval;
707 break;
708 default:
709 rrd_set_error("rrd contains unknown DS type : '%s'",
710 rrd.ds_def[i].dst);
711 break;
712 }
713 /* break out of this for loop if the error string is set */
714 if (rrd_test_error()) {
715 break;
716 }
717 /* make sure pdp_temp is neither too large or too small
718 * if any of these occur it becomes unknown ...
719 * sorry folks ... */
720 if (!isnan(rate) &&
721 ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
722 rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
723 (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
724 rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
725 pdp_new[i] = DNAN;
726 }
727 } else {
728 /* no news is news all the same */
729 pdp_new[i] = DNAN;
730 }
733 /* make a copy of the command line argument for the next run */
734 #ifdef DEBUG
735 fprintf(stderr,
736 "prep ds[%lu]\t"
737 "last_arg '%s'\t"
738 "this_arg '%s'\t"
739 "pdp_new %10.2f\n",
740 i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
741 #endif
742 strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
743 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
744 }
745 /* break out of the argument parsing loop if the error_string is set */
746 if (rrd_test_error()) {
747 free(step_start);
748 break;
749 }
750 /* has a pdp_st moment occurred since the last run ? */
752 if (proc_pdp_st == occu_pdp_st) {
753 /* no we have not passed a pdp_st moment. therefore update is simple */
755 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
756 if (isnan(pdp_new[i])) {
757 /* this is not realy accurate if we use subsecond data arival time
758 should have thought of it when going subsecond resolution ...
759 sorry next format change we will have it! */
760 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
761 floor(interval);
762 } else {
763 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
764 rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
765 } else {
766 rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
767 }
768 }
769 #ifdef DEBUG
770 fprintf(stderr,
771 "NO PDP ds[%lu]\t"
772 "value %10.2f\t"
773 "unkn_sec %5lu\n",
774 i,
775 rrd.pdp_prep[i].scratch[PDP_val].u_val,
776 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
777 #endif
778 }
779 } else {
780 /* an pdp_st has occurred. */
782 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
783 rate*seconds which occurred up to the last run.
784 pdp_new[] contains rate*seconds from the latest run.
785 pdp_temp[] will contain the rate for cdp */
787 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
788 /* update pdp_prep to the current pdp_st. */
789 double pre_unknown = 0.0;
791 if (isnan(pdp_new[i])) {
792 /* a final bit of unkonwn to be added bevore calculation
793 we use a temporary variable for this so that we
794 don't have to turn integer lines before using the value */
795 pre_unknown = pre_int;
796 } else {
797 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
798 rrd.pdp_prep[i].scratch[PDP_val].u_val =
799 pdp_new[i] / interval * pre_int;
800 } else {
801 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
802 pdp_new[i] / interval * pre_int;
803 }
804 }
807 /* if too much of the pdp_prep is unknown we dump it */
808 if (
809 /* removed because this does not agree with the
810 definition that a heartbeat can be unknown */
811 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
812 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
813 /* if the interval is larger thatn mrhb we get NAN */
814 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
815 (occu_pdp_st - proc_pdp_st <=
816 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
817 pdp_temp[i] = DNAN;
818 } else {
819 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
820 / ((double) (occu_pdp_st - proc_pdp_st
821 -
822 rrd.pdp_prep[i].
823 scratch[PDP_unkn_sec_cnt].u_cnt)
824 - pre_unknown);
825 }
827 /* process CDEF data sources; remember each CDEF DS can
828 * only reference other DS with a lower index number */
829 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
830 rpnp_t *rpnp;
832 rpnp =
833 rpn_expand((rpn_cdefds_t *) &
834 (rrd.ds_def[i].par[DS_cdef]));
835 /* substitue data values for OP_VARIABLE nodes */
836 for (ii = 0; rpnp[ii].op != OP_END; ii++) {
837 if (rpnp[ii].op == OP_VARIABLE) {
838 rpnp[ii].op = OP_NUMBER;
839 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
840 }
841 }
842 /* run the rpn calculator */
843 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
844 free(rpnp);
845 break; /* exits the data sources pdp_temp loop */
846 }
847 }
849 /* make pdp_prep ready for the next run */
850 if (isnan(pdp_new[i])) {
851 /* this is not realy accurate if we use subsecond data arival time
852 should have thought of it when going subsecond resolution ...
853 sorry next format change we will have it! */
854 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
855 floor(post_int);
856 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
857 } else {
858 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
859 rrd.pdp_prep[i].scratch[PDP_val].u_val =
860 pdp_new[i] / interval * post_int;
861 }
863 #ifdef DEBUG
864 fprintf(stderr,
865 "PDP UPD ds[%lu]\t"
866 "pdp_temp %10.2f\t"
867 "new_prep %10.2f\t"
868 "new_unkn_sec %5lu\n",
869 i, pdp_temp[i],
870 rrd.pdp_prep[i].scratch[PDP_val].u_val,
871 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
872 #endif
873 }
875 /* if there were errors during the last loop, bail out here */
876 if (rrd_test_error()) {
877 free(step_start);
878 break;
879 }
881 /* compute the number of elapsed pdp_st moments */
882 elapsed_pdp_st =
883 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
884 #ifdef DEBUG
885 fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
886 #endif
887 if (rra_step_cnt == NULL) {
888 rra_step_cnt = (unsigned long *)
889 malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
890 }
892 for (i = 0, rra_start = rra_begin;
893 i < rrd.stat_head->rra_cnt;
894 rra_start +=
895 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
896 sizeof(rrd_value_t), i++) {
897 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
898 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
899 (proc_pdp_st / rrd.stat_head->pdp_step) %
900 rrd.rra_def[i].pdp_cnt;
901 if (start_pdp_offset <= elapsed_pdp_st) {
902 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
903 rrd.rra_def[i].pdp_cnt + 1;
904 } else {
905 rra_step_cnt[i] = 0;
906 }
908 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
909 /* If this is a bulk update, we need to skip ahead in
910 the seasonal arrays so that they will be correct for
911 the next observed value;
912 note that for the bulk update itself, no update will
913 occur to DEVSEASONAL or SEASONAL; futhermore, HWPREDICT
914 and DEVPREDICT will be set to DNAN. */
915 if (rra_step_cnt[i] > 2) {
916 /* skip update by resetting rra_step_cnt[i],
917 note that this is not data source specific; this is
918 due to the bulk update, not a DNAN value for the
919 specific data source. */
920 rra_step_cnt[i] = 0;
921 lookup_seasonal(&rrd, i, rra_start, rrd_file,
922 elapsed_pdp_st, &last_seasonal_coef);
923 lookup_seasonal(&rrd, i, rra_start, rrd_file,
924 elapsed_pdp_st + 1, &seasonal_coef);
925 }
927 /* periodically run a smoother for seasonal effects */
928 /* Need to use first cdp parameter buffer to track
929 * burnin (burnin requires a specific smoothing schedule).
930 * The CDP_init_seasonal parameter is really an RRA level,
931 * not a data source within RRA level parameter, but the rra_def
932 * is read only for rrd_update (not flushed to disk). */
933 iii = i * (rrd.stat_head->ds_cnt);
934 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
935 <= BURNIN_CYCLES) {
936 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
937 > rrd.rra_def[i].row_cnt - 1) {
938 /* mark off one of the burnin cycles */
939 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
940 u_cnt);
941 schedule_smooth = 1;
942 }
943 } else {
944 /* someone has no doubt invented a trick to deal with this
945 * wrap around, but at least this code is clear. */
946 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
947 u_cnt > rrd.rra_ptr[i].cur_row) {
948 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
949 * mapping between PDP and CDP */
950 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
951 >=
952 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
953 u_cnt) {
954 #ifdef DEBUG
955 fprintf(stderr,
956 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
957 rrd.rra_ptr[i].cur_row,
958 elapsed_pdp_st,
959 rrd.rra_def[i].
960 par[RRA_seasonal_smooth_idx].u_cnt);
961 #endif
962 schedule_smooth = 1;
963 }
964 } else {
965 /* can't rely on negative numbers because we are working with
966 * unsigned values */
967 /* Don't need modulus here. If we've wrapped more than once, only
968 * one smooth is executed at the end. */
969 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
970 rrd.rra_def[i].row_cnt
971 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
972 rrd.rra_def[i].row_cnt >=
973 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
974 u_cnt) {
975 #ifdef DEBUG
976 fprintf(stderr,
977 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
978 rrd.rra_ptr[i].cur_row,
979 elapsed_pdp_st,
980 rrd.rra_def[i].
981 par[RRA_seasonal_smooth_idx].u_cnt);
982 #endif
983 schedule_smooth = 1;
984 }
985 }
986 }
988 rra_current = rrd_tell(rrd_file);
989 }
990 /* if cf is DEVSEASONAL or SEASONAL */
991 if (rrd_test_error())
992 break;
994 /* update CDP_PREP areas */
995 /* loop over data soures within each RRA */
996 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
998 /* iii indexes the CDP prep area for this data source within the RRA */
999 iii = i * rrd.stat_head->ds_cnt + ii;
1001 if (rrd.rra_def[i].pdp_cnt > 1) {
1003 if (rra_step_cnt[i] > 0) {
1004 /* If we are in this block, as least 1 CDP value will be written to
1005 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1006 * to be written, then the "fill in" value is the CDP_secondary_val
1007 * entry. */
1008 if (isnan(pdp_temp[ii])) {
1009 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1010 u_cnt += start_pdp_offset;
1011 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1012 u_val = DNAN;
1013 } else {
1014 /* CDP_secondary value is the RRA "fill in" value for intermediary
1015 * CDP data entries. No matter the CF, the value is the same because
1016 * the average, max, min, and last of a list of identical values is
1017 * the same, namely, the value itself. */
1018 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1019 u_val = pdp_temp[ii];
1020 }
1022 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1023 u_cnt >
1024 rrd.rra_def[i].pdp_cnt *
1025 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
1026 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1027 u_val = DNAN;
1028 /* initialize carry over */
1029 if (current_cf == CF_AVERAGE) {
1030 if (isnan(pdp_temp[ii])) {
1031 rrd.cdp_prep[iii].scratch[CDP_val].
1032 u_val = DNAN;
1033 } else {
1034 rrd.cdp_prep[iii].scratch[CDP_val].
1035 u_val =
1036 pdp_temp[ii] *
1037 ((elapsed_pdp_st -
1038 start_pdp_offset) %
1039 rrd.rra_def[i].pdp_cnt);
1040 }
1041 } else {
1042 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1043 pdp_temp[ii];
1044 }
1045 } else {
1046 rrd_value_t cum_val, cur_val;
1048 switch (current_cf) {
1049 case CF_AVERAGE:
1050 cum_val =
1051 IFDNAN(rrd.cdp_prep[iii].
1052 scratch[CDP_val].u_val, 0.0);
1053 cur_val = IFDNAN(pdp_temp[ii], 0.0);
1054 rrd.cdp_prep[iii].
1055 scratch[CDP_primary_val].u_val =
1056 (cum_val +
1057 cur_val * start_pdp_offset) /
1058 (rrd.rra_def[i].pdp_cnt -
1059 rrd.cdp_prep[iii].
1060 scratch[CDP_unkn_pdp_cnt].u_cnt);
1061 /* initialize carry over value */
1062 if (isnan(pdp_temp[ii])) {
1063 rrd.cdp_prep[iii].scratch[CDP_val].
1064 u_val = DNAN;
1065 } else {
1066 rrd.cdp_prep[iii].scratch[CDP_val].
1067 u_val =
1068 pdp_temp[ii] *
1069 ((elapsed_pdp_st -
1070 start_pdp_offset) %
1071 rrd.rra_def[i].pdp_cnt);
1072 }
1073 break;
1074 case CF_MAXIMUM:
1075 cum_val =
1076 IFDNAN(rrd.cdp_prep[iii].
1077 scratch[CDP_val].u_val, -DINF);
1078 cur_val = IFDNAN(pdp_temp[ii], -DINF);
1079 #ifdef DEBUG
1080 if (isnan
1081 (rrd.cdp_prep[iii].scratch[CDP_val].
1082 u_val) && isnan(pdp_temp[ii])) {
1083 fprintf(stderr,
1084 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1085 i, ii);
1086 exit(-1);
1087 }
1088 #endif
1089 if (cur_val > cum_val)
1090 rrd.cdp_prep[iii].
1091 scratch[CDP_primary_val].u_val =
1092 cur_val;
1093 else
1094 rrd.cdp_prep[iii].
1095 scratch[CDP_primary_val].u_val =
1096 cum_val;
1097 /* initialize carry over value */
1098 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1099 pdp_temp[ii];
1100 break;
1101 case CF_MINIMUM:
1102 cum_val =
1103 IFDNAN(rrd.cdp_prep[iii].
1104 scratch[CDP_val].u_val, DINF);
1105 cur_val = IFDNAN(pdp_temp[ii], DINF);
1106 #ifdef DEBUG
1107 if (isnan
1108 (rrd.cdp_prep[iii].scratch[CDP_val].
1109 u_val) && isnan(pdp_temp[ii])) {
1110 fprintf(stderr,
1111 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1112 i, ii);
1113 exit(-1);
1114 }
1115 #endif
1116 if (cur_val < cum_val)
1117 rrd.cdp_prep[iii].
1118 scratch[CDP_primary_val].u_val =
1119 cur_val;
1120 else
1121 rrd.cdp_prep[iii].
1122 scratch[CDP_primary_val].u_val =
1123 cum_val;
1124 /* initialize carry over value */
1125 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1126 pdp_temp[ii];
1127 break;
1128 case CF_LAST:
1129 default:
1130 rrd.cdp_prep[iii].
1131 scratch[CDP_primary_val].u_val =
1132 pdp_temp[ii];
1133 /* initialize carry over value */
1134 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1135 pdp_temp[ii];
1136 break;
1137 }
1138 } /* endif meets xff value requirement for a valid value */
1139 /* initialize carry over CDP_unkn_pdp_cnt; this must happen after CDP_primary_val
1140 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1141 if (isnan(pdp_temp[ii]))
1142 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1143 u_cnt =
1144 (elapsed_pdp_st -
1145 start_pdp_offset) %
1146 rrd.rra_def[i].pdp_cnt;
1147 else
1148 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1149 u_cnt = 0;
1150 } else { /* rra_step_cnt[i] == 0 */
1152 #ifdef DEBUG
1153 if (isnan
1154 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1155 fprintf(stderr,
1156 "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1157 i, ii);
1158 } else {
1159 fprintf(stderr,
1160 "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1161 i, ii,
1162 rrd.cdp_prep[iii].scratch[CDP_val].
1163 u_val);
1164 }
1165 #endif
1166 if (isnan(pdp_temp[ii])) {
1167 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1168 u_cnt += elapsed_pdp_st;
1169 } else
1170 if (isnan
1171 (rrd.cdp_prep[iii].scratch[CDP_val].
1172 u_val)) {
1173 if (current_cf == CF_AVERAGE) {
1174 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1175 pdp_temp[ii] * elapsed_pdp_st;
1176 } else {
1177 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1178 pdp_temp[ii];
1179 }
1180 #ifdef DEBUG
1181 fprintf(stderr,
1182 "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1183 i, ii,
1184 rrd.cdp_prep[iii].scratch[CDP_val].
1185 u_val);
1186 #endif
1187 } else {
1188 switch (current_cf) {
1189 case CF_AVERAGE:
1190 rrd.cdp_prep[iii].scratch[CDP_val].
1191 u_val +=
1192 pdp_temp[ii] * elapsed_pdp_st;
1193 break;
1194 case CF_MINIMUM:
1195 if (pdp_temp[ii] <
1196 rrd.cdp_prep[iii].scratch[CDP_val].
1197 u_val)
1198 rrd.cdp_prep[iii].scratch[CDP_val].
1199 u_val = pdp_temp[ii];
1200 break;
1201 case CF_MAXIMUM:
1202 if (pdp_temp[ii] >
1203 rrd.cdp_prep[iii].scratch[CDP_val].
1204 u_val)
1205 rrd.cdp_prep[iii].scratch[CDP_val].
1206 u_val = pdp_temp[ii];
1207 break;
1208 case CF_LAST:
1209 default:
1210 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1211 pdp_temp[ii];
1212 break;
1213 }
1214 }
1215 }
1216 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1217 if (elapsed_pdp_st > 2) {
1218 switch (current_cf) {
1219 case CF_AVERAGE:
1220 default:
1221 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1222 u_val = pdp_temp[ii];
1223 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1224 u_val = pdp_temp[ii];
1225 break;
1226 case CF_SEASONAL:
1227 case CF_DEVSEASONAL:
1228 /* need to update cached seasonal values, so they are consistent
1229 * with the bulk update */
1230 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1231 * CDP_last_deviation are the same. */
1232 rrd.cdp_prep[iii].
1233 scratch[CDP_hw_last_seasonal].u_val =
1234 last_seasonal_coef[ii];
1235 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1236 u_val = seasonal_coef[ii];
1237 break;
1238 case CF_HWPREDICT:
1239 /* need to update the null_count and last_null_count.
1240 * even do this for non-DNAN pdp_temp because the
1241 * algorithm is not learning from batch updates. */
1242 rrd.cdp_prep[iii].scratch[CDP_null_count].
1243 u_cnt += elapsed_pdp_st;
1244 rrd.cdp_prep[iii].
1245 scratch[CDP_last_null_count].u_cnt +=
1246 elapsed_pdp_st - 1;
1247 /* fall through */
1248 case CF_DEVPREDICT:
1249 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1250 u_val = DNAN;
1251 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1252 u_val = DNAN;
1253 break;
1254 case CF_FAILURES:
1255 /* do not count missed bulk values as failures */
1256 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1257 u_val = 0;
1258 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1259 u_val = 0;
1260 /* need to reset violations buffer.
1261 * could do this more carefully, but for now, just
1262 * assume a bulk update wipes away all violations. */
1263 erase_violations(&rrd, iii, i);
1264 break;
1265 }
1266 }
1267 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1269 if (rrd_test_error())
1270 break;
1272 } /* endif data sources loop */
1273 } /* end RRA Loop */
1275 /* this loop is only entered if elapsed_pdp_st < 3 */
1276 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1277 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1278 for (i = 0, rra_start = rra_begin;
1279 i < rrd.stat_head->rra_cnt;
1280 rra_start +=
1281 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1282 sizeof(rrd_value_t), i++) {
1283 if (rrd.rra_def[i].pdp_cnt > 1)
1284 continue;
1286 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1287 if (current_cf == CF_SEASONAL
1288 || current_cf == CF_DEVSEASONAL) {
1289 lookup_seasonal(&rrd, i, rra_start, rrd_file,
1290 elapsed_pdp_st + (scratch_idx ==
1291 CDP_primary_val ? 1
1292 : 2),
1293 &seasonal_coef);
1294 rra_current = rrd_tell(rrd_file);
1295 }
1296 if (rrd_test_error())
1297 break;
1298 /* loop over data sources within each RRA */
1299 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1300 update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1301 i * (rrd.stat_head->ds_cnt) + ii,
1302 i, ii, scratch_idx, seasonal_coef);
1303 }
1304 } /* end RRA Loop */
1305 if (rrd_test_error())
1306 break;
1307 } /* end elapsed_pdp_st loop */
1309 if (rrd_test_error())
1310 break;
1312 /* Ready to write to disk */
1313 /* Move sequentially through the file, writing one RRA at a time.
1314 * Note this architecture divorces the computation of CDP with
1315 * flushing updated RRA entries to disk. */
1316 for (i = 0, rra_start = rra_begin;
1317 i < rrd.stat_head->rra_cnt;
1318 rra_start +=
1319 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1320 sizeof(rrd_value_t), i++) {
1321 /* is there anything to write for this RRA? If not, continue. */
1322 if (rra_step_cnt[i] == 0)
1323 continue;
1325 /* write the first row */
1326 #ifdef DEBUG
1327 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1328 #endif
1329 rrd.rra_ptr[i].cur_row++;
1330 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1331 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1332 /* position on the first row */
1333 rra_pos_tmp = rra_start +
1334 (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1335 sizeof(rrd_value_t);
1336 if (rra_pos_tmp != rra_current) {
1337 #ifndef HAVE_MMAP
1338 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1339 rrd_set_error("seek error in rrd");
1340 break;
1341 }
1342 #endif
1343 rra_current = rra_pos_tmp;
1344 }
1345 #ifdef DEBUG
1346 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1347 #endif
1348 scratch_idx = CDP_primary_val;
1349 if (pcdp_summary != NULL) {
1350 rra_time = (current_time - current_time
1351 % (rrd.rra_def[i].pdp_cnt *
1352 rrd.stat_head->pdp_step))
1353 -
1354 ((rra_step_cnt[i] -
1355 1) * rrd.rra_def[i].pdp_cnt *
1356 rrd.stat_head->pdp_step);
1357 }
1358 pcdp_summary =
1359 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1360 scratch_idx, pcdp_summary, &rra_time);
1361 if (rrd_test_error())
1362 break;
1364 /* write other rows of the bulk update, if any */
1365 scratch_idx = CDP_secondary_val;
1366 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1367 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1368 #ifdef DEBUG
1369 fprintf(stderr,
1370 "Wraparound for RRA %s, %lu updates left\n",
1371 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1372 #endif
1373 /* wrap */
1374 rrd.rra_ptr[i].cur_row = 0;
1375 /* seek back to beginning of current rra */
1376 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1377 rrd_set_error("seek error in rrd");
1378 break;
1379 }
1380 #ifdef DEBUG
1381 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1382 rrd_file->pos);
1383 #endif
1384 rra_current = rra_start;
1385 }
1386 if (pcdp_summary != NULL) {
1387 rra_time = (current_time - current_time
1388 % (rrd.rra_def[i].pdp_cnt *
1389 rrd.stat_head->pdp_step))
1390 -
1391 ((rra_step_cnt[i] -
1392 2) * rrd.rra_def[i].pdp_cnt *
1393 rrd.stat_head->pdp_step);
1394 }
1395 pcdp_summary =
1396 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1397 scratch_idx, pcdp_summary, &rra_time);
1398 }
1400 if (rrd_test_error())
1401 break;
1402 } /* RRA LOOP */
1404 /* break out of the argument parsing loop if error_string is set */
1405 if (rrd_test_error()) {
1406 free(step_start);
1407 break;
1408 }
1410 } /* endif a pdp_st has occurred */
1411 rrd.live_head->last_up = current_time;
1412 rrd.live_head->last_up_usec = current_time_usec;
1413 free(step_start);
1414 } /* function argument loop */
1416 if (seasonal_coef != NULL)
1417 free(seasonal_coef);
1418 if (last_seasonal_coef != NULL)
1419 free(last_seasonal_coef);
1420 if (rra_step_cnt != NULL)
1421 free(rra_step_cnt);
1422 rpnstack_free(&rpnstack);
1424 #if 0 //def HAVE_MMAP
1425 if (munmap(rrd_file->file_start, rrd_file->file_len) == -1) {
1426 rrd_set_error("error writing(unmapping) file: %s", filename);
1427 }
1428 #else
1429 //rrd_flush(rrd_file); //XXX: really needed?
1430 #endif
1431 /* if we got here and if there is an error and if the file has not been
1432 * written to, then close things up and return. */
1433 if (rrd_test_error()) {
1434 goto err_free_pdp_new;
1435 }
1437 /* aargh ... that was tough ... so many loops ... anyway, its done.
1438 * we just need to write back the live header portion now*/
1440 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1441 + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1442 + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1443 SEEK_SET) != 0) {
1444 rrd_set_error("seek rrd for live header writeback");
1445 goto err_free_pdp_new;
1446 }
1448 if (version >= 3) {
1449 if (rrd_write(rrd_file, rrd.live_head,
1450 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1451 rrd_set_error("rrd_write live_head to rrd");
1452 goto err_free_pdp_new;
1453 }
1454 } else {
1455 if (rrd_write(rrd_file, &rrd.live_head->last_up,
1456 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1457 rrd_set_error("rrd_write live_head to rrd");
1458 goto err_free_pdp_new;
1459 }
1460 }
1463 if (rrd_write(rrd_file, rrd.pdp_prep,
1464 sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1465 != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1466 rrd_set_error("rrd_write pdp_prep to rrd");
1467 goto err_free_pdp_new;
1468 }
1470 if (rrd_write(rrd_file, rrd.cdp_prep,
1471 sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1472 rrd.stat_head->ds_cnt)
1473 != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1474 rrd.stat_head->ds_cnt)) {
1476 rrd_set_error("rrd_write cdp_prep to rrd");
1477 goto err_free_pdp_new;
1478 }
1480 if (rrd_write(rrd_file, rrd.rra_ptr,
1481 sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1482 != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1483 rrd_set_error("rrd_write rra_ptr to rrd");
1484 goto err_free_pdp_new;
1485 }
1486 #ifdef HAVE_POSIX_FADVISExxx
1488 /* with update we have write ops, so they will probably not be done by now, this means
1489 the buffers will not get freed. But calling this for the whole file - header
1490 will let the data off the hook as soon as it is written when if it is from a previous
1491 update cycle. Calling fdsync to force things is much too hard here. */
1493 if (0 != posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1494 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1495 rrd_strerror(errno));
1496 goto err_free_pdp_new;
1497 }
1498 #endif
1499 /* rrd_flush(rrd_file); */
1501 /* calling the smoothing code here guarantees at most
1502 * one smoothing operation per rrd_update call. Unfortunately,
1503 * it is possible with bulk updates, or a long-delayed update
1504 * for smoothing to occur off-schedule. This really isn't
1505 * critical except during the burning cycles. */
1506 if (schedule_smooth) {
1508 rra_start = rra_begin;
1509 for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1510 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1511 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1512 #ifdef DEBUG
1513 fprintf(stderr, "Running smoother for rra %ld\n", i);
1514 #endif
1515 apply_smoother(&rrd, i, rra_start, rrd_file);
1516 if (rrd_test_error())
1517 break;
1518 }
1519 rra_start += rrd.rra_def[i].row_cnt
1520 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1521 }
1522 #ifdef HAVE_POSIX_FADVISExxx
1523 /* same procedure as above ... */
1524 if (0 !=
1525 posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1526 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1527 rrd_strerror(errno));
1528 goto err_free_pdp_new;
1529 }
1530 #endif
1531 }
1533 rrd_free(&rrd);
1534 rrd_close(rrd_file);
1536 free(pdp_new);
1537 free(tmpl_idx);
1538 free(pdp_temp);
1539 free(updvals);
1540 return (0);
1542 err_free_pdp_new:
1543 free(pdp_new);
1544 err_free_tmpl_idx:
1545 free(tmpl_idx);
1546 err_free_pdp_temp:
1547 free(pdp_temp);
1548 err_free_updvals:
1549 free(updvals);
1550 err_close:
1551 rrd_close(rrd_file);
1552 err_free:
1553 rrd_free(&rrd);
1554 err_out:
1555 return (-1);
1556 }
/*
 * Try to take an exclusive lock on the whole RRD file without waiting.
 *
 * in_file  open file descriptor of the RRD file to lock.
 *
 * The lock gets removed when the file is closed.
 *
 * Returns 0 on success. Any other value is the result of the underlying
 * locking call (or -1 if the file size could not be determined on WIN32)
 * and means the lock was not obtained.
 */
int LockRRD(
    int in_file)
{
    int       rcstat;

#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat st;

    /* _locking() wants an explicit byte count, so look up the file size */
    if (_fstat(in_file, &st) == 0) {
        rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
    } else {
        rcstat = -1;
    }
#else
    /* Zero the whole structure first: only four members are assigned
     * below, and struct flock also has l_pid plus, on some platforms,
     * additional members that must not be handed to fcntl() with
     * indeterminate contents. */
    struct flock lock = { 0 };

    lock.l_type = F_WRLCK;      /* exclusive write lock */
    lock.l_whence = SEEK_SET;   /* l_start counts from the start of file */
    lock.l_start = 0;           /* begin at the first byte ... */
    lock.l_len = 0;             /* ... and cover the whole file */

    /* F_SETLK returns right away instead of waiting for the lock */
    rcstat = fcntl(in_file, F_SETLK, &lock);
#endif

    return (rcstat);
}