1 /*****************************************************************************
2 * RRDtool 1.2.23 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 *****************************************************************************/
9 #include "rrd_tool.h"
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
13 #include <sys/stat.h>
14 #include <io.h>
15 #endif
17 #include "rrd_hw.h"
18 #include "rrd_rpncalc.h"
20 #include "rrd_is_thread_safe.h"
21 #include "unused.h"
23 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
24 /*
25 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
26 * replacement.
27 */
28 #include <sys/timeb.h>
30 #ifndef __MINGW32__
31 struct timeval {
32 time_t tv_sec; /* seconds */
33 long tv_usec; /* microseconds */
34 };
35 #endif
37 struct __timezone {
38 int tz_minuteswest; /* minutes W of Greenwich */
39 int tz_dsttime; /* type of dst correction */
40 };
42 static int gettimeofday(
43 struct timeval *t,
44 struct __timezone *tz)
45 {
47 struct _timeb current_time;
49 _ftime(¤t_time);
51 t->tv_sec = current_time.time;
52 t->tv_usec = current_time.millitm * 1000;
54 return 0;
55 }
57 #endif
58 /*
59 * normilize time as returned by gettimeofday. usec part must
60 * be always >= 0
61 */
/*
 * Normalize a struct timeval as returned by gettimeofday() so that the
 * microsecond field is guaranteed to be non-negative: a negative
 * tv_usec borrows one full second from tv_sec.
 */
static void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec >= 0)
        return;             /* already in canonical form */

    t->tv_usec += 1000000L; /* push usec back into [0, 1000000) ... */
    t->tv_sec -= 1;         /* ... by borrowing one second */
}
71 static info_t *write_RRA_row(
72 rrd_file_t *rrd_file,
73 rrd_t *rrd,
74 unsigned long rra_idx,
75 unsigned long *rra_current,
76 unsigned short CDP_scratch_idx,
77 info_t *pcdp_summary,
78 time_t *rra_time)
79 {
80 unsigned long ds_idx, cdp_idx;
81 infoval iv;
83 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
84 /* compute the cdp index */
85 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
86 #ifdef DEBUG
87 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
88 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
89 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
90 #endif
91 if (pcdp_summary != NULL) {
92 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
93 /* append info to the return hash */
94 pcdp_summary = info_push(pcdp_summary,
95 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
96 *rra_time,
97 rrd->rra_def[rra_idx].
98 cf_nam,
99 rrd->rra_def[rra_idx].
100 pdp_cnt,
101 rrd->ds_def[ds_idx].
102 ds_nam), RD_I_VAL, iv);
103 }
104 if (rrd_write
105 (rrd_file,
106 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
107 sizeof(rrd_value_t) * 1) != sizeof(rrd_value_t) * 1) {
108 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
109 return 0;
110 }
111 *rra_current += sizeof(rrd_value_t);
112 }
113 return (pcdp_summary);
114 }
116 int rrd_update_r(
117 const char *filename,
118 const char *tmplt,
119 int argc,
120 const char **argv);
121 int _rrd_update(
122 const char *filename,
123 const char *tmplt,
124 int argc,
125 const char **argv,
126 info_t *);
128 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
131 info_t *rrd_update_v(
132 int argc,
133 char **argv)
134 {
135 char *tmplt = NULL;
136 info_t *result = NULL;
137 infoval rc;
139 rc.u_int = -1;
140 optind = 0;
141 opterr = 0; /* initialize getopt */
143 while (1) {
144 static struct option long_options[] = {
145 {"template", required_argument, 0, 't'},
146 {0, 0, 0, 0}
147 };
148 int option_index = 0;
149 int opt;
151 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
153 if (opt == EOF)
154 break;
156 switch (opt) {
157 case 't':
158 tmplt = optarg;
159 break;
161 case '?':
162 rrd_set_error("unknown option '%s'", argv[optind - 1]);
163 goto end_tag;
164 }
165 }
167 /* need at least 2 arguments: filename, data. */
168 if (argc - optind < 2) {
169 rrd_set_error("Not enough arguments");
170 goto end_tag;
171 }
172 rc.u_int = 0;
173 result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
174 rc.u_int = _rrd_update(argv[optind], tmplt,
175 argc - optind - 1,
176 (const char **) (argv + optind + 1), result);
177 result->value.u_int = rc.u_int;
178 end_tag:
179 return result;
180 }
182 int rrd_update(
183 int argc,
184 char **argv)
185 {
186 char *tmplt = NULL;
187 int rc;
189 optind = 0;
190 opterr = 0; /* initialize getopt */
192 while (1) {
193 static struct option long_options[] = {
194 {"template", required_argument, 0, 't'},
195 {0, 0, 0, 0}
196 };
197 int option_index = 0;
198 int opt;
200 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
202 if (opt == EOF)
203 break;
205 switch (opt) {
206 case 't':
207 tmplt = optarg;
208 break;
210 case '?':
211 rrd_set_error("unknown option '%s'", argv[optind - 1]);
212 return (-1);
213 }
214 }
216 /* need at least 2 arguments: filename, data. */
217 if (argc - optind < 2) {
218 rrd_set_error("Not enough arguments");
220 return -1;
221 }
223 rc = rrd_update_r(argv[optind], tmplt,
224 argc - optind - 1, (const char **) (argv + optind + 1));
225 return rc;
226 }
228 int rrd_update_r(
229 const char *filename,
230 const char *tmplt,
231 int argc,
232 const char **argv)
233 {
234 return _rrd_update(filename, tmplt, argc, argv, NULL);
235 }
237 int _rrd_update(
238 const char *filename,
239 const char *tmplt,
240 int argc,
241 const char **argv,
242 info_t *pcdp_summary)
243 {
245 int arg_i = 2;
246 short j;
247 unsigned long i, ii, iii = 1;
249 unsigned long rra_begin; /* byte pointer to the rra
250 * area in the rrd file. this
251 * pointer never changes value */
252 unsigned long rra_start; /* byte pointer to the rra
253 * area in the rrd file. this
254 * pointer changes as each rrd is
255 * processed. */
256 unsigned long rra_current; /* byte pointer to the current write
257 * spot in the rrd file. */
258 unsigned long rra_pos_tmp; /* temporary byte pointer. */
259 double interval, pre_int, post_int; /* interval between this and
260 * the last run */
261 unsigned long proc_pdp_st; /* which pdp_st was the last
262 * to be processed */
263 unsigned long occu_pdp_st; /* when was the pdp_st
264 * before the last update
265 * time */
266 unsigned long proc_pdp_age; /* how old was the data in
267 * the pdp prep area when it
268 * was last updated */
269 unsigned long occu_pdp_age; /* how long ago was the last
270 * pdp_step time */
271 rrd_value_t *pdp_new; /* prepare the incoming data
272 * to be added the the
273 * existing entry */
274 rrd_value_t *pdp_temp; /* prepare the pdp values
275 * to be added the the
276 * cdp values */
278 long *tmpl_idx; /* index representing the settings
279 transported by the tmplt index */
280 unsigned long tmpl_cnt = 2; /* time and data */
282 rrd_t rrd;
283 time_t current_time = 0;
284 time_t rra_time = 0; /* time of update for a RRA */
285 unsigned long current_time_usec = 0; /* microseconds part of current time */
286 struct timeval tmp_time; /* used for time conversion */
288 char **updvals;
289 int schedule_smooth = 0;
290 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
292 /* a vector of future Holt-Winters seasonal coefs */
293 unsigned long elapsed_pdp_st;
295 /* number of elapsed PDP steps since last update */
296 unsigned long *rra_step_cnt = NULL;
298 /* number of rows to be updated in an RRA for a data
299 * value. */
300 unsigned long start_pdp_offset;
302 /* number of PDP steps since the last update that
303 * are assigned to the first CDP to be generated
304 * since the last update. */
305 unsigned short scratch_idx;
307 /* index into the CDP scratch array */
308 enum cf_en current_cf;
310 /* numeric id of the current consolidation function */
311 rpnstack_t rpnstack; /* used for COMPUTE DS */
312 int version; /* rrd version */
313 char *endptr; /* used in the conversion */
314 rrd_file_t *rrd_file;
316 rpnstack_init(&rpnstack);
318 /* need at least 1 arguments: data. */
319 if (argc < 1) {
320 rrd_set_error("Not enough arguments");
321 return -1;
322 }
324 rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
325 if (rrd_file == NULL) {
326 return -1;
327 }
329 /* initialize time */
330 version = atoi(rrd.stat_head->version);
331 gettimeofday(&tmp_time, 0);
332 normalize_time(&tmp_time);
333 current_time = tmp_time.tv_sec;
334 if (version >= 3) {
335 current_time_usec = tmp_time.tv_usec;
336 } else {
337 current_time_usec = 0;
338 }
340 rra_current = rra_start = rra_begin = rrd_file->header_len;
341 /* This is defined in the ANSI C standard, section 7.9.5.3:
343 When a file is opened with udpate mode ('+' as the second
344 or third character in the ... list of mode argument
345 variables), both input and output may be performed on the
346 associated stream. However, ... input may not be directly
347 followed by output without an intervening call to a file
348 positioning function, unless the input operation encounters
349 end-of-file. */
350 #if 0 //def HAVE_MMAP
351 rrd_filesize = rrd_file->file_size;
352 fseek(rrd_file->fd, 0, SEEK_END);
353 rrd_filesize = ftell(rrd_file->fd);
354 fseek(rrd_file->fd, rra_current, SEEK_SET);
355 #else
356 // fseek(rrd_file->fd, 0, SEEK_CUR);
357 #endif
360 /* get exclusive lock to whole file.
361 * lock gets removed when we close the file.
362 */
363 if (LockRRD(rrd_file->fd) != 0) {
364 rrd_set_error("could not lock RRD");
365 rrd_free(&rrd);
366 close(rrd_file->fd);
367 return (-1);
368 }
370 if ((updvals =
371 malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
372 rrd_set_error("allocating updvals pointer array");
373 rrd_free(&rrd);
374 close(rrd_file->fd);
375 return (-1);
376 }
378 if ((pdp_temp = malloc(sizeof(rrd_value_t)
379 * rrd.stat_head->ds_cnt)) == NULL) {
380 rrd_set_error("allocating pdp_temp ...");
381 free(updvals);
382 rrd_free(&rrd);
383 close(rrd_file->fd);
384 return (-1);
385 }
387 if ((tmpl_idx = malloc(sizeof(unsigned long)
388 * (rrd.stat_head->ds_cnt + 1))) == NULL) {
389 rrd_set_error("allocating tmpl_idx ...");
390 free(pdp_temp);
391 free(updvals);
392 rrd_free(&rrd);
393 close(rrd_file->fd);
394 return (-1);
395 }
396 /* initialize tmplt redirector */
397 /* default config example (assume DS 1 is a CDEF DS)
398 tmpl_idx[0] -> 0; (time)
399 tmpl_idx[1] -> 1; (DS 0)
400 tmpl_idx[2] -> 3; (DS 2)
401 tmpl_idx[3] -> 4; (DS 3) */
402 tmpl_idx[0] = 0; /* time */
403 for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
404 if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
405 tmpl_idx[ii++] = i;
406 }
407 tmpl_cnt = ii;
409 if (tmplt) {
410 /* we should work on a writeable copy here */
411 char *dsname;
412 unsigned int tmpl_len;
413 char *tmplt_copy = strdup(tmplt);
415 dsname = tmplt_copy;
416 tmpl_cnt = 1; /* the first entry is the time */
417 tmpl_len = strlen(tmplt_copy);
418 for (i = 0; i <= tmpl_len; i++) {
419 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
420 tmplt_copy[i] = '\0';
421 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
422 rrd_set_error
423 ("tmplt contains more DS definitions than RRD");
424 free(updvals);
425 free(pdp_temp);
426 free(tmpl_idx);
427 rrd_free(&rrd);
428 close(rrd_file->fd);
429 return (-1);
430 }
431 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
432 rrd_set_error("unknown DS name '%s'", dsname);
433 free(updvals);
434 free(pdp_temp);
435 free(tmplt_copy);
436 free(tmpl_idx);
437 rrd_free(&rrd);
438 close(rrd_file->fd);
439 return (-1);
440 } else {
441 /* the first element is always the time */
442 tmpl_idx[tmpl_cnt - 1]++;
443 /* go to the next entry on the tmplt_copy */
444 dsname = &tmplt_copy[i + 1];
445 /* fix the damage we did before */
446 if (i < tmpl_len) {
447 tmplt_copy[i] = ':';
448 }
450 }
451 }
452 }
453 free(tmplt_copy);
454 }
455 if ((pdp_new = malloc(sizeof(rrd_value_t)
456 * rrd.stat_head->ds_cnt)) == NULL) {
457 rrd_set_error("allocating pdp_new ...");
458 free(updvals);
459 free(pdp_temp);
460 free(tmpl_idx);
461 rrd_free(&rrd);
462 close(rrd_file->fd);
463 return (-1);
464 }
465 #if 0 //def HAVE_MMAP
466 rrd_mmaped_file = mmap(0,
467 rrd_file->file_len,
468 PROT_READ | PROT_WRITE,
469 MAP_SHARED, fileno(in_file), 0);
470 if (rrd_mmaped_file == MAP_FAILED) {
471 rrd_set_error("error mmapping file %s", filename);
472 free(updvals);
473 free(pdp_temp);
474 free(tmpl_idx);
475 rrd_free(&rrd);
476 close(rrd_file->fd);
477 return (-1);
478 }
479 #ifdef USE_MADVISE
480 /* when we use mmaping we tell the kernel the mmap equivalent
481 of POSIX_FADV_RANDOM */
482 madvise(rrd_mmaped_file, rrd_filesize, POSIX_MADV_RANDOM);
483 #endif
484 #endif
485 /* loop through the arguments. */
486 for (arg_i = 0; arg_i < argc; arg_i++) {
487 char *stepper = strdup(argv[arg_i]);
488 char *step_start = stepper;
489 char *p;
490 char *parsetime_error = NULL;
491 enum { atstyle, normal } timesyntax;
492 struct rrd_time_value ds_tv;
494 if (stepper == NULL) {
495 rrd_set_error("failed duplication argv entry");
496 free(step_start);
497 free(updvals);
498 free(pdp_temp);
499 free(tmpl_idx);
500 rrd_free(&rrd);
501 #ifdef HAVE_MMAP
502 rrd_close(rrd_file);
503 #endif
504 close(rrd_file->fd);
505 return (-1);
506 }
507 /* initialize all ds input to unknown except the first one
508 which has always got to be set */
509 for (ii = 1; ii <= rrd.stat_head->ds_cnt; ii++)
510 updvals[ii] = "U";
511 updvals[0] = stepper;
512 /* separate all ds elements; first must be examined separately
513 due to alternate time syntax */
514 if ((p = strchr(stepper, '@')) != NULL) {
515 timesyntax = atstyle;
516 *p = '\0';
517 stepper = p + 1;
518 } else if ((p = strchr(stepper, ':')) != NULL) {
519 timesyntax = normal;
520 *p = '\0';
521 stepper = p + 1;
522 } else {
523 rrd_set_error
524 ("expected timestamp not found in data source from %s",
525 argv[arg_i]);
526 free(step_start);
527 break;
528 }
529 ii = 1;
530 updvals[tmpl_idx[ii]] = stepper;
531 while (*stepper) {
532 if (*stepper == ':') {
533 *stepper = '\0';
534 ii++;
535 if (ii < tmpl_cnt) {
536 updvals[tmpl_idx[ii]] = stepper + 1;
537 }
538 }
539 stepper++;
540 }
542 if (ii != tmpl_cnt - 1) {
543 rrd_set_error
544 ("expected %lu data source readings (got %lu) from %s",
545 tmpl_cnt - 1, ii, argv[arg_i]);
546 free(step_start);
547 break;
548 }
550 /* get the time from the reading ... handle N */
551 if (timesyntax == atstyle) {
552 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
553 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
554 free(step_start);
555 break;
556 }
557 if (ds_tv.type == RELATIVE_TO_END_TIME ||
558 ds_tv.type == RELATIVE_TO_START_TIME) {
559 rrd_set_error("specifying time relative to the 'start' "
560 "or 'end' makes no sense here: %s", updvals[0]);
561 free(step_start);
562 break;
563 }
565 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
567 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
569 } else if (strcmp(updvals[0], "N") == 0) {
570 gettimeofday(&tmp_time, 0);
571 normalize_time(&tmp_time);
572 current_time = tmp_time.tv_sec;
573 current_time_usec = tmp_time.tv_usec;
574 } else {
575 double tmp;
577 tmp = strtod(updvals[0], 0);
578 current_time = floor(tmp);
579 current_time_usec =
580 (long) ((tmp - (double) current_time) * 1000000.0);
581 }
582 /* dont do any correction for old version RRDs */
583 if (version < 3)
584 current_time_usec = 0;
586 if (current_time < rrd.live_head->last_up ||
587 (current_time == rrd.live_head->last_up &&
588 (long) current_time_usec <=
589 (long) rrd.live_head->last_up_usec)) {
590 rrd_set_error("illegal attempt to update using time %ld when "
591 "last update time is %ld (minimum one second step)",
592 current_time, rrd.live_head->last_up);
593 free(step_start);
594 break;
595 }
598 /* seek to the beginning of the rra's */
599 if (rra_current != rra_begin) {
600 #ifndef HAVE_MMAP
601 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
602 rrd_set_error("seek error in rrd");
603 free(step_start);
604 break;
605 }
606 #endif
607 rra_current = rra_begin;
608 }
609 rra_start = rra_begin;
611 /* when was the current pdp started */
612 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
613 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
615 /* when did the last pdp_st occur */
616 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
617 occu_pdp_st = current_time - occu_pdp_age;
619 /* interval = current_time - rrd.live_head->last_up; */
620 interval = (double) (current_time - rrd.live_head->last_up)
621 + (double) ((long) current_time_usec -
622 (long) rrd.live_head->last_up_usec) / 1000000.0;
624 if (occu_pdp_st > proc_pdp_st) {
625 /* OK we passed the pdp_st moment */
626 pre_int = (long) occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
627 * occurred before the latest
628 * pdp_st moment*/
629 pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0; /* adjust usecs */
630 post_int = occu_pdp_age; /* how much after it */
631 post_int += ((double) current_time_usec) / 1000000.0; /* adjust usecs */
632 } else {
633 pre_int = interval;
634 post_int = 0;
635 }
637 #ifdef DEBUG
638 printf("proc_pdp_age %lu\t"
639 "proc_pdp_st %lu\t"
640 "occu_pfp_age %lu\t"
641 "occu_pdp_st %lu\t"
642 "int %lf\t"
643 "pre_int %lf\t"
644 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
645 occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
646 #endif
648 /* process the data sources and update the pdp_prep
649 * area accordingly */
650 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
651 enum dst_en dst_idx;
653 dst_idx = dst_conv(rrd.ds_def[i].dst);
655 /* make sure we do not build diffs with old last_ds values */
656 if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
657 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
658 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
659 }
661 /* NOTE: DST_CDEF should never enter this if block, because
662 * updvals[i+1][0] is initialized to 'U'; unless the caller
663 * accidently specified a value for the DST_CDEF. To handle
664 * this case, an extra check is required. */
666 if ((updvals[i + 1][0] != 'U') &&
667 (dst_idx != DST_CDEF) &&
668 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
669 double rate = DNAN;
671 /* the data source type defines how to process the data */
672 /* pdp_new contains rate * time ... eg the bytes
673 * transferred during the interval. Doing it this way saves
674 * a lot of math operations */
677 switch (dst_idx) {
678 case DST_COUNTER:
679 case DST_DERIVE:
680 if (rrd.pdp_prep[i].last_ds[0] != 'U') {
681 for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
682 if ((updvals[i + 1][ii] < '0'
683 || updvals[i + 1][ii] > '9') && (ii != 0
684 && updvals[i
685 +
686 1]
687 [ii] !=
688 '-')) {
689 rrd_set_error("not a simple integer: '%s'",
690 updvals[i + 1]);
691 break;
692 }
693 }
694 if (rrd_test_error()) {
695 break;
696 }
697 pdp_new[i] =
698 rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
699 if (dst_idx == DST_COUNTER) {
700 /* simple overflow catcher suggested by Andres Kroonmaa */
701 /* this will fail terribly for non 32 or 64 bit counters ... */
702 /* are there any others in SNMP land ? */
703 if (pdp_new[i] < (double) 0.0)
704 pdp_new[i] += (double) 4294967296.0; /* 2^32 */
705 if (pdp_new[i] < (double) 0.0)
706 pdp_new[i] += (double) 18446744069414584320.0;
707 /* 2^64-2^32 */ ;
708 }
709 rate = pdp_new[i] / interval;
710 } else {
711 pdp_new[i] = DNAN;
712 }
713 break;
714 case DST_ABSOLUTE:
715 errno = 0;
716 pdp_new[i] = strtod(updvals[i + 1], &endptr);
717 if (errno > 0) {
718 rrd_set_error("converting '%s' to float: %s",
719 updvals[i + 1], rrd_strerror(errno));
720 break;
721 };
722 if (endptr[0] != '\0') {
723 rrd_set_error
724 ("conversion of '%s' to float not complete: tail '%s'",
725 updvals[i + 1], endptr);
726 break;
727 }
728 rate = pdp_new[i] / interval;
729 break;
730 case DST_GAUGE:
731 errno = 0;
732 pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
733 if (errno > 0) {
734 rrd_set_error("converting '%s' to float: %s",
735 updvals[i + 1], rrd_strerror(errno));
736 break;
737 };
738 if (endptr[0] != '\0') {
739 rrd_set_error
740 ("conversion of '%s' to float not complete: tail '%s'",
741 updvals[i + 1], endptr);
742 break;
743 }
744 rate = pdp_new[i] / interval;
745 break;
746 default:
747 rrd_set_error("rrd contains unknown DS type : '%s'",
748 rrd.ds_def[i].dst);
749 break;
750 }
751 /* break out of this for loop if the error string is set */
752 if (rrd_test_error()) {
753 break;
754 }
755 /* make sure pdp_temp is neither too large or too small
756 * if any of these occur it becomes unknown ...
757 * sorry folks ... */
758 if (!isnan(rate) &&
759 ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
760 rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
761 (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
762 rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
763 pdp_new[i] = DNAN;
764 }
765 } else {
766 /* no news is news all the same */
767 pdp_new[i] = DNAN;
768 }
771 /* make a copy of the command line argument for the next run */
772 #ifdef DEBUG
773 fprintf(stderr,
774 "prep ds[%lu]\t"
775 "last_arg '%s'\t"
776 "this_arg '%s'\t"
777 "pdp_new %10.2f\n",
778 i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
779 #endif
780 strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
781 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
782 }
783 /* break out of the argument parsing loop if the error_string is set */
784 if (rrd_test_error()) {
785 free(step_start);
786 break;
787 }
788 /* has a pdp_st moment occurred since the last run ? */
790 if (proc_pdp_st == occu_pdp_st) {
791 /* no we have not passed a pdp_st moment. therefore update is simple */
793 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
794 if (isnan(pdp_new[i])) {
795 /* this is not realy accurate if we use subsecond data arival time
796 should have thought of it when going subsecond resolution ...
797 sorry next format change we will have it! */
798 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
799 floor(interval);
800 } else {
801 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
802 rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
803 } else {
804 rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
805 }
806 }
807 #ifdef DEBUG
808 fprintf(stderr,
809 "NO PDP ds[%lu]\t"
810 "value %10.2f\t"
811 "unkn_sec %5lu\n",
812 i,
813 rrd.pdp_prep[i].scratch[PDP_val].u_val,
814 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
815 #endif
816 }
817 } else {
818 /* an pdp_st has occurred. */
820 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
821 * occurred up to the last run.
822 pdp_new[] contains rate*seconds from the latest run.
823 pdp_temp[] will contain the rate for cdp */
825 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
826 /* update pdp_prep to the current pdp_st. */
827 double pre_unknown = 0.0;
829 if (isnan(pdp_new[i]))
830 /* a final bit of unkonwn to be added bevore calculation
831 * we use a tempaorary variable for this so that we
832 * don't have to turn integer lines before using the value */
833 pre_unknown = pre_int;
834 else {
835 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
836 rrd.pdp_prep[i].scratch[PDP_val].u_val =
837 pdp_new[i] / interval * pre_int;
838 } else {
839 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
840 pdp_new[i] / interval * pre_int;
841 }
842 }
845 /* if too much of the pdp_prep is unknown we dump it */
846 if (
847 /* removed because this does not agree with the definition
848 a heart beat can be unknown */
849 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
850 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
851 /* if the interval is larger thatn mrhb we get NAN */
852 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
853 (occu_pdp_st - proc_pdp_st <=
854 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
855 pdp_temp[i] = DNAN;
856 } else {
857 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
858 / ((double) (occu_pdp_st - proc_pdp_st
859 -
860 rrd.pdp_prep[i].
861 scratch[PDP_unkn_sec_cnt].u_cnt)
862 - pre_unknown);
863 }
865 /* process CDEF data sources; remember each CDEF DS can
866 * only reference other DS with a lower index number */
867 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
868 rpnp_t *rpnp;
870 rpnp =
871 rpn_expand((rpn_cdefds_t *) &
872 (rrd.ds_def[i].par[DS_cdef]));
873 /* substitue data values for OP_VARIABLE nodes */
874 for (ii = 0; rpnp[ii].op != OP_END; ii++) {
875 if (rpnp[ii].op == OP_VARIABLE) {
876 rpnp[ii].op = OP_NUMBER;
877 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
878 }
879 }
880 /* run the rpn calculator */
881 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
882 free(rpnp);
883 break; /* exits the data sources pdp_temp loop */
884 }
885 }
887 /* make pdp_prep ready for the next run */
888 if (isnan(pdp_new[i])) {
889 /* this is not realy accurate if we use subsecond data arival time
890 should have thought of it when going subsecond resolution ...
891 sorry next format change we will have it! */
892 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
893 floor(post_int);
894 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
895 } else {
896 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
897 rrd.pdp_prep[i].scratch[PDP_val].u_val =
898 pdp_new[i] / interval * post_int;
899 }
901 #ifdef DEBUG
902 fprintf(stderr,
903 "PDP UPD ds[%lu]\t"
904 "pdp_temp %10.2f\t"
905 "new_prep %10.2f\t"
906 "new_unkn_sec %5lu\n",
907 i, pdp_temp[i],
908 rrd.pdp_prep[i].scratch[PDP_val].u_val,
909 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
910 #endif
911 }
913 /* if there were errors during the last loop, bail out here */
914 if (rrd_test_error()) {
915 free(step_start);
916 break;
917 }
919 /* compute the number of elapsed pdp_st moments */
920 elapsed_pdp_st =
921 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
922 #ifdef DEBUG
923 fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
924 #endif
925 if (rra_step_cnt == NULL) {
926 rra_step_cnt = (unsigned long *)
927 malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
928 }
930 for (i = 0, rra_start = rra_begin;
931 i < rrd.stat_head->rra_cnt;
932 rra_start +=
933 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
934 sizeof(rrd_value_t), i++) {
935 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
936 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
937 (proc_pdp_st / rrd.stat_head->pdp_step) %
938 rrd.rra_def[i].pdp_cnt;
939 if (start_pdp_offset <= elapsed_pdp_st) {
940 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
941 rrd.rra_def[i].pdp_cnt + 1;
942 } else {
943 rra_step_cnt[i] = 0;
944 }
946 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
947 /* If this is a bulk update, we need to skip ahead in the seasonal
948 * arrays so that they will be correct for the next observed value;
949 * note that for the bulk update itself, no update will occur to
950 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
951 * be set to DNAN. */
952 if (rra_step_cnt[i] > 2) {
953 /* skip update by resetting rra_step_cnt[i],
954 * note that this is not data source specific; this is due
955 * to the bulk update, not a DNAN value for the specific data
956 * source. */
957 rra_step_cnt[i] = 0;
958 lookup_seasonal(&rrd, i, rra_start, rrd_file,
959 elapsed_pdp_st, &last_seasonal_coef);
960 lookup_seasonal(&rrd, i, rra_start, rrd_file,
961 elapsed_pdp_st + 1, &seasonal_coef);
962 }
964 /* periodically run a smoother for seasonal effects */
965 /* Need to use first cdp parameter buffer to track
966 * burnin (burnin requires a specific smoothing schedule).
967 * The CDP_init_seasonal parameter is really an RRA level,
968 * not a data source within RRA level parameter, but the rra_def
969 * is read only for rrd_update (not flushed to disk). */
970 iii = i * (rrd.stat_head->ds_cnt);
971 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
972 <= BURNIN_CYCLES) {
973 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
974 > rrd.rra_def[i].row_cnt - 1) {
975 /* mark off one of the burnin cycles */
976 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
977 u_cnt);
978 schedule_smooth = 1;
979 }
980 } else {
981 /* someone has no doubt invented a trick to deal with this
982 * wrap around, but at least this code is clear. */
983 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
984 u_cnt > rrd.rra_ptr[i].cur_row) {
985 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
986 * mapping between PDP and CDP */
987 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
988 >=
989 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
990 u_cnt) {
991 #ifdef DEBUG
992 fprintf(stderr,
993 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
994 rrd.rra_ptr[i].cur_row,
995 elapsed_pdp_st,
996 rrd.rra_def[i].
997 par[RRA_seasonal_smooth_idx].u_cnt);
998 #endif
999 schedule_smooth = 1;
1000 }
1001 } else {
1002 /* can't rely on negative numbers because we are working with
1003 * unsigned values */
1004 /* Don't need modulus here. If we've wrapped more than once, only
1005 * one smooth is executed at the end. */
1006 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
1007 rrd.rra_def[i].row_cnt
1008 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
1009 rrd.rra_def[i].row_cnt >=
1010 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
1011 u_cnt) {
1012 #ifdef DEBUG
1013 fprintf(stderr,
1014 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1015 rrd.rra_ptr[i].cur_row,
1016 elapsed_pdp_st,
1017 rrd.rra_def[i].
1018 par[RRA_seasonal_smooth_idx].u_cnt);
1019 #endif
1020 schedule_smooth = 1;
1021 }
1022 }
1023 }
1025 rra_current = rrd_tell(rrd_file);
1026 }
1027 /* if cf is DEVSEASONAL or SEASONAL */
1028 if (rrd_test_error())
1029 break;
1031 /* update CDP_PREP areas */
1032 /* loop over data soures within each RRA */
1033 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1035 /* iii indexes the CDP prep area for this data source within the RRA */
1036 iii = i * rrd.stat_head->ds_cnt + ii;
1038 if (rrd.rra_def[i].pdp_cnt > 1) {
1040 if (rra_step_cnt[i] > 0) {
1041 /* If we are in this block, as least 1 CDP value will be written to
1042 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1043 * to be written, then the "fill in" value is the CDP_secondary_val
1044 * entry. */
1045 if (isnan(pdp_temp[ii])) {
1046 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1047 u_cnt += start_pdp_offset;
1048 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1049 u_val = DNAN;
1050 } else {
1051 /* CDP_secondary value is the RRA "fill in" value for intermediary
1052 * CDP data entries. No matter the CF, the value is the same because
1053 * the average, max, min, and last of a list of identical values is
1054 * the same, namely, the value itself. */
1055 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1056 u_val = pdp_temp[ii];
1057 }
1059 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1060 u_cnt >
1061 rrd.rra_def[i].pdp_cnt *
1062 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
1063 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1064 u_val = DNAN;
1065 /* initialize carry over */
1066 if (current_cf == CF_AVERAGE) {
1067 if (isnan(pdp_temp[ii])) {
1068 rrd.cdp_prep[iii].scratch[CDP_val].
1069 u_val = DNAN;
1070 } else {
1071 rrd.cdp_prep[iii].scratch[CDP_val].
1072 u_val =
1073 pdp_temp[ii] *
1074 ((elapsed_pdp_st -
1075 start_pdp_offset) %
1076 rrd.rra_def[i].pdp_cnt);
1077 }
1078 } else {
1079 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1080 pdp_temp[ii];
1081 }
1082 } else {
1083 rrd_value_t cum_val, cur_val;
1085 switch (current_cf) {
1086 case CF_AVERAGE:
1087 cum_val =
1088 IFDNAN(rrd.cdp_prep[iii].
1089 scratch[CDP_val].u_val, 0.0);
1090 cur_val = IFDNAN(pdp_temp[ii], 0.0);
1091 rrd.cdp_prep[iii].
1092 scratch[CDP_primary_val].u_val =
1093 (cum_val +
1094 cur_val * start_pdp_offset) /
1095 (rrd.rra_def[i].pdp_cnt -
1096 rrd.cdp_prep[iii].
1097 scratch[CDP_unkn_pdp_cnt].u_cnt);
1098 /* initialize carry over value */
1099 if (isnan(pdp_temp[ii])) {
1100 rrd.cdp_prep[iii].scratch[CDP_val].
1101 u_val = DNAN;
1102 } else {
1103 rrd.cdp_prep[iii].scratch[CDP_val].
1104 u_val =
1105 pdp_temp[ii] *
1106 ((elapsed_pdp_st -
1107 start_pdp_offset) %
1108 rrd.rra_def[i].pdp_cnt);
1109 }
1110 break;
1111 case CF_MAXIMUM:
1112 cum_val =
1113 IFDNAN(rrd.cdp_prep[iii].
1114 scratch[CDP_val].u_val, -DINF);
1115 cur_val = IFDNAN(pdp_temp[ii], -DINF);
1116 #ifdef DEBUG
1117 if (isnan
1118 (rrd.cdp_prep[iii].scratch[CDP_val].
1119 u_val) && isnan(pdp_temp[ii])) {
1120 fprintf(stderr,
1121 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1122 i, ii);
1123 exit(-1);
1124 }
1125 #endif
1126 if (cur_val > cum_val)
1127 rrd.cdp_prep[iii].
1128 scratch[CDP_primary_val].u_val =
1129 cur_val;
1130 else
1131 rrd.cdp_prep[iii].
1132 scratch[CDP_primary_val].u_val =
1133 cum_val;
1134 /* initialize carry over value */
1135 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1136 pdp_temp[ii];
1137 break;
1138 case CF_MINIMUM:
1139 cum_val =
1140 IFDNAN(rrd.cdp_prep[iii].
1141 scratch[CDP_val].u_val, DINF);
1142 cur_val = IFDNAN(pdp_temp[ii], DINF);
1143 #ifdef DEBUG
1144 if (isnan
1145 (rrd.cdp_prep[iii].scratch[CDP_val].
1146 u_val) && isnan(pdp_temp[ii])) {
1147 fprintf(stderr,
1148 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1149 i, ii);
1150 exit(-1);
1151 }
1152 #endif
1153 if (cur_val < cum_val)
1154 rrd.cdp_prep[iii].
1155 scratch[CDP_primary_val].u_val =
1156 cur_val;
1157 else
1158 rrd.cdp_prep[iii].
1159 scratch[CDP_primary_val].u_val =
1160 cum_val;
1161 /* initialize carry over value */
1162 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1163 pdp_temp[ii];
1164 break;
1165 case CF_LAST:
1166 default:
1167 rrd.cdp_prep[iii].
1168 scratch[CDP_primary_val].u_val =
1169 pdp_temp[ii];
1170 /* initialize carry over value */
1171 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1172 pdp_temp[ii];
1173 break;
1174 }
1175 } /* endif meets xff value requirement for a valid value */
1176 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1177 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1178 if (isnan(pdp_temp[ii]))
1179 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1180 u_cnt =
1181 (elapsed_pdp_st -
1182 start_pdp_offset) %
1183 rrd.rra_def[i].pdp_cnt;
1184 else
1185 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1186 u_cnt = 0;
1187 } else { /* rra_step_cnt[i] == 0 */
1189 #ifdef DEBUG
1190 if (isnan
1191 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1192 fprintf(stderr,
1193 "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1194 i, ii);
1195 } else {
1196 fprintf(stderr,
1197 "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1198 i, ii,
1199 rrd.cdp_prep[iii].scratch[CDP_val].
1200 u_val);
1201 }
1202 #endif
1203 if (isnan(pdp_temp[ii])) {
1204 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1205 u_cnt += elapsed_pdp_st;
1206 } else
1207 if (isnan
1208 (rrd.cdp_prep[iii].scratch[CDP_val].
1209 u_val)) {
1210 if (current_cf == CF_AVERAGE) {
1211 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1212 pdp_temp[ii] * elapsed_pdp_st;
1213 } else {
1214 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1215 pdp_temp[ii];
1216 }
1217 #ifdef DEBUG
1218 fprintf(stderr,
1219 "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1220 i, ii,
1221 rrd.cdp_prep[iii].scratch[CDP_val].
1222 u_val);
1223 #endif
1224 } else {
1225 switch (current_cf) {
1226 case CF_AVERAGE:
1227 rrd.cdp_prep[iii].scratch[CDP_val].
1228 u_val +=
1229 pdp_temp[ii] * elapsed_pdp_st;
1230 break;
1231 case CF_MINIMUM:
1232 if (pdp_temp[ii] <
1233 rrd.cdp_prep[iii].scratch[CDP_val].
1234 u_val)
1235 rrd.cdp_prep[iii].scratch[CDP_val].
1236 u_val = pdp_temp[ii];
1237 break;
1238 case CF_MAXIMUM:
1239 if (pdp_temp[ii] >
1240 rrd.cdp_prep[iii].scratch[CDP_val].
1241 u_val)
1242 rrd.cdp_prep[iii].scratch[CDP_val].
1243 u_val = pdp_temp[ii];
1244 break;
1245 case CF_LAST:
1246 default:
1247 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1248 pdp_temp[ii];
1249 break;
1250 }
1251 }
1252 }
1253 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1254 if (elapsed_pdp_st > 2) {
1255 switch (current_cf) {
1256 case CF_AVERAGE:
1257 default:
1258 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1259 u_val = pdp_temp[ii];
1260 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1261 u_val = pdp_temp[ii];
1262 break;
1263 case CF_SEASONAL:
1264 case CF_DEVSEASONAL:
1265 /* need to update cached seasonal values, so they are consistent
1266 * with the bulk update */
1267 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1268 * CDP_last_deviation are the same. */
1269 rrd.cdp_prep[iii].
1270 scratch[CDP_hw_last_seasonal].u_val =
1271 last_seasonal_coef[ii];
1272 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1273 u_val = seasonal_coef[ii];
1274 break;
1275 case CF_HWPREDICT:
1276 /* need to update the null_count and last_null_count.
1277 * even do this for non-DNAN pdp_temp because the
1278 * algorithm is not learning from batch updates. */
1279 rrd.cdp_prep[iii].scratch[CDP_null_count].
1280 u_cnt += elapsed_pdp_st;
1281 rrd.cdp_prep[iii].
1282 scratch[CDP_last_null_count].u_cnt +=
1283 elapsed_pdp_st - 1;
1284 /* fall through */
1285 case CF_DEVPREDICT:
1286 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1287 u_val = DNAN;
1288 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1289 u_val = DNAN;
1290 break;
1291 case CF_FAILURES:
1292 /* do not count missed bulk values as failures */
1293 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1294 u_val = 0;
1295 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1296 u_val = 0;
1297 /* need to reset violations buffer.
1298 * could do this more carefully, but for now, just
1299 * assume a bulk update wipes away all violations. */
1300 erase_violations(&rrd, iii, i);
1301 break;
1302 }
1303 }
1304 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1306 if (rrd_test_error())
1307 break;
1309 } /* endif data sources loop */
1310 } /* end RRA Loop */
1312 /* this loop is only entered if elapsed_pdp_st < 3 */
1313 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1314 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1315 for (i = 0, rra_start = rra_begin;
1316 i < rrd.stat_head->rra_cnt;
1317 rra_start +=
1318 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1319 sizeof(rrd_value_t), i++) {
1320 if (rrd.rra_def[i].pdp_cnt > 1)
1321 continue;
1323 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1324 if (current_cf == CF_SEASONAL
1325 || current_cf == CF_DEVSEASONAL) {
1326 lookup_seasonal(&rrd, i, rra_start, rrd_file,
1327 elapsed_pdp_st + (scratch_idx ==
1328 CDP_primary_val ? 1
1329 : 2),
1330 &seasonal_coef);
1331 rra_current = rrd_tell(rrd_file);
1332 }
1333 if (rrd_test_error())
1334 break;
1335             /* loop over data sources within each RRA */
1336 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1337 update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1338 i * (rrd.stat_head->ds_cnt) + ii,
1339 i, ii, scratch_idx, seasonal_coef);
1340 }
1341 } /* end RRA Loop */
1342 if (rrd_test_error())
1343 break;
1344 } /* end elapsed_pdp_st loop */
1346 if (rrd_test_error())
1347 break;
1349 /* Ready to write to disk */
1350 /* Move sequentially through the file, writing one RRA at a time.
1351 * Note this architecture divorces the computation of CDP with
1352 * flushing updated RRA entries to disk. */
1353 for (i = 0, rra_start = rra_begin;
1354 i < rrd.stat_head->rra_cnt;
1355 rra_start +=
1356 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1357 sizeof(rrd_value_t), i++) {
1358         /* is there anything to write for this RRA? If not, continue. */
1359 if (rra_step_cnt[i] == 0)
1360 continue;
1362 /* write the first row */
1363 #ifdef DEBUG
1364 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1365 #endif
1366 rrd.rra_ptr[i].cur_row++;
1367 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1368 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1370         /* position on the first row */
1370 rra_pos_tmp = rra_start +
1371 (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1372 sizeof(rrd_value_t);
1373 if (rra_pos_tmp != rra_current) {
1374 #ifndef HAVE_MMAP
1375 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1376 rrd_set_error("seek error in rrd");
1377 break;
1378 }
1379 #endif
1380 rra_current = rra_pos_tmp;
1381 }
1382 #ifdef DEBUG
1383 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1384 #endif
1385 scratch_idx = CDP_primary_val;
1386 if (pcdp_summary != NULL) {
1387 rra_time = (current_time - current_time
1388 % (rrd.rra_def[i].pdp_cnt *
1389 rrd.stat_head->pdp_step))
1390 -
1391 ((rra_step_cnt[i] -
1392 1) * rrd.rra_def[i].pdp_cnt *
1393 rrd.stat_head->pdp_step);
1394 }
1395 pcdp_summary =
1396 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1397 scratch_idx, pcdp_summary, &rra_time);
1398 if (rrd_test_error())
1399 break;
1401 /* write other rows of the bulk update, if any */
1402 scratch_idx = CDP_secondary_val;
1403 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1404 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1405 #ifdef DEBUG
1406 fprintf(stderr,
1407 "Wraparound for RRA %s, %lu updates left\n",
1408 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1409 #endif
1410 /* wrap */
1411 rrd.rra_ptr[i].cur_row = 0;
1412 /* seek back to beginning of current rra */
1413 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1414 rrd_set_error("seek error in rrd");
1415 break;
1416 }
1417 #ifdef DEBUG
1418 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1419 rrd_file->pos);
1420 #endif
1421 rra_current = rra_start;
1422 }
1423 if (pcdp_summary != NULL) {
1424 rra_time = (current_time - current_time
1425 % (rrd.rra_def[i].pdp_cnt *
1426 rrd.stat_head->pdp_step))
1427 -
1428 ((rra_step_cnt[i] -
1429 2) * rrd.rra_def[i].pdp_cnt *
1430 rrd.stat_head->pdp_step);
1431 }
1432 pcdp_summary =
1433 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1434 scratch_idx, pcdp_summary, &rra_time);
1435 }
1437 if (rrd_test_error())
1438 break;
1439 } /* RRA LOOP */
1441 /* break out of the argument parsing loop if error_string is set */
1442 if (rrd_test_error()) {
1443 free(step_start);
1444 break;
1445 }
1447 } /* endif a pdp_st has occurred */
1448 rrd.live_head->last_up = current_time;
1449 rrd.live_head->last_up_usec = current_time_usec;
1450 free(step_start);
1451 } /* function argument loop */
1453 if (seasonal_coef != NULL)
1454 free(seasonal_coef);
1455 if (last_seasonal_coef != NULL)
1456 free(last_seasonal_coef);
1457 if (rra_step_cnt != NULL)
1458 free(rra_step_cnt);
1459 rpnstack_free(&rpnstack);
1461 #if 0 //def HAVE_MMAP
1462 if (munmap(rrd_file->file_start, rrd_file->file_len) == -1) {
1463 rrd_set_error("error writing(unmapping) file: %s", filename);
1464 }
1465 #else
1466 rrd_flush(rrd_file); //XXX: really needed?
1467 #endif
1468 /* if we got here and if there is an error and if the file has not been
1469 * written to, then close things up and return. */
1470 if (rrd_test_error()) {
1471 free(updvals);
1472 free(tmpl_idx);
1473 rrd_free(&rrd);
1474 free(pdp_temp);
1475 free(pdp_new);
1476 close(rrd_file->fd);
1477 return (-1);
1478 }
1480 /* aargh ... that was tough ... so many loops ... anyway, its done.
1481 * we just need to write back the live header portion now*/
1483 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1484 + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1485 + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1486 SEEK_SET) != 0) {
1487 rrd_set_error("seek rrd for live header writeback");
1488 free(updvals);
1489 free(tmpl_idx);
1490 rrd_free(&rrd);
1491 free(pdp_temp);
1492 free(pdp_new);
1493 close(rrd_file->fd);
1494 return (-1);
1495 }
1497 if (version >= 3) {
1498 if (rrd_write(rrd_file, rrd.live_head,
1499 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1500 rrd_set_error("rrd_write live_head to rrd");
1501 free(updvals);
1502 rrd_free(&rrd);
1503 free(tmpl_idx);
1504 free(pdp_temp);
1505 free(pdp_new);
1506 close(rrd_file->fd);
1507 return (-1);
1508 }
1509 } else {
1510 if (rrd_write(rrd_file, &rrd.live_head->last_up,
1511 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1512 rrd_set_error("rrd_write live_head to rrd");
1513 free(updvals);
1514 rrd_free(&rrd);
1515 free(tmpl_idx);
1516 free(pdp_temp);
1517 free(pdp_new);
1518 close(rrd_file->fd);
1519 return (-1);
1520 }
1521 }
1524 if (rrd_write(rrd_file, rrd.pdp_prep,
1525 sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1526 != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1527 rrd_set_error("rrd_write pdp_prep to rrd");
1528 free(updvals);
1529 rrd_free(&rrd);
1530 free(tmpl_idx);
1531 free(pdp_temp);
1532 free(pdp_new);
1533 close(rrd_file->fd);
1534 return (-1);
1535 }
1537 if (rrd_write(rrd_file, rrd.cdp_prep,
1538 sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1539 rrd.stat_head->ds_cnt)
1540 != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1541 rrd.stat_head->ds_cnt)) {
1543 rrd_set_error("rrd_write cdp_prep to rrd");
1544 free(updvals);
1545 free(tmpl_idx);
1546 rrd_free(&rrd);
1547 free(pdp_temp);
1548 free(pdp_new);
1549 close(rrd_file->fd);
1550 return (-1);
1551 }
1553 if (rrd_write(rrd_file, rrd.rra_ptr,
1554 sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1555 != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1556 rrd_set_error("rrd_write rra_ptr to rrd");
1557 free(updvals);
1558 free(tmpl_idx);
1559 rrd_free(&rrd);
1560 free(pdp_temp);
1561 free(pdp_new);
1562 close(rrd_file->fd);
1563 return (-1);
1564 }
1565 #ifdef HAVE_POSIX_FADVISExxx
1567 /* with update we have write ops, so they will probably not be done by now, this means
1568 the buffers will not get freed. But calling this for the whole file - header
1569 will let the data off the hook as soon as it is written when if it is from a previous
1570 update cycle. Calling fdsync to force things is much too hard here. */
1572 if (0 != posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1573 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1574 rrd_strerror(errno));
1575 close(rrd_file->fd);
1576 return (-1);
1577 }
1578 #endif
1579 /*XXX: ? */ rrd_flush(rrd_file);
1581 /* calling the smoothing code here guarantees at most
1582 * one smoothing operation per rrd_update call. Unfortunately,
1583 * it is possible with bulk updates, or a long-delayed update
1584 * for smoothing to occur off-schedule. This really isn't
1585 * critical except during the burning cycles. */
1586 if (schedule_smooth) {
1587 // in_file = fopen(filename,"rb+");
1590 rra_start = rra_begin;
1591 for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1592 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1593 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1594 #ifdef DEBUG
1595 fprintf(stderr, "Running smoother for rra %ld\n", i);
1596 #endif
1597 apply_smoother(&rrd, i, rra_start, rrd_file);
1598 if (rrd_test_error())
1599 break;
1600 }
1601 rra_start += rrd.rra_def[i].row_cnt
1602 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1603 }
1604 #ifdef HAVE_POSIX_FADVISExxx
1605 /* same procedure as above ... */
1606 if (0 !=
1607 posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1608 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1609 rrd_strerror(errno));
1610 close(rrd_file->fd);
1611 return (-1);
1612 }
1613 #endif
1614 }
1616 rrd_close(rrd_file);
1617 rrd_free(&rrd);
1618 free(updvals);
1619 free(tmpl_idx);
1620 free(pdp_new);
1621 free(pdp_temp);
1622 return (0);
1623 }
1625 /*
1626 * get exclusive lock to whole file.
1627 * lock gets removed when we close the file
1628 *
1629 * returns 0 on success
1630 */
/*
 * Acquire an exclusive (write) lock on the whole file.
 * The lock is released automatically when the descriptor is closed.
 *
 * in_file: an open, writable file descriptor for the RRD file.
 *
 * Returns 0 on success, non-zero on failure (on POSIX, -1 with errno
 * set by fcntl; on WIN32, the _locking/_fstat failure code).
 */
int LockRRD(
    int in_file)
{
    int       rcstat;

#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat st;

    if (_fstat(in_file, &st) == 0) {
        /* non-blocking lock over the current file size */
        rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
    } else {
        rcstat = -1;
    }
#else
    struct flock lock;

    lock.l_type = F_WRLCK; /* exclusive write lock */
    lock.l_len = 0;     /* 0 means: lock to end of file, i.e. whole file */
    lock.l_start = 0;   /* starting at offset 0 */
    lock.l_whence = SEEK_SET;   /* offset relative to start of file */

    /* F_SETLK is non-blocking: fails immediately if already locked */
    rcstat = fcntl(in_file, F_SETLK, &lock);
#endif

    return (rcstat);
}