1 /*****************************************************************************
2 * RRDtool 1.2.28 Copyright by Tobi Oetiker, 1997-2008
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id: rrd_update.c 1450 2008-07-23 13:45:41Z oetiker $
7 *****************************************************************************/
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
15 #include <sys/stat.h>
16 #include <io.h>
17 #endif
19 #include "rrd_hw.h"
20 #include "rrd_rpncalc.h"
22 #include "rrd_is_thread_safe.h"
23 #include "unused.h"
25 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
26 /*
27 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
28 * replacement.
29 */
30 #include <sys/timeb.h>
32 #if (defined(__MINGW32__) && \
33 ((__MINGW32_MAJOR_VERSION == 3 && __MINGW32_MINOR_VERSION >= 12) || __MINGW32_MAJOR_VERSION > 3))
34 #include <sys/time.h>
35 #else
37 struct timeval {
38 time_t tv_sec; /* seconds */
39 long tv_usec; /* microseconds */
40 };
42 struct __timezone {
43 int tz_minuteswest; /* minutes W of Greenwich */
44 int tz_dsttime; /* type of dst correction */
45 };
47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
49 struct _timeb current_time;
51 _ftime(¤t_time);
53 t->tv_sec = current_time.time;
54 t->tv_usec = current_time.millitm * 1000;
56 return 0;
57 }
59 #endif /* mingw32 3.4.5 */
60 #endif
62 /*
63 * normilize time as returned by gettimeofday. usec part must
64 * be always >= 0
65 */
66 static void normalize_time(struct timeval *t)
67 {
68 if(t->tv_usec < 0) {
69 t->tv_sec--;
70 t->tv_usec += 1000000L;
71 }
72 }
74 /* Local prototypes */
75 int LockRRD(FILE *rrd_file);
76 #ifdef HAVE_MMAP
77 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
78 unsigned long *rra_current,
79 unsigned short CDP_scratch_idx,
80 #ifndef DEBUG
81 FILE UNUSED(*rrd_file),
82 #else
83 FILE *rrd_file,
84 #endif
85 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
86 #else
87 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
88 unsigned long *rra_current,
89 unsigned short CDP_scratch_idx, FILE *rrd_file,
90 info_t *pcdp_summary, time_t *rra_time);
91 #endif
92 int rrd_update_r(const char *filename, const char *tmplt, int argc, const char **argv);
93 int _rrd_update(const char *filename, const char *tmplt, int argc, const char **argv,
94 info_t*);
96 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
99 info_t *rrd_update_v(int argc, char **argv)
100 {
101 char *tmplt = NULL;
102 info_t *result = NULL;
103 infoval rc;
104 rc.u_int = -1;
105 optind = 0; opterr = 0; /* initialize getopt */
107 while (1) {
108 static struct option long_options[] =
109 {
110 {"template", required_argument, 0, 't'},
111 {0,0,0,0}
112 };
113 int option_index = 0;
114 int opt;
115 opt = getopt_long(argc, argv, "t:",
116 long_options, &option_index);
118 if (opt == EOF)
119 break;
121 switch(opt) {
122 case 't':
123 tmplt = optarg;
124 break;
126 case '?':
127 rrd_set_error("unknown option '%s'",argv[optind-1]);
128 goto end_tag;
129 }
130 }
132 /* need at least 2 arguments: filename, data. */
133 if (argc-optind < 2) {
134 rrd_set_error("Not enough arguments");
135 goto end_tag;
136 }
137 rc.u_int = 0;
138 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
139 rc.u_int = _rrd_update(argv[optind], tmplt,
140 argc - optind - 1, (const char **)(argv + optind + 1), result);
141 result->value.u_int = rc.u_int;
142 end_tag:
143 return result;
144 }
146 int
147 rrd_update(int argc, char **argv)
148 {
149 char *tmplt = NULL;
150 int rc;
151 optind = 0; opterr = 0; /* initialize getopt */
153 while (1) {
154 static struct option long_options[] =
155 {
156 {"template", required_argument, 0, 't'},
157 {0,0,0,0}
158 };
159 int option_index = 0;
160 int opt;
161 opt = getopt_long(argc, argv, "t:",
162 long_options, &option_index);
164 if (opt == EOF)
165 break;
167 switch(opt) {
168 case 't':
169 tmplt = optarg;
170 break;
172 case '?':
173 rrd_set_error("unknown option '%s'",argv[optind-1]);
174 return(-1);
175 }
176 }
178 /* need at least 2 arguments: filename, data. */
179 if (argc-optind < 2) {
180 rrd_set_error("Not enough arguments");
182 return -1;
183 }
185 rc = rrd_update_r(argv[optind], tmplt,
186 argc - optind - 1, (const char **)(argv + optind + 1));
187 return rc;
188 }
190 int
191 rrd_update_r(const char *filename, const char *tmplt, int argc, const char **argv)
192 {
193 return _rrd_update(filename, tmplt, argc, argv, NULL);
194 }
196 int
197 _rrd_update(const char *filename, const char *tmplt, int argc, const char **argv,
198 info_t *pcdp_summary)
199 {
201 int arg_i = 2;
202 short j;
203 unsigned long i,ii,iii=1;
205 unsigned long rra_begin; /* byte pointer to the rra
206 * area in the rrd file. this
207 * pointer never changes value */
208 unsigned long rra_start; /* byte pointer to the rra
209 * area in the rrd file. this
210 * pointer changes as each rrd is
211 * processed. */
212 unsigned long rra_current; /* byte pointer to the current write
213 * spot in the rrd file. */
214 unsigned long rra_pos_tmp; /* temporary byte pointer. */
215 double interval,
216 pre_int,post_int; /* interval between this and
217 * the last run */
218 unsigned long proc_pdp_st; /* which pdp_st was the last
219 * to be processed */
220 unsigned long occu_pdp_st; /* when was the pdp_st
221 * before the last update
222 * time */
223 unsigned long proc_pdp_age; /* how old was the data in
224 * the pdp prep area when it
225 * was last updated */
226 unsigned long occu_pdp_age; /* how long ago was the last
227 * pdp_step time */
228 rrd_value_t *pdp_new; /* prepare the incoming data
229 * to be added the the
230 * existing entry */
231 rrd_value_t *pdp_temp; /* prepare the pdp values
232 * to be added the the
233 * cdp values */
235 long *tmpl_idx; /* index representing the settings
236 transported by the tmplt index */
237 unsigned long tmpl_cnt = 2; /* time and data */
239 FILE *rrd_file;
240 rrd_t rrd;
241 time_t current_time = 0;
242 time_t rra_time = 0; /* time of update for a RRA */
243 unsigned long current_time_usec=0;/* microseconds part of current time */
244 struct timeval tmp_time; /* used for time conversion */
246 char **updvals;
247 int schedule_smooth = 0;
248 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
249 /* a vector of future Holt-Winters seasonal coefs */
250 unsigned long elapsed_pdp_st;
251 /* number of elapsed PDP steps since last update */
252 unsigned long *rra_step_cnt = NULL;
253 /* number of rows to be updated in an RRA for a data
254 * value. */
255 unsigned long start_pdp_offset;
256 /* number of PDP steps since the last update that
257 * are assigned to the first CDP to be generated
258 * since the last update. */
259 unsigned short scratch_idx;
260 /* index into the CDP scratch array */
261 enum cf_en current_cf;
262 /* numeric id of the current consolidation function */
263 rpnstack_t rpnstack; /* used for COMPUTE DS */
264 int version; /* rrd version */
265 char *endptr; /* used in the conversion */
267 #ifdef HAVE_MMAP
268 void *rrd_mmaped_file;
269 unsigned long rrd_filesize;
270 #endif
273 rpnstack_init(&rpnstack);
275 /* need at least 1 arguments: data. */
276 if (argc < 1) {
277 rrd_set_error("Not enough arguments");
278 return -1;
279 }
283 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
284 return -1;
285 }
287 /* initialize time */
288 version = atoi(rrd.stat_head->version);
289 gettimeofday(&tmp_time, 0);
290 normalize_time(&tmp_time);
291 current_time = tmp_time.tv_sec;
292 if(version >= 3) {
293 current_time_usec = tmp_time.tv_usec;
294 }
295 else {
296 current_time_usec = 0;
297 }
299 rra_current = rra_start = rra_begin = ftell(rrd_file);
300 /* This is defined in the ANSI C standard, section 7.9.5.3:
302 When a file is opened with udpate mode ('+' as the second
303 or third character in the ... list of mode argument
304 variables), both input and ouptut may be performed on the
305 associated stream. However, ... input may not be directly
306 followed by output without an intervening call to a file
307 positioning function, unless the input oepration encounters
308 end-of-file. */
309 #ifdef HAVE_MMAP
310 fseek(rrd_file, 0, SEEK_END);
311 rrd_filesize = ftell(rrd_file);
312 fseek(rrd_file, rra_current, SEEK_SET);
313 #else
314 fseek(rrd_file, 0, SEEK_CUR);
315 #endif
318 /* get exclusive lock to whole file.
319 * lock gets removed when we close the file.
320 */
321 if (LockRRD(rrd_file) != 0) {
322 rrd_set_error("could not lock RRD");
323 rrd_free(&rrd);
324 fclose(rrd_file);
325 return(-1);
326 }
328 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
329 rrd_set_error("allocating updvals pointer array");
330 rrd_free(&rrd);
331 fclose(rrd_file);
332 return(-1);
333 }
335 if ((pdp_temp = malloc(sizeof(rrd_value_t)
336 *rrd.stat_head->ds_cnt))==NULL){
337 rrd_set_error("allocating pdp_temp ...");
338 free(updvals);
339 rrd_free(&rrd);
340 fclose(rrd_file);
341 return(-1);
342 }
344 if ((tmpl_idx = malloc(sizeof(unsigned long)
345 *(rrd.stat_head->ds_cnt+1)))==NULL){
346 rrd_set_error("allocating tmpl_idx ...");
347 free(pdp_temp);
348 free(updvals);
349 rrd_free(&rrd);
350 fclose(rrd_file);
351 return(-1);
352 }
353 /* initialize tmplt redirector */
354 /* default config example (assume DS 1 is a CDEF DS)
355 tmpl_idx[0] -> 0; (time)
356 tmpl_idx[1] -> 1; (DS 0)
357 tmpl_idx[2] -> 3; (DS 2)
358 tmpl_idx[3] -> 4; (DS 3) */
359 tmpl_idx[0] = 0; /* time */
360 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
361 {
362 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
363 tmpl_idx[ii++]=i;
364 }
365 tmpl_cnt= ii;
367 if (tmplt) {
368 /* we should work on a writeable copy here */
369 char *dsname;
370 unsigned int tmpl_len;
371 char *tmplt_copy = strdup(tmplt);
372 dsname = tmplt_copy;
373 tmpl_cnt = 1; /* the first entry is the time */
374 tmpl_len = strlen(tmplt_copy);
375 for(i=0;i<=tmpl_len ;i++) {
376 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
377 tmplt_copy[i] = '\0';
378 if (tmpl_cnt>rrd.stat_head->ds_cnt){
379 rrd_set_error("tmplt contains more DS definitions than RRD");
380 free(updvals); free(pdp_temp);
381 free(tmpl_idx); rrd_free(&rrd);
382 fclose(rrd_file); return(-1);
383 }
384 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
385 rrd_set_error("unknown DS name '%s'",dsname);
386 free(updvals); free(pdp_temp);
387 free(tmplt_copy);
388 free(tmpl_idx); rrd_free(&rrd);
389 fclose(rrd_file); return(-1);
390 } else {
391 /* the first element is always the time */
392 tmpl_idx[tmpl_cnt-1]++;
393 /* go to the next entry on the tmplt_copy */
394 dsname = &tmplt_copy[i+1];
395 /* fix the damage we did before */
396 if (i<tmpl_len) {
397 tmplt_copy[i]=':';
398 }
400 }
401 }
402 }
403 free(tmplt_copy);
404 }
405 if ((pdp_new = malloc(sizeof(rrd_value_t)
406 *rrd.stat_head->ds_cnt))==NULL){
407 rrd_set_error("allocating pdp_new ...");
408 free(updvals);
409 free(pdp_temp);
410 free(tmpl_idx);
411 rrd_free(&rrd);
412 fclose(rrd_file);
413 return(-1);
414 }
416 #ifdef HAVE_MMAP
417 rrd_mmaped_file = mmap(0,
418 rrd_filesize,
419 PROT_READ | PROT_WRITE,
420 MAP_SHARED,
421 fileno(rrd_file),
422 0);
423 if (rrd_mmaped_file == MAP_FAILED) {
424 rrd_set_error("error mmapping file %s", filename);
425 free(updvals);
426 free(pdp_temp);
427 free(tmpl_idx);
428 rrd_free(&rrd);
429 fclose(rrd_file);
430 return(-1);
431 }
432 #ifdef USE_MADVISE
433 /* when we use mmaping we tell the kernel the mmap equivalent
434 of POSIX_FADV_RANDOM */
435 madvise(rrd_mmaped_file,rrd_filesize,MADV_RANDOM);
436 #endif
437 #endif
438 /* loop through the arguments. */
439 for(arg_i=0; arg_i<argc;arg_i++) {
440 char *stepper = strdup(argv[arg_i]);
441 char *step_start = stepper;
442 char *p;
443 char *parsetime_error = NULL;
444 enum {atstyle, normal} timesyntax;
445 struct rrd_time_value ds_tv;
446 if (stepper == NULL){
447 rrd_set_error("failed duplication argv entry");
448 free(step_start);
449 free(updvals);
450 free(pdp_temp);
451 free(tmpl_idx);
452 rrd_free(&rrd);
453 #ifdef HAVE_MMAP
454 munmap(rrd_mmaped_file, rrd_filesize);
455 #endif
456 fclose(rrd_file);
457 return(-1);
458 }
459 /* initialize all ds input to unknown except the first one
460 which has always got to be set */
461 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
462 updvals[0]=stepper;
463 /* separate all ds elements; first must be examined separately
464 due to alternate time syntax */
465 if ((p=strchr(stepper,'@'))!=NULL) {
466 timesyntax = atstyle;
467 *p = '\0';
468 stepper = p+1;
469 } else if ((p=strchr(stepper,':'))!=NULL) {
470 timesyntax = normal;
471 *p = '\0';
472 stepper = p+1;
473 } else {
474 rrd_set_error("expected timestamp not found in data source from %s",
475 argv[arg_i]);
476 free(step_start);
477 break;
478 }
479 ii=1;
480 updvals[tmpl_idx[ii]] = stepper;
481 while (*stepper) {
482 if (*stepper == ':') {
483 *stepper = '\0';
484 ii++;
485 if (ii<tmpl_cnt){
486 updvals[tmpl_idx[ii]] = stepper+1;
487 }
488 }
489 stepper++;
490 }
492 if (ii != tmpl_cnt-1) {
493 rrd_set_error("expected %lu data source readings (got %lu) from %s",
494 tmpl_cnt-1, ii, argv[arg_i]);
495 free(step_start);
496 break;
497 }
499 /* get the time from the reading ... handle N */
500 if (timesyntax == atstyle) {
501 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
502 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
503 free(step_start);
504 break;
505 }
506 if (ds_tv.type == RELATIVE_TO_END_TIME ||
507 ds_tv.type == RELATIVE_TO_START_TIME) {
508 rrd_set_error("specifying time relative to the 'start' "
509 "or 'end' makes no sense here: %s",
510 updvals[0]);
511 free(step_start);
512 break;
513 }
515 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
516 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
518 } else if (strcmp(updvals[0],"N")==0){
519 gettimeofday(&tmp_time, 0);
520 normalize_time(&tmp_time);
521 current_time = tmp_time.tv_sec;
522 current_time_usec = tmp_time.tv_usec;
523 } else {
524 double tmp;
525 tmp = strtod(updvals[0], 0);
526 current_time = floor(tmp);
527 current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
528 }
529 /* dont do any correction for old version RRDs */
530 if(version < 3)
531 current_time_usec = 0;
533 if(current_time < rrd.live_head->last_up ||
534 (current_time == rrd.live_head->last_up &&
535 (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
536 rrd_set_error("%s: illegal attempt to update using time %ld when "
537 "last update time is %ld (minimum one second step)",
538 filename, current_time, rrd.live_head->last_up);
539 free(step_start);
540 break;
541 }
544 /* seek to the beginning of the rra's */
545 if (rra_current != rra_begin) {
546 #ifndef HAVE_MMAP
547 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
548 rrd_set_error("seek error in rrd");
549 free(step_start);
550 break;
551 }
552 #endif
553 rra_current = rra_begin;
554 }
555 rra_start = rra_begin;
557 /* when was the current pdp started */
558 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
559 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
561 /* when did the last pdp_st occur */
562 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
563 occu_pdp_st = current_time - occu_pdp_age;
565 /* interval = current_time - rrd.live_head->last_up; */
566 interval = (double)(current_time - rrd.live_head->last_up)
567 + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
569 if (occu_pdp_st > proc_pdp_st){
570 /* OK we passed the pdp_st moment*/
571 pre_int = (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
572 * occurred before the latest
573 * pdp_st moment*/
574 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
575 post_int = occu_pdp_age; /* how much after it */
576 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
577 } else {
578 pre_int = interval;
579 post_int = 0;
580 }
582 #ifdef DEBUG
583 printf(
584 "proc_pdp_age %lu\t"
585 "proc_pdp_st %lu\t"
586 "occu_pfp_age %lu\t"
587 "occu_pdp_st %lu\t"
588 "int %lf\t"
589 "pre_int %lf\t"
590 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
591 occu_pdp_age, occu_pdp_st,
592 interval, pre_int, post_int);
593 #endif
595 /* process the data sources and update the pdp_prep
596 * area accordingly */
597 for(i=0;i<rrd.stat_head->ds_cnt;i++){
598 enum dst_en dst_idx;
599 dst_idx= dst_conv(rrd.ds_def[i].dst);
601 /* make sure we do not build diffs with old last_ds values */
602 if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
603 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
604 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
605 }
607 /* NOTE: DST_CDEF should never enter this if block, because
608 * updvals[i+1][0] is initialized to 'U'; unless the caller
609 * accidently specified a value for the DST_CDEF. To handle
610 * this case, an extra check is required. */
612 if((updvals[i+1][0] != 'U') &&
613 (dst_idx != DST_CDEF) &&
614 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
615 double rate = DNAN;
616 /* the data source type defines how to process the data */
617 /* pdp_new contains rate * time ... eg the bytes
618 * transferred during the interval. Doing it this way saves
619 * a lot of math operations */
622 switch(dst_idx){
623 case DST_COUNTER:
624 case DST_DERIVE:
625 for(ii=0;updvals[i+1][ii] != '\0';ii++){
626 if((updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9') && (ii != 0 && updvals[i+1][ii] != '-')){
627 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
628 break;
629 }
630 }
631 if (rrd_test_error()){
632 break;
633 }
634 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
635 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
636 if(dst_idx == DST_COUNTER) {
637 /* simple overflow catcher suggested by Andres Kroonmaa */
638 /* this will fail terribly for non 32 or 64 bit counters ... */
639 /* are there any others in SNMP land ? */
640 if (pdp_new[i] < (double)0.0 )
641 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
642 if (pdp_new[i] < (double)0.0 )
643 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
644 }
645 rate = pdp_new[i] / interval;
646 }
647 else {
648 pdp_new[i]= DNAN;
649 }
650 break;
651 case DST_ABSOLUTE:
652 errno = 0;
653 pdp_new[i] = strtod(updvals[i+1],&endptr);
654 if (errno > 0){
655 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
656 break;
657 };
658 if (endptr[0] != '\0'){
659 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
660 break;
661 }
662 rate = pdp_new[i] / interval;
663 break;
664 case DST_GAUGE:
665 errno = 0;
666 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
667 if (errno > 0){
668 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
669 break;
670 };
671 if (endptr[0] != '\0'){
672 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
673 break;
674 }
675 rate = pdp_new[i] / interval;
676 break;
677 default:
678 rrd_set_error("rrd contains unknown DS type : '%s'",
679 rrd.ds_def[i].dst);
680 break;
681 }
682 /* break out of this for loop if the error string is set */
683 if (rrd_test_error()){
684 break;
685 }
686 /* make sure pdp_temp is neither too large or too small
687 * if any of these occur it becomes unknown ...
688 * sorry folks ... */
689 if ( ! isnan(rate) &&
690 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
691 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
692 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
693 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
694 pdp_new[i] = DNAN;
695 }
696 } else {
697 /* no news is news all the same */
698 pdp_new[i] = DNAN;
699 }
702 /* make a copy of the command line argument for the next run */
703 #ifdef DEBUG
704 fprintf(stderr,
705 "prep ds[%lu]\t"
706 "last_arg '%s'\t"
707 "this_arg '%s'\t"
708 "pdp_new %10.2f\n",
709 i,
710 rrd.pdp_prep[i].last_ds,
711 updvals[i+1], pdp_new[i]);
712 #endif
713 strncpy(rrd.pdp_prep[i].last_ds, updvals[i+1],LAST_DS_LEN-1);
714 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
715 }
716 /* break out of the argument parsing loop if the error_string is set */
717 if (rrd_test_error()){
718 free(step_start);
719 break;
720 }
721 /* has a pdp_st moment occurred since the last run ? */
723 if (proc_pdp_st == occu_pdp_st){
724 /* no we have not passed a pdp_st moment. therefore update is simple */
726 for(i=0;i<rrd.stat_head->ds_cnt;i++){
727 if(isnan(pdp_new[i])) {
728 /* this is not realy accurate if we use subsecond data arival time
729 should have thought of it when going subsecond resolution ...
730 sorry next format change we will have it! */
731 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);
732 } else {
733 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
734 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
735 } else {
736 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
737 }
738 }
739 #ifdef DEBUG
740 fprintf(stderr,
741 "NO PDP ds[%lu]\t"
742 "value %10.2f\t"
743 "unkn_sec %5lu\n",
744 i,
745 rrd.pdp_prep[i].scratch[PDP_val].u_val,
746 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
747 #endif
748 }
749 } else {
750 /* an pdp_st has occurred. */
752 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
753 * occurred up to the last run.
754 pdp_new[] contains rate*seconds from the latest run.
755 pdp_temp[] will contain the rate for cdp */
757 for(i=0;i<rrd.stat_head->ds_cnt;i++){
758 /* update pdp_prep to the current pdp_st. */
759 double pre_unknown = 0.0;
760 if(isnan(pdp_new[i]))
761 /* a final bit of unkonwn to be added bevore calculation
762 * we use a tempaorary variable for this so that we
763 * don't have to turn integer lines before using the value */
764 pre_unknown = pre_int;
765 else {
766 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
767 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i]/interval*pre_int;
768 } else {
769 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
770 }
771 }
774 /* if too much of the pdp_prep is unknown we dump it */
775 if (
776 /* removed because this does not agree with the definition
777 a heart beat can be unknown */
778 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
779 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
780 /* if the interval is larger thatn mrhb we get NAN */
781 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
782 (rrd.stat_head -> pdp_step / 2.0 <
783 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
784 pdp_temp[i] = DNAN;
785 } else {
786 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
787 / ((double)(occu_pdp_st - proc_pdp_st
788 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)
789 -pre_unknown);
790 }
792 /* process CDEF data sources; remember each CDEF DS can
793 * only reference other DS with a lower index number */
794 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
795 rpnp_t *rpnp;
796 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
797 /* substitue data values for OP_VARIABLE nodes */
798 for (ii = 0; rpnp[ii].op != OP_END; ii++)
799 {
800 if (rpnp[ii].op == OP_VARIABLE) {
801 rpnp[ii].op = OP_NUMBER;
802 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
803 }
804 }
805 /* run the rpn calculator */
806 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
807 free(rpnp);
808 break; /* exits the data sources pdp_temp loop */
809 }
810 }
812 /* make pdp_prep ready for the next run */
813 if(isnan(pdp_new[i])){
814 /* this is not realy accurate if we use subsecond data arival time
815 should have thought of it when going subsecond resolution ...
816 sorry next format change we will have it! */
817 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
818 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
819 } else {
820 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
821 rrd.pdp_prep[i].scratch[PDP_val].u_val =
822 pdp_new[i]/interval*post_int;
823 }
825 #ifdef DEBUG
826 fprintf(stderr,
827 "PDP UPD ds[%lu]\t"
828 "pdp_temp %10.2f\t"
829 "new_prep %10.2f\t"
830 "new_unkn_sec %5lu\n",
831 i, pdp_temp[i],
832 rrd.pdp_prep[i].scratch[PDP_val].u_val,
833 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
834 #endif
835 }
837 /* if there were errors during the last loop, bail out here */
838 if (rrd_test_error()){
839 free(step_start);
840 break;
841 }
843 /* compute the number of elapsed pdp_st moments */
844 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
845 #ifdef DEBUG
846 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
847 #endif
848 if (rra_step_cnt == NULL)
849 {
850 rra_step_cnt = (unsigned long *)
851 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
852 }
854 for(i = 0, rra_start = rra_begin;
855 i < rrd.stat_head->rra_cnt;
856 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
857 i++)
858 {
859 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
860 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
861 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
862 if (start_pdp_offset <= elapsed_pdp_st) {
863 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
864 rrd.rra_def[i].pdp_cnt + 1;
865 } else {
866 rra_step_cnt[i] = 0;
867 }
869 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
870 {
871 /* If this is a bulk update, we need to skip ahead in the seasonal
872 * arrays so that they will be correct for the next observed value;
873 * note that for the bulk update itself, no update will occur to
874 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
875 * be set to DNAN. */
876 if (rra_step_cnt[i] > 2)
877 {
878 /* skip update by resetting rra_step_cnt[i],
879 * note that this is not data source specific; this is due
880 * to the bulk update, not a DNAN value for the specific data
881 * source. */
882 rra_step_cnt[i] = 0;
883 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
884 &last_seasonal_coef);
885 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
886 &seasonal_coef);
887 }
889 /* periodically run a smoother for seasonal effects */
890 /* Need to use first cdp parameter buffer to track
891 * burnin (burnin requires a specific smoothing schedule).
892 * The CDP_init_seasonal parameter is really an RRA level,
893 * not a data source within RRA level parameter, but the rra_def
894 * is read only for rrd_update (not flushed to disk). */
895 iii = i*(rrd.stat_head -> ds_cnt);
896 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
897 <= BURNIN_CYCLES)
898 {
899 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
900 > rrd.rra_def[i].row_cnt - 1) {
901 /* mark off one of the burnin cycles */
902 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
903 schedule_smooth = 1;
904 }
905 } else {
906 /* someone has no doubt invented a trick to deal with this
907 * wrap around, but at least this code is clear. */
908 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
909 rrd.rra_ptr[i].cur_row)
910 {
911 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
912 * mapping between PDP and CDP */
913 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
914 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
915 {
916 #ifdef DEBUG
917 fprintf(stderr,
918 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
919 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
920 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
921 #endif
922 schedule_smooth = 1;
923 }
924 } else {
925 /* can't rely on negative numbers because we are working with
926 * unsigned values */
927 /* Don't need modulus here. If we've wrapped more than once, only
928 * one smooth is executed at the end. */
929 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
930 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
931 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
932 {
933 #ifdef DEBUG
934 fprintf(stderr,
935 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
936 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
937 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
938 #endif
939 schedule_smooth = 1;
940 }
941 }
942 }
944 rra_current = ftell(rrd_file);
945 } /* if cf is DEVSEASONAL or SEASONAL */
947 if (rrd_test_error()) break;
949 /* update CDP_PREP areas */
950 /* loop over data soures within each RRA */
951 for(ii = 0;
952 ii < rrd.stat_head->ds_cnt;
953 ii++)
954 {
956 /* iii indexes the CDP prep area for this data source within the RRA */
957 iii=i*rrd.stat_head->ds_cnt+ii;
959 if (rrd.rra_def[i].pdp_cnt > 1) {
961 if (rra_step_cnt[i] > 0) {
962 /* If we are in this block, as least 1 CDP value will be written to
963 * disk, this is the CDP_primary_val entry. If more than 1 value needs
964 * to be written, then the "fill in" value is the CDP_secondary_val
965 * entry. */
966 if (isnan(pdp_temp[ii]))
967 {
968 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
969 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
970 } else {
971 /* CDP_secondary value is the RRA "fill in" value for intermediary
972 * CDP data entries. No matter the CF, the value is the same because
973 * the average, max, min, and last of a list of identical values is
974 * the same, namely, the value itself. */
975 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
976 }
978 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
979 > rrd.rra_def[i].pdp_cnt*
980 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
981 {
982 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
983 /* initialize carry over */
984 if (current_cf == CF_AVERAGE) {
985 if (isnan(pdp_temp[ii])) {
986 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
987 } else {
988 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
989 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
990 }
991 } else {
992 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
993 }
994 } else {
995 rrd_value_t cum_val, cur_val;
996 switch (current_cf) {
997 case CF_AVERAGE:
998 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
999 cur_val = IFDNAN(pdp_temp[ii],0.0);
1000 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
1001 (cum_val + cur_val * start_pdp_offset) /
1002 (rrd.rra_def[i].pdp_cnt
1003 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
1004 /* initialize carry over value */
1005 if (isnan(pdp_temp[ii])) {
1006 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1007 } else {
1008 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1009 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1010 }
1011 break;
1012 case CF_MAXIMUM:
1013 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1014 cur_val = IFDNAN(pdp_temp[ii],-DINF);
1015 #ifdef DEBUG
1016 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1017 isnan(pdp_temp[ii])) {
1018 fprintf(stderr,
1019 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1020 i,ii);
1021 exit(-1);
1022 }
1023 #endif
1024 if (cur_val > cum_val)
1025 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1026 else
1027 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1028 /* initialize carry over value */
1029 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1030 break;
1031 case CF_MINIMUM:
1032 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1033 cur_val = IFDNAN(pdp_temp[ii],DINF);
1034 #ifdef DEBUG
1035 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1036 isnan(pdp_temp[ii])) {
1037 fprintf(stderr,
1038 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1039 i,ii);
1040 exit(-1);
1041 }
1042 #endif
1043 if (cur_val < cum_val)
1044 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1045 else
1046 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1047 /* initialize carry over value */
1048 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1049 break;
1050 case CF_LAST:
1051 default:
1052 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1053 /* initialize carry over value */
1054 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1055 break;
1056 }
1057 } /* endif meets xff value requirement for a valid value */
1058 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1059 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1060 if (isnan(pdp_temp[ii]))
1061 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1062 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1063 else
1064 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1065 } else /* rra_step_cnt[i] == 0 */
1066 {
1067 #ifdef DEBUG
1068 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1069 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1070 i,ii);
1071 } else {
1072 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1073 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1074 }
1075 #endif
1076 if (isnan(pdp_temp[ii])) {
1077 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1078 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1079 {
1080 if (current_cf == CF_AVERAGE) {
1081 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1082 elapsed_pdp_st;
1083 } else {
1084 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1085 }
1086 #ifdef DEBUG
1087 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1088 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1089 #endif
1090 } else {
1091 switch (current_cf) {
1092 case CF_AVERAGE:
1093 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1094 elapsed_pdp_st;
1095 break;
1096 case CF_MINIMUM:
1097 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1098 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1099 break;
1100 case CF_MAXIMUM:
1101 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1102 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1103 break;
1104 case CF_LAST:
1105 default:
1106 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1107 break;
1108 }
1109 }
1110 }
1111 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1112 if (elapsed_pdp_st > 2)
1113 {
1114 switch (current_cf) {
1115 case CF_AVERAGE:
1116 default:
1117 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1118 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1119 break;
1120 case CF_SEASONAL:
1121 case CF_DEVSEASONAL:
1122 /* need to update cached seasonal values, so they are consistent
1123 * with the bulk update */
1124 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1125 * CDP_last_deviation are the same. */
1126 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1127 last_seasonal_coef[ii];
1128 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1129 seasonal_coef[ii];
1130 break;
1131 case CF_HWPREDICT:
1132 /* need to update the null_count and last_null_count.
1133 * even do this for non-DNAN pdp_temp because the
1134 * algorithm is not learning from batch updates. */
1135 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1136 elapsed_pdp_st;
1137 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1138 elapsed_pdp_st - 1;
1139 /* fall through */
1140 case CF_DEVPREDICT:
1141 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1142 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1143 break;
1144 case CF_FAILURES:
1145 /* do not count missed bulk values as failures */
1146 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1147 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1148 /* need to reset violations buffer.
1149 * could do this more carefully, but for now, just
1150 * assume a bulk update wipes away all violations. */
1151 erase_violations(&rrd, iii, i);
1152 break;
1153 }
1154 }
1155 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1157 if (rrd_test_error()) break;
1159 } /* endif data sources loop */
1160 } /* end RRA Loop */
1162 /* this loop is only entered if elapsed_pdp_st < 3 */
1163 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1164 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1165 {
1166 for(i = 0, rra_start = rra_begin;
1167 i < rrd.stat_head->rra_cnt;
1168 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1169 i++)
1170 {
1171 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1173 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1174 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1175 {
1176 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1177 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1178 &seasonal_coef);
1179 rra_current = ftell(rrd_file);
1180 }
1181 if (rrd_test_error()) break;
1182 /* loop over data soures within each RRA */
1183 for(ii = 0;
1184 ii < rrd.stat_head->ds_cnt;
1185 ii++)
1186 {
1187 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1188 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1189 scratch_idx, seasonal_coef);
1190 }
1191 } /* end RRA Loop */
1192 if (rrd_test_error()) break;
1193 } /* end elapsed_pdp_st loop */
1195 if (rrd_test_error()) break;
1197 /* Ready to write to disk */
1198 /* Move sequentially through the file, writing one RRA at a time.
1199 * Note this architecture divorces the computation of CDP with
1200 * flushing updated RRA entries to disk. */
1201 for(i = 0, rra_start = rra_begin;
1202 i < rrd.stat_head->rra_cnt;
1203 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1204 i++) {
1205 /* is th5Aere anything to write for this RRA? If not, continue. */
1206 if (rra_step_cnt[i] == 0) continue;
1208 /* write the first row */
1209 #ifdef DEBUG
1210 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1211 #endif
1212 rrd.rra_ptr[i].cur_row++;
1213 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1214 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1215 /* positition on the first row */
1216 rra_pos_tmp = rra_start +
1217 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1218 if(rra_pos_tmp != rra_current) {
1219 #ifndef HAVE_MMAP
1220 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1221 rrd_set_error("seek error in rrd");
1222 break;
1223 }
1224 #endif
1225 rra_current = rra_pos_tmp;
1226 }
1228 #ifdef DEBUG
1229 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1230 #endif
1231 scratch_idx = CDP_primary_val;
1232 if (pcdp_summary != NULL)
1233 {
1234 rra_time = (current_time - current_time
1235 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1236 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1237 }
1238 #ifdef HAVE_MMAP
1239 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1240 pcdp_summary, &rra_time, rrd_mmaped_file);
1241 #else
1242 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1243 pcdp_summary, &rra_time);
1244 #endif
1245 if (rrd_test_error()) break;
1247 /* write other rows of the bulk update, if any */
1248 scratch_idx = CDP_secondary_val;
1249 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1250 {
1251 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1252 {
1253 #ifdef DEBUG
1254 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1255 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1256 #endif
1257 /* wrap */
1258 rrd.rra_ptr[i].cur_row = 0;
1259 /* seek back to beginning of current rra */
1260 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1261 {
1262 rrd_set_error("seek error in rrd");
1263 break;
1264 }
1265 #ifdef DEBUG
1266 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1267 #endif
1268 rra_current = rra_start;
1269 }
1270 if (pcdp_summary != NULL)
1271 {
1272 rra_time = (current_time - current_time
1273 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1274 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1275 }
1276 #ifdef HAVE_MMAP
1277 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1278 pcdp_summary, &rra_time, rrd_mmaped_file);
1279 #else
1280 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1281 pcdp_summary, &rra_time);
1282 #endif
1283 }
1285 if (rrd_test_error())
1286 break;
1287 } /* RRA LOOP */
1289 /* break out of the argument parsing loop if error_string is set */
1290 if (rrd_test_error()){
1291 free(step_start);
1292 break;
1293 }
1295 } /* endif a pdp_st has occurred */
1296 rrd.live_head->last_up = current_time;
1297 rrd.live_head->last_up_usec = current_time_usec;
1298 free(step_start);
1299 } /* function argument loop */
1301 if (seasonal_coef != NULL) free(seasonal_coef);
1302 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1303 if (rra_step_cnt != NULL) free(rra_step_cnt);
1304 rpnstack_free(&rpnstack);
1306 #ifdef HAVE_MMAP
1307 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1308 rrd_set_error("error writing(unmapping) file: %s", filename);
1309 }
1310 #endif
1311 /* if we got here and if there is an error and if the file has not been
1312 * written to, then close things up and return. */
1313 if (rrd_test_error()) {
1314 free(updvals);
1315 free(tmpl_idx);
1316 rrd_free(&rrd);
1317 free(pdp_temp);
1318 free(pdp_new);
1319 fclose(rrd_file);
1320 return(-1);
1321 }
1323 /* aargh ... that was tough ... so many loops ... anyway, its done.
1324 * we just need to write back the live header portion now*/
1326 if (fseek(rrd_file, (sizeof(stat_head_t)
1327 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1328 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1329 SEEK_SET) != 0) {
1330 rrd_set_error("seek rrd for live header writeback");
1331 free(updvals);
1332 free(tmpl_idx);
1333 rrd_free(&rrd);
1334 free(pdp_temp);
1335 free(pdp_new);
1336 fclose(rrd_file);
1337 return(-1);
1338 }
1340 if(version >= 3) {
1341 if(fwrite( rrd.live_head,
1342 sizeof(live_head_t), 1, rrd_file) != 1){
1343 rrd_set_error("fwrite live_head to rrd");
1344 free(updvals);
1345 rrd_free(&rrd);
1346 free(tmpl_idx);
1347 free(pdp_temp);
1348 free(pdp_new);
1349 fclose(rrd_file);
1350 return(-1);
1351 }
1352 }
1353 else {
1354 if(fwrite( &rrd.live_head->last_up,
1355 sizeof(time_t), 1, rrd_file) != 1){
1356 rrd_set_error("fwrite live_head to rrd");
1357 free(updvals);
1358 rrd_free(&rrd);
1359 free(tmpl_idx);
1360 free(pdp_temp);
1361 free(pdp_new);
1362 fclose(rrd_file);
1363 return(-1);
1364 }
1365 }
1368 if(fwrite( rrd.pdp_prep,
1369 sizeof(pdp_prep_t),
1370 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1371 rrd_set_error("ftwrite pdp_prep to rrd");
1372 free(updvals);
1373 rrd_free(&rrd);
1374 free(tmpl_idx);
1375 free(pdp_temp);
1376 free(pdp_new);
1377 fclose(rrd_file);
1378 return(-1);
1379 }
1381 if(fwrite( rrd.cdp_prep,
1382 sizeof(cdp_prep_t),
1383 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1384 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1386 rrd_set_error("ftwrite cdp_prep to rrd");
1387 free(updvals);
1388 free(tmpl_idx);
1389 rrd_free(&rrd);
1390 free(pdp_temp);
1391 free(pdp_new);
1392 fclose(rrd_file);
1393 return(-1);
1394 }
1396 if(fwrite( rrd.rra_ptr,
1397 sizeof(rra_ptr_t),
1398 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1399 rrd_set_error("fwrite rra_ptr to rrd");
1400 free(updvals);
1401 free(tmpl_idx);
1402 rrd_free(&rrd);
1403 free(pdp_temp);
1404 free(pdp_new);
1405 fclose(rrd_file);
1406 return(-1);
1407 }
1409 /* OK now close the files and free the memory */
1410 if(fclose(rrd_file) != 0){
1411 rrd_set_error("closing rrd");
1412 free(updvals);
1413 free(tmpl_idx);
1414 rrd_free(&rrd);
1415 free(pdp_temp);
1416 free(pdp_new);
1417 return(-1);
1418 }
1420 /* calling the smoothing code here guarantees at most
1421 * one smoothing operation per rrd_update call. Unfortunately,
1422 * it is possible with bulk updates, or a long-delayed update
1423 * for smoothing to occur off-schedule. This really isn't
1424 * critical except during the burning cycles. */
1425 if (schedule_smooth)
1426 {
1427 rrd_file = fopen(filename,"rb+");
1430 rra_start = rra_begin;
1431 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1432 {
1433 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1434 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1435 {
1436 #ifdef DEBUG
1437 fprintf(stderr,"Running smoother for rra %ld\n",i);
1438 #endif
1439 apply_smoother(&rrd,i,rra_start,rrd_file);
1440 if (rrd_test_error())
1441 break;
1442 }
1443 rra_start += rrd.rra_def[i].row_cnt
1444 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1445 }
1446 fclose(rrd_file);
1447 }
1448 rrd_free(&rrd);
1449 free(updvals);
1450 free(tmpl_idx);
1451 free(pdp_new);
1452 free(pdp_temp);
1453 return(0);
1454 }
1456 /*
1457 * get exclusive lock to whole file.
1458 * lock gets removed when we close the file
1459 *
1460 * returns 0 on success
1461 */
1462 int
1463 LockRRD(FILE *rrdfile)
1464 {
1465 int rrd_fd; /* File descriptor for RRD */
1466 int rcstat;
1468 rrd_fd = fileno(rrdfile);
1470 {
1471 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1472 struct _stat st;
1474 if ( _fstat( rrd_fd, &st ) == 0 ) {
1475 rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1476 } else {
1477 rcstat = -1;
1478 }
1479 #else
1480 struct flock lock;
1481 lock.l_type = F_WRLCK; /* exclusive write lock */
1482 lock.l_len = 0; /* whole file */
1483 lock.l_start = 0; /* start of file */
1484 lock.l_whence = SEEK_SET; /* end of file */
1486 rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1487 #endif
1488 }
1490 return(rcstat);
1491 }
1494 #ifdef HAVE_MMAP
1495 info_t
1496 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1497 unsigned short CDP_scratch_idx,
1498 #ifndef DEBUG
1499 FILE UNUSED(*rrd_file),
1500 #else
1501 FILE *rrd_file,
1502 #endif
1503 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1504 #else
1505 info_t
1506 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1507 unsigned short CDP_scratch_idx, FILE *rrd_file,
1508 info_t *pcdp_summary, time_t *rra_time)
1509 #endif
1510 {
1511 unsigned long ds_idx, cdp_idx;
1512 infoval iv;
1514 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1515 {
1516 /* compute the cdp index */
1517 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1518 #ifdef DEBUG
1519 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1520 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1521 rrd -> rra_def[rra_idx].cf_nam);
1522 #endif
1523 if (pcdp_summary != NULL)
1524 {
1525 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1526 /* append info to the return hash */
1527 pcdp_summary = info_push(pcdp_summary,
1528 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1529 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1530 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1531 RD_I_VAL, iv);
1532 }
1533 #ifdef HAVE_MMAP
1534 memcpy((char *)rrd_mmaped_file + *rra_current,
1535 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1536 sizeof(rrd_value_t));
1537 #else
1538 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1539 sizeof(rrd_value_t),1,rrd_file) != 1)
1540 {
1541 rrd_set_error("writing rrd");
1542 return 0;
1543 }
1544 #endif
1545 *rra_current += sizeof(rrd_value_t);
1546 }
1547 return (pcdp_summary);
1548 }