9225fb08b0d3dda925d14e9ffbe79e05c0f80d95
1 /*****************************************************************************
2 * RRDtool 1.2.26 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 *****************************************************************************/
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14 #include <sys/locking.h>
15 #include <sys/stat.h>
16 #include <io.h>
17 #endif
19 #include "rrd_hw.h"
20 #include "rrd_rpncalc.h"
22 #include "rrd_is_thread_safe.h"
23 #include "unused.h"
25 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
26 /*
27 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
28 * replacement.
29 */
30 #include <sys/timeb.h>
32 #ifndef __MINGW32__
33 struct timeval {
34 time_t tv_sec; /* seconds */
35 long tv_usec; /* microseconds */
36 };
37 #endif
39 struct __timezone {
40 int tz_minuteswest; /* minutes W of Greenwich */
41 int tz_dsttime; /* type of dst correction */
42 };
44 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
46 struct _timeb current_time;
48 _ftime(¤t_time);
50 t->tv_sec = current_time.time;
51 t->tv_usec = current_time.millitm * 1000;
53 return 0;
54 }
56 #endif
57 /*
58 * normilize time as returned by gettimeofday. usec part must
59 * be always >= 0
60 */
61 static void normalize_time(struct timeval *t)
62 {
63 if(t->tv_usec < 0) {
64 t->tv_sec--;
65 t->tv_usec += 1000000L;
66 }
67 }
69 /* Local prototypes */
70 int LockRRD(FILE *rrd_file);
71 #ifdef HAVE_MMAP
72 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
73 unsigned long *rra_current,
74 unsigned short CDP_scratch_idx,
75 #ifndef DEBUG
76 FILE UNUSED(*rrd_file),
77 #else
78 FILE *rrd_file,
79 #endif
80 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
81 #else
82 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
83 unsigned long *rra_current,
84 unsigned short CDP_scratch_idx, FILE *rrd_file,
85 info_t *pcdp_summary, time_t *rra_time);
86 #endif
87 int rrd_update_r(const char *filename, const char *tmplt, int argc, const char **argv);
88 int _rrd_update(const char *filename, const char *tmplt, int argc, const char **argv,
89 info_t*);
91 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
94 info_t *rrd_update_v(int argc, char **argv)
95 {
96 char *tmplt = NULL;
97 info_t *result = NULL;
98 infoval rc;
99 rc.u_int = -1;
100 optind = 0; opterr = 0; /* initialize getopt */
102 while (1) {
103 static struct option long_options[] =
104 {
105 {"template", required_argument, 0, 't'},
106 {0,0,0,0}
107 };
108 int option_index = 0;
109 int opt;
110 opt = getopt_long(argc, argv, "t:",
111 long_options, &option_index);
113 if (opt == EOF)
114 break;
116 switch(opt) {
117 case 't':
118 tmplt = optarg;
119 break;
121 case '?':
122 rrd_set_error("unknown option '%s'",argv[optind-1]);
123 goto end_tag;
124 }
125 }
127 /* need at least 2 arguments: filename, data. */
128 if (argc-optind < 2) {
129 rrd_set_error("Not enough arguments");
130 goto end_tag;
131 }
132 rc.u_int = 0;
133 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
134 rc.u_int = _rrd_update(argv[optind], tmplt,
135 argc - optind - 1, (const char **)(argv + optind + 1), result);
136 result->value.u_int = rc.u_int;
137 end_tag:
138 return result;
139 }
141 int
142 rrd_update(int argc, char **argv)
143 {
144 char *tmplt = NULL;
145 int rc;
146 optind = 0; opterr = 0; /* initialize getopt */
148 while (1) {
149 static struct option long_options[] =
150 {
151 {"template", required_argument, 0, 't'},
152 {0,0,0,0}
153 };
154 int option_index = 0;
155 int opt;
156 opt = getopt_long(argc, argv, "t:",
157 long_options, &option_index);
159 if (opt == EOF)
160 break;
162 switch(opt) {
163 case 't':
164 tmplt = optarg;
165 break;
167 case '?':
168 rrd_set_error("unknown option '%s'",argv[optind-1]);
169 return(-1);
170 }
171 }
173 /* need at least 2 arguments: filename, data. */
174 if (argc-optind < 2) {
175 rrd_set_error("Not enough arguments");
177 return -1;
178 }
180 rc = rrd_update_r(argv[optind], tmplt,
181 argc - optind - 1, (const char **)(argv + optind + 1));
182 return rc;
183 }
185 int
186 rrd_update_r(const char *filename, const char *tmplt, int argc, const char **argv)
187 {
188 return _rrd_update(filename, tmplt, argc, argv, NULL);
189 }
191 int
192 _rrd_update(const char *filename, const char *tmplt, int argc, const char **argv,
193 info_t *pcdp_summary)
194 {
196 int arg_i = 2;
197 short j;
198 unsigned long i,ii,iii=1;
200 unsigned long rra_begin; /* byte pointer to the rra
201 * area in the rrd file. this
202 * pointer never changes value */
203 unsigned long rra_start; /* byte pointer to the rra
204 * area in the rrd file. this
205 * pointer changes as each rrd is
206 * processed. */
207 unsigned long rra_current; /* byte pointer to the current write
208 * spot in the rrd file. */
209 unsigned long rra_pos_tmp; /* temporary byte pointer. */
210 double interval,
211 pre_int,post_int; /* interval between this and
212 * the last run */
213 unsigned long proc_pdp_st; /* which pdp_st was the last
214 * to be processed */
215 unsigned long occu_pdp_st; /* when was the pdp_st
216 * before the last update
217 * time */
218 unsigned long proc_pdp_age; /* how old was the data in
219 * the pdp prep area when it
220 * was last updated */
221 unsigned long occu_pdp_age; /* how long ago was the last
222 * pdp_step time */
223 rrd_value_t *pdp_new; /* prepare the incoming data
224 * to be added the the
225 * existing entry */
226 rrd_value_t *pdp_temp; /* prepare the pdp values
227 * to be added the the
228 * cdp values */
230 long *tmpl_idx; /* index representing the settings
231 transported by the tmplt index */
232 unsigned long tmpl_cnt = 2; /* time and data */
234 FILE *rrd_file;
235 rrd_t rrd;
236 time_t current_time = 0;
237 time_t rra_time = 0; /* time of update for a RRA */
238 unsigned long current_time_usec=0;/* microseconds part of current time */
239 struct timeval tmp_time; /* used for time conversion */
241 char **updvals;
242 int schedule_smooth = 0;
243 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
244 /* a vector of future Holt-Winters seasonal coefs */
245 unsigned long elapsed_pdp_st;
246 /* number of elapsed PDP steps since last update */
247 unsigned long *rra_step_cnt = NULL;
248 /* number of rows to be updated in an RRA for a data
249 * value. */
250 unsigned long start_pdp_offset;
251 /* number of PDP steps since the last update that
252 * are assigned to the first CDP to be generated
253 * since the last update. */
254 unsigned short scratch_idx;
255 /* index into the CDP scratch array */
256 enum cf_en current_cf;
257 /* numeric id of the current consolidation function */
258 rpnstack_t rpnstack; /* used for COMPUTE DS */
259 int version; /* rrd version */
260 char *endptr; /* used in the conversion */
262 #ifdef HAVE_MMAP
263 void *rrd_mmaped_file;
264 unsigned long rrd_filesize;
265 #endif
268 rpnstack_init(&rpnstack);
270 /* need at least 1 arguments: data. */
271 if (argc < 1) {
272 rrd_set_error("Not enough arguments");
273 return -1;
274 }
278 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
279 return -1;
280 }
282 /* initialize time */
283 version = atoi(rrd.stat_head->version);
284 gettimeofday(&tmp_time, 0);
285 normalize_time(&tmp_time);
286 current_time = tmp_time.tv_sec;
287 if(version >= 3) {
288 current_time_usec = tmp_time.tv_usec;
289 }
290 else {
291 current_time_usec = 0;
292 }
294 rra_current = rra_start = rra_begin = ftell(rrd_file);
295 /* This is defined in the ANSI C standard, section 7.9.5.3:
297 When a file is opened with udpate mode ('+' as the second
298 or third character in the ... list of mode argument
299 variables), both input and ouptut may be performed on the
300 associated stream. However, ... input may not be directly
301 followed by output without an intervening call to a file
302 positioning function, unless the input oepration encounters
303 end-of-file. */
304 #ifdef HAVE_MMAP
305 fseek(rrd_file, 0, SEEK_END);
306 rrd_filesize = ftell(rrd_file);
307 fseek(rrd_file, rra_current, SEEK_SET);
308 #else
309 fseek(rrd_file, 0, SEEK_CUR);
310 #endif
313 /* get exclusive lock to whole file.
314 * lock gets removed when we close the file.
315 */
316 if (LockRRD(rrd_file) != 0) {
317 rrd_set_error("could not lock RRD");
318 rrd_free(&rrd);
319 fclose(rrd_file);
320 return(-1);
321 }
323 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
324 rrd_set_error("allocating updvals pointer array");
325 rrd_free(&rrd);
326 fclose(rrd_file);
327 return(-1);
328 }
330 if ((pdp_temp = malloc(sizeof(rrd_value_t)
331 *rrd.stat_head->ds_cnt))==NULL){
332 rrd_set_error("allocating pdp_temp ...");
333 free(updvals);
334 rrd_free(&rrd);
335 fclose(rrd_file);
336 return(-1);
337 }
339 if ((tmpl_idx = malloc(sizeof(unsigned long)
340 *(rrd.stat_head->ds_cnt+1)))==NULL){
341 rrd_set_error("allocating tmpl_idx ...");
342 free(pdp_temp);
343 free(updvals);
344 rrd_free(&rrd);
345 fclose(rrd_file);
346 return(-1);
347 }
348 /* initialize tmplt redirector */
349 /* default config example (assume DS 1 is a CDEF DS)
350 tmpl_idx[0] -> 0; (time)
351 tmpl_idx[1] -> 1; (DS 0)
352 tmpl_idx[2] -> 3; (DS 2)
353 tmpl_idx[3] -> 4; (DS 3) */
354 tmpl_idx[0] = 0; /* time */
355 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
356 {
357 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
358 tmpl_idx[ii++]=i;
359 }
360 tmpl_cnt= ii;
362 if (tmplt) {
363 /* we should work on a writeable copy here */
364 char *dsname;
365 unsigned int tmpl_len;
366 char *tmplt_copy = strdup(tmplt);
367 dsname = tmplt_copy;
368 tmpl_cnt = 1; /* the first entry is the time */
369 tmpl_len = strlen(tmplt_copy);
370 for(i=0;i<=tmpl_len ;i++) {
371 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
372 tmplt_copy[i] = '\0';
373 if (tmpl_cnt>rrd.stat_head->ds_cnt){
374 rrd_set_error("tmplt contains more DS definitions than RRD");
375 free(updvals); free(pdp_temp);
376 free(tmpl_idx); rrd_free(&rrd);
377 fclose(rrd_file); return(-1);
378 }
379 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
380 rrd_set_error("unknown DS name '%s'",dsname);
381 free(updvals); free(pdp_temp);
382 free(tmplt_copy);
383 free(tmpl_idx); rrd_free(&rrd);
384 fclose(rrd_file); return(-1);
385 } else {
386 /* the first element is always the time */
387 tmpl_idx[tmpl_cnt-1]++;
388 /* go to the next entry on the tmplt_copy */
389 dsname = &tmplt_copy[i+1];
390 /* fix the damage we did before */
391 if (i<tmpl_len) {
392 tmplt_copy[i]=':';
393 }
395 }
396 }
397 }
398 free(tmplt_copy);
399 }
400 if ((pdp_new = malloc(sizeof(rrd_value_t)
401 *rrd.stat_head->ds_cnt))==NULL){
402 rrd_set_error("allocating pdp_new ...");
403 free(updvals);
404 free(pdp_temp);
405 free(tmpl_idx);
406 rrd_free(&rrd);
407 fclose(rrd_file);
408 return(-1);
409 }
411 #ifdef HAVE_MMAP
412 rrd_mmaped_file = mmap(0,
413 rrd_filesize,
414 PROT_READ | PROT_WRITE,
415 MAP_SHARED,
416 fileno(rrd_file),
417 0);
418 if (rrd_mmaped_file == MAP_FAILED) {
419 rrd_set_error("error mmapping file %s", filename);
420 free(updvals);
421 free(pdp_temp);
422 free(tmpl_idx);
423 rrd_free(&rrd);
424 fclose(rrd_file);
425 return(-1);
426 }
427 #ifdef USE_MADVISE
428 /* when we use mmaping we tell the kernel the mmap equivalent
429 of POSIX_FADV_RANDOM */
430 madvise(rrd_mmaped_file,rrd_filesize,MADV_RANDOM);
431 #endif
432 #endif
433 /* loop through the arguments. */
434 for(arg_i=0; arg_i<argc;arg_i++) {
435 char *stepper = strdup(argv[arg_i]);
436 char *step_start = stepper;
437 char *p;
438 char *parsetime_error = NULL;
439 enum {atstyle, normal} timesyntax;
440 struct rrd_time_value ds_tv;
441 if (stepper == NULL){
442 rrd_set_error("failed duplication argv entry");
443 free(step_start);
444 free(updvals);
445 free(pdp_temp);
446 free(tmpl_idx);
447 rrd_free(&rrd);
448 #ifdef HAVE_MMAP
449 munmap(rrd_mmaped_file, rrd_filesize);
450 #endif
451 fclose(rrd_file);
452 return(-1);
453 }
454 /* initialize all ds input to unknown except the first one
455 which has always got to be set */
456 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
457 updvals[0]=stepper;
458 /* separate all ds elements; first must be examined separately
459 due to alternate time syntax */
460 if ((p=strchr(stepper,'@'))!=NULL) {
461 timesyntax = atstyle;
462 *p = '\0';
463 stepper = p+1;
464 } else if ((p=strchr(stepper,':'))!=NULL) {
465 timesyntax = normal;
466 *p = '\0';
467 stepper = p+1;
468 } else {
469 rrd_set_error("expected timestamp not found in data source from %s",
470 argv[arg_i]);
471 free(step_start);
472 break;
473 }
474 ii=1;
475 updvals[tmpl_idx[ii]] = stepper;
476 while (*stepper) {
477 if (*stepper == ':') {
478 *stepper = '\0';
479 ii++;
480 if (ii<tmpl_cnt){
481 updvals[tmpl_idx[ii]] = stepper+1;
482 }
483 }
484 stepper++;
485 }
487 if (ii != tmpl_cnt-1) {
488 rrd_set_error("expected %lu data source readings (got %lu) from %s",
489 tmpl_cnt-1, ii, argv[arg_i]);
490 free(step_start);
491 break;
492 }
494 /* get the time from the reading ... handle N */
495 if (timesyntax == atstyle) {
496 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
497 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
498 free(step_start);
499 break;
500 }
501 if (ds_tv.type == RELATIVE_TO_END_TIME ||
502 ds_tv.type == RELATIVE_TO_START_TIME) {
503 rrd_set_error("specifying time relative to the 'start' "
504 "or 'end' makes no sense here: %s",
505 updvals[0]);
506 free(step_start);
507 break;
508 }
510 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
511 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
513 } else if (strcmp(updvals[0],"N")==0){
514 gettimeofday(&tmp_time, 0);
515 normalize_time(&tmp_time);
516 current_time = tmp_time.tv_sec;
517 current_time_usec = tmp_time.tv_usec;
518 } else {
519 double tmp;
520 tmp = strtod(updvals[0], 0);
521 current_time = floor(tmp);
522 current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
523 }
524 /* dont do any correction for old version RRDs */
525 if(version < 3)
526 current_time_usec = 0;
528 if(current_time < rrd.live_head->last_up ||
529 (current_time == rrd.live_head->last_up &&
530 (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
531 rrd_set_error("illegal attempt to update using time %ld when "
532 "last update time is %ld (minimum one second step)",
533 current_time, rrd.live_head->last_up);
534 free(step_start);
535 break;
536 }
539 /* seek to the beginning of the rra's */
540 if (rra_current != rra_begin) {
541 #ifndef HAVE_MMAP
542 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
543 rrd_set_error("seek error in rrd");
544 free(step_start);
545 break;
546 }
547 #endif
548 rra_current = rra_begin;
549 }
550 rra_start = rra_begin;
552 /* when was the current pdp started */
553 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
554 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
556 /* when did the last pdp_st occur */
557 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
558 occu_pdp_st = current_time - occu_pdp_age;
560 /* interval = current_time - rrd.live_head->last_up; */
561 interval = (double)(current_time - rrd.live_head->last_up)
562 + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
564 if (occu_pdp_st > proc_pdp_st){
565 /* OK we passed the pdp_st moment*/
566 pre_int = (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
567 * occurred before the latest
568 * pdp_st moment*/
569 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
570 post_int = occu_pdp_age; /* how much after it */
571 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
572 } else {
573 pre_int = interval;
574 post_int = 0;
575 }
577 #ifdef DEBUG
578 printf(
579 "proc_pdp_age %lu\t"
580 "proc_pdp_st %lu\t"
581 "occu_pfp_age %lu\t"
582 "occu_pdp_st %lu\t"
583 "int %lf\t"
584 "pre_int %lf\t"
585 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
586 occu_pdp_age, occu_pdp_st,
587 interval, pre_int, post_int);
588 #endif
590 /* process the data sources and update the pdp_prep
591 * area accordingly */
592 for(i=0;i<rrd.stat_head->ds_cnt;i++){
593 enum dst_en dst_idx;
594 dst_idx= dst_conv(rrd.ds_def[i].dst);
596 /* make sure we do not build diffs with old last_ds values */
597 if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
598 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
599 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
600 }
602 /* NOTE: DST_CDEF should never enter this if block, because
603 * updvals[i+1][0] is initialized to 'U'; unless the caller
604 * accidently specified a value for the DST_CDEF. To handle
605 * this case, an extra check is required. */
607 if((updvals[i+1][0] != 'U') &&
608 (dst_idx != DST_CDEF) &&
609 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
610 double rate = DNAN;
611 /* the data source type defines how to process the data */
612 /* pdp_new contains rate * time ... eg the bytes
613 * transferred during the interval. Doing it this way saves
614 * a lot of math operations */
617 switch(dst_idx){
618 case DST_COUNTER:
619 case DST_DERIVE:
620 for(ii=0;updvals[i+1][ii] != '\0';ii++){
621 if((updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9') && (ii != 0 && updvals[i+1][ii] != '-')){
622 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
623 break;
624 }
625 }
626 if (rrd_test_error()){
627 break;
628 }
629 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
630 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
631 if(dst_idx == DST_COUNTER) {
632 /* simple overflow catcher suggested by Andres Kroonmaa */
633 /* this will fail terribly for non 32 or 64 bit counters ... */
634 /* are there any others in SNMP land ? */
635 if (pdp_new[i] < (double)0.0 )
636 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
637 if (pdp_new[i] < (double)0.0 )
638 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
639 }
640 rate = pdp_new[i] / interval;
641 }
642 else {
643 pdp_new[i]= DNAN;
644 }
645 break;
646 case DST_ABSOLUTE:
647 errno = 0;
648 pdp_new[i] = strtod(updvals[i+1],&endptr);
649 if (errno > 0){
650 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
651 break;
652 };
653 if (endptr[0] != '\0'){
654 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
655 break;
656 }
657 rate = pdp_new[i] / interval;
658 break;
659 case DST_GAUGE:
660 errno = 0;
661 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
662 if (errno > 0){
663 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
664 break;
665 };
666 if (endptr[0] != '\0'){
667 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
668 break;
669 }
670 rate = pdp_new[i] / interval;
671 break;
672 default:
673 rrd_set_error("rrd contains unknown DS type : '%s'",
674 rrd.ds_def[i].dst);
675 break;
676 }
677 /* break out of this for loop if the error string is set */
678 if (rrd_test_error()){
679 break;
680 }
681 /* make sure pdp_temp is neither too large or too small
682 * if any of these occur it becomes unknown ...
683 * sorry folks ... */
684 if ( ! isnan(rate) &&
685 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
686 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
687 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
688 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
689 pdp_new[i] = DNAN;
690 }
691 } else {
692 /* no news is news all the same */
693 pdp_new[i] = DNAN;
694 }
697 /* make a copy of the command line argument for the next run */
698 #ifdef DEBUG
699 fprintf(stderr,
700 "prep ds[%lu]\t"
701 "last_arg '%s'\t"
702 "this_arg '%s'\t"
703 "pdp_new %10.2f\n",
704 i,
705 rrd.pdp_prep[i].last_ds,
706 updvals[i+1], pdp_new[i]);
707 #endif
708 strncpy(rrd.pdp_prep[i].last_ds, updvals[i+1],LAST_DS_LEN-1);
709 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
710 }
711 /* break out of the argument parsing loop if the error_string is set */
712 if (rrd_test_error()){
713 free(step_start);
714 break;
715 }
716 /* has a pdp_st moment occurred since the last run ? */
718 if (proc_pdp_st == occu_pdp_st){
719 /* no we have not passed a pdp_st moment. therefore update is simple */
721 for(i=0;i<rrd.stat_head->ds_cnt;i++){
722 if(isnan(pdp_new[i])) {
723 /* this is not realy accurate if we use subsecond data arival time
724 should have thought of it when going subsecond resolution ...
725 sorry next format change we will have it! */
726 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);
727 } else {
728 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
729 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
730 } else {
731 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
732 }
733 }
734 #ifdef DEBUG
735 fprintf(stderr,
736 "NO PDP ds[%lu]\t"
737 "value %10.2f\t"
738 "unkn_sec %5lu\n",
739 i,
740 rrd.pdp_prep[i].scratch[PDP_val].u_val,
741 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
742 #endif
743 }
744 } else {
745 /* an pdp_st has occurred. */
747 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
748 * occurred up to the last run.
749 pdp_new[] contains rate*seconds from the latest run.
750 pdp_temp[] will contain the rate for cdp */
752 for(i=0;i<rrd.stat_head->ds_cnt;i++){
753 /* update pdp_prep to the current pdp_st. */
754 double pre_unknown = 0.0;
755 if(isnan(pdp_new[i]))
756 /* a final bit of unkonwn to be added bevore calculation
757 * we use a tempaorary variable for this so that we
758 * don't have to turn integer lines before using the value */
759 pre_unknown = pre_int;
760 else {
761 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
762 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i]/interval*pre_int;
763 } else {
764 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
765 }
766 }
769 /* if too much of the pdp_prep is unknown we dump it */
770 if (
771 /* removed because this does not agree with the definition
772 a heart beat can be unknown */
773 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
774 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
775 /* if the interval is larger thatn mrhb we get NAN */
776 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
777 (occu_pdp_st-proc_pdp_st <=
778 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
779 pdp_temp[i] = DNAN;
780 } else {
781 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
782 / ((double)(occu_pdp_st - proc_pdp_st
783 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)
784 -pre_unknown);
785 }
787 /* process CDEF data sources; remember each CDEF DS can
788 * only reference other DS with a lower index number */
789 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
790 rpnp_t *rpnp;
791 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
792 /* substitue data values for OP_VARIABLE nodes */
793 for (ii = 0; rpnp[ii].op != OP_END; ii++)
794 {
795 if (rpnp[ii].op == OP_VARIABLE) {
796 rpnp[ii].op = OP_NUMBER;
797 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
798 }
799 }
800 /* run the rpn calculator */
801 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
802 free(rpnp);
803 break; /* exits the data sources pdp_temp loop */
804 }
805 }
807 /* make pdp_prep ready for the next run */
808 if(isnan(pdp_new[i])){
809 /* this is not realy accurate if we use subsecond data arival time
810 should have thought of it when going subsecond resolution ...
811 sorry next format change we will have it! */
812 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
813 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
814 } else {
815 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
816 rrd.pdp_prep[i].scratch[PDP_val].u_val =
817 pdp_new[i]/interval*post_int;
818 }
820 #ifdef DEBUG
821 fprintf(stderr,
822 "PDP UPD ds[%lu]\t"
823 "pdp_temp %10.2f\t"
824 "new_prep %10.2f\t"
825 "new_unkn_sec %5lu\n",
826 i, pdp_temp[i],
827 rrd.pdp_prep[i].scratch[PDP_val].u_val,
828 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
829 #endif
830 }
832 /* if there were errors during the last loop, bail out here */
833 if (rrd_test_error()){
834 free(step_start);
835 break;
836 }
838 /* compute the number of elapsed pdp_st moments */
839 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
840 #ifdef DEBUG
841 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
842 #endif
843 if (rra_step_cnt == NULL)
844 {
845 rra_step_cnt = (unsigned long *)
846 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
847 }
849 for(i = 0, rra_start = rra_begin;
850 i < rrd.stat_head->rra_cnt;
851 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
852 i++)
853 {
854 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
855 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
856 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
857 if (start_pdp_offset <= elapsed_pdp_st) {
858 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
859 rrd.rra_def[i].pdp_cnt + 1;
860 } else {
861 rra_step_cnt[i] = 0;
862 }
864 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
865 {
866 /* If this is a bulk update, we need to skip ahead in the seasonal
867 * arrays so that they will be correct for the next observed value;
868 * note that for the bulk update itself, no update will occur to
869 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
870 * be set to DNAN. */
871 if (rra_step_cnt[i] > 2)
872 {
873 /* skip update by resetting rra_step_cnt[i],
874 * note that this is not data source specific; this is due
875 * to the bulk update, not a DNAN value for the specific data
876 * source. */
877 rra_step_cnt[i] = 0;
878 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
879 &last_seasonal_coef);
880 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
881 &seasonal_coef);
882 }
884 /* periodically run a smoother for seasonal effects */
885 /* Need to use first cdp parameter buffer to track
886 * burnin (burnin requires a specific smoothing schedule).
887 * The CDP_init_seasonal parameter is really an RRA level,
888 * not a data source within RRA level parameter, but the rra_def
889 * is read only for rrd_update (not flushed to disk). */
890 iii = i*(rrd.stat_head -> ds_cnt);
891 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
892 <= BURNIN_CYCLES)
893 {
894 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
895 > rrd.rra_def[i].row_cnt - 1) {
896 /* mark off one of the burnin cycles */
897 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
898 schedule_smooth = 1;
899 }
900 } else {
901 /* someone has no doubt invented a trick to deal with this
902 * wrap around, but at least this code is clear. */
903 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
904 rrd.rra_ptr[i].cur_row)
905 {
906 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
907 * mapping between PDP and CDP */
908 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
909 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
910 {
911 #ifdef DEBUG
912 fprintf(stderr,
913 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
914 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
915 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
916 #endif
917 schedule_smooth = 1;
918 }
919 } else {
920 /* can't rely on negative numbers because we are working with
921 * unsigned values */
922 /* Don't need modulus here. If we've wrapped more than once, only
923 * one smooth is executed at the end. */
924 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
925 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
926 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
927 {
928 #ifdef DEBUG
929 fprintf(stderr,
930 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
931 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
932 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
933 #endif
934 schedule_smooth = 1;
935 }
936 }
937 }
939 rra_current = ftell(rrd_file);
940 } /* if cf is DEVSEASONAL or SEASONAL */
942 if (rrd_test_error()) break;
944 /* update CDP_PREP areas */
945 /* loop over data soures within each RRA */
946 for(ii = 0;
947 ii < rrd.stat_head->ds_cnt;
948 ii++)
949 {
951 /* iii indexes the CDP prep area for this data source within the RRA */
952 iii=i*rrd.stat_head->ds_cnt+ii;
954 if (rrd.rra_def[i].pdp_cnt > 1) {
956 if (rra_step_cnt[i] > 0) {
957 /* If we are in this block, as least 1 CDP value will be written to
958 * disk, this is the CDP_primary_val entry. If more than 1 value needs
959 * to be written, then the "fill in" value is the CDP_secondary_val
960 * entry. */
961 if (isnan(pdp_temp[ii]))
962 {
963 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
964 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
965 } else {
966 /* CDP_secondary value is the RRA "fill in" value for intermediary
967 * CDP data entries. No matter the CF, the value is the same because
968 * the average, max, min, and last of a list of identical values is
969 * the same, namely, the value itself. */
970 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
971 }
973 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
974 > rrd.rra_def[i].pdp_cnt*
975 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
976 {
977 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
978 /* initialize carry over */
979 if (current_cf == CF_AVERAGE) {
980 if (isnan(pdp_temp[ii])) {
981 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
982 } else {
983 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
984 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
985 }
986 } else {
987 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
988 }
989 } else {
990 rrd_value_t cum_val, cur_val;
991 switch (current_cf) {
992 case CF_AVERAGE:
993 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
994 cur_val = IFDNAN(pdp_temp[ii],0.0);
995 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
996 (cum_val + cur_val * start_pdp_offset) /
997 (rrd.rra_def[i].pdp_cnt
998 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
999 /* initialize carry over value */
1000 if (isnan(pdp_temp[ii])) {
1001 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1002 } else {
1003 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1004 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1005 }
1006 break;
1007 case CF_MAXIMUM:
1008 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1009 cur_val = IFDNAN(pdp_temp[ii],-DINF);
1010 #ifdef DEBUG
1011 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1012 isnan(pdp_temp[ii])) {
1013 fprintf(stderr,
1014 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1015 i,ii);
1016 exit(-1);
1017 }
1018 #endif
1019 if (cur_val > cum_val)
1020 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1021 else
1022 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1023 /* initialize carry over value */
1024 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1025 break;
1026 case CF_MINIMUM:
1027 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1028 cur_val = IFDNAN(pdp_temp[ii],DINF);
1029 #ifdef DEBUG
1030 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1031 isnan(pdp_temp[ii])) {
1032 fprintf(stderr,
1033 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1034 i,ii);
1035 exit(-1);
1036 }
1037 #endif
1038 if (cur_val < cum_val)
1039 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1040 else
1041 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1042 /* initialize carry over value */
1043 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1044 break;
1045 case CF_LAST:
1046 default:
1047 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1048 /* initialize carry over value */
1049 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1050 break;
1051 }
1052 } /* endif meets xff value requirement for a valid value */
1053 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1054 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1055 if (isnan(pdp_temp[ii]))
1056 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1057 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1058 else
1059 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1060 } else /* rra_step_cnt[i] == 0 */
1061 {
1062 #ifdef DEBUG
1063 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1064 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1065 i,ii);
1066 } else {
1067 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1068 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1069 }
1070 #endif
1071 if (isnan(pdp_temp[ii])) {
1072 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1073 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1074 {
1075 if (current_cf == CF_AVERAGE) {
1076 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1077 elapsed_pdp_st;
1078 } else {
1079 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1080 }
1081 #ifdef DEBUG
1082 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1083 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1084 #endif
1085 } else {
1086 switch (current_cf) {
1087 case CF_AVERAGE:
1088 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1089 elapsed_pdp_st;
1090 break;
1091 case CF_MINIMUM:
1092 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1093 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1094 break;
1095 case CF_MAXIMUM:
1096 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1097 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1098 break;
1099 case CF_LAST:
1100 default:
1101 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1102 break;
1103 }
1104 }
1105 }
1106 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1107 if (elapsed_pdp_st > 2)
1108 {
1109 switch (current_cf) {
1110 case CF_AVERAGE:
1111 default:
1112 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1113 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1114 break;
1115 case CF_SEASONAL:
1116 case CF_DEVSEASONAL:
1117 /* need to update cached seasonal values, so they are consistent
1118 * with the bulk update */
1119 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1120 * CDP_last_deviation are the same. */
1121 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1122 last_seasonal_coef[ii];
1123 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1124 seasonal_coef[ii];
1125 break;
1126 case CF_HWPREDICT:
1127 /* need to update the null_count and last_null_count.
1128 * even do this for non-DNAN pdp_temp because the
1129 * algorithm is not learning from batch updates. */
1130 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1131 elapsed_pdp_st;
1132 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1133 elapsed_pdp_st - 1;
1134 /* fall through */
1135 case CF_DEVPREDICT:
1136 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1137 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1138 break;
1139 case CF_FAILURES:
1140 /* do not count missed bulk values as failures */
1141 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1142 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1143 /* need to reset violations buffer.
1144 * could do this more carefully, but for now, just
1145 * assume a bulk update wipes away all violations. */
1146 erase_violations(&rrd, iii, i);
1147 break;
1148 }
1149 }
1150 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1152 if (rrd_test_error()) break;
1154 } /* endif data sources loop */
1155 } /* end RRA Loop */
1157 /* this loop is only entered if elapsed_pdp_st < 3 */
1158 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1159 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1160 {
1161 for(i = 0, rra_start = rra_begin;
1162 i < rrd.stat_head->rra_cnt;
1163 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1164 i++)
1165 {
1166 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1168 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1169 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1170 {
1171 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1172 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1173 &seasonal_coef);
1174 rra_current = ftell(rrd_file);
1175 }
1176 if (rrd_test_error()) break;
1177 /* loop over data soures within each RRA */
1178 for(ii = 0;
1179 ii < rrd.stat_head->ds_cnt;
1180 ii++)
1181 {
1182 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1183 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1184 scratch_idx, seasonal_coef);
1185 }
1186 } /* end RRA Loop */
1187 if (rrd_test_error()) break;
1188 } /* end elapsed_pdp_st loop */
1190 if (rrd_test_error()) break;
1192 /* Ready to write to disk */
1193 /* Move sequentially through the file, writing one RRA at a time.
1194 * Note this architecture divorces the computation of CDP with
1195 * flushing updated RRA entries to disk. */
1196 for(i = 0, rra_start = rra_begin;
1197 i < rrd.stat_head->rra_cnt;
1198 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1199 i++) {
1200 /* is th5Aere anything to write for this RRA? If not, continue. */
1201 if (rra_step_cnt[i] == 0) continue;
1203 /* write the first row */
1204 #ifdef DEBUG
1205 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1206 #endif
1207 rrd.rra_ptr[i].cur_row++;
1208 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1209 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1210 /* positition on the first row */
1211 rra_pos_tmp = rra_start +
1212 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1213 if(rra_pos_tmp != rra_current) {
1214 #ifndef HAVE_MMAP
1215 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1216 rrd_set_error("seek error in rrd");
1217 break;
1218 }
1219 #endif
1220 rra_current = rra_pos_tmp;
1221 }
1223 #ifdef DEBUG
1224 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1225 #endif
1226 scratch_idx = CDP_primary_val;
1227 if (pcdp_summary != NULL)
1228 {
1229 rra_time = (current_time - current_time
1230 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1231 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1232 }
1233 #ifdef HAVE_MMAP
1234 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1235 pcdp_summary, &rra_time, rrd_mmaped_file);
1236 #else
1237 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1238 pcdp_summary, &rra_time);
1239 #endif
1240 if (rrd_test_error()) break;
1242 /* write other rows of the bulk update, if any */
1243 scratch_idx = CDP_secondary_val;
1244 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1245 {
1246 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1247 {
1248 #ifdef DEBUG
1249 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1250 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1251 #endif
1252 /* wrap */
1253 rrd.rra_ptr[i].cur_row = 0;
1254 /* seek back to beginning of current rra */
1255 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1256 {
1257 rrd_set_error("seek error in rrd");
1258 break;
1259 }
1260 #ifdef DEBUG
1261 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1262 #endif
1263 rra_current = rra_start;
1264 }
1265 if (pcdp_summary != NULL)
1266 {
1267 rra_time = (current_time - current_time
1268 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1269 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1270 }
1271 #ifdef HAVE_MMAP
1272 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1273 pcdp_summary, &rra_time, rrd_mmaped_file);
1274 #else
1275 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1276 pcdp_summary, &rra_time);
1277 #endif
1278 }
1280 if (rrd_test_error())
1281 break;
1282 } /* RRA LOOP */
1284 /* break out of the argument parsing loop if error_string is set */
1285 if (rrd_test_error()){
1286 free(step_start);
1287 break;
1288 }
1290 } /* endif a pdp_st has occurred */
1291 rrd.live_head->last_up = current_time;
1292 rrd.live_head->last_up_usec = current_time_usec;
1293 free(step_start);
1294 } /* function argument loop */
1296 if (seasonal_coef != NULL) free(seasonal_coef);
1297 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1298 if (rra_step_cnt != NULL) free(rra_step_cnt);
1299 rpnstack_free(&rpnstack);
1301 #ifdef HAVE_MMAP
1302 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1303 rrd_set_error("error writing(unmapping) file: %s", filename);
1304 }
1305 #endif
1306 /* if we got here and if there is an error and if the file has not been
1307 * written to, then close things up and return. */
1308 if (rrd_test_error()) {
1309 free(updvals);
1310 free(tmpl_idx);
1311 rrd_free(&rrd);
1312 free(pdp_temp);
1313 free(pdp_new);
1314 fclose(rrd_file);
1315 return(-1);
1316 }
1318 /* aargh ... that was tough ... so many loops ... anyway, its done.
1319 * we just need to write back the live header portion now*/
1321 if (fseek(rrd_file, (sizeof(stat_head_t)
1322 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1323 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1324 SEEK_SET) != 0) {
1325 rrd_set_error("seek rrd for live header writeback");
1326 free(updvals);
1327 free(tmpl_idx);
1328 rrd_free(&rrd);
1329 free(pdp_temp);
1330 free(pdp_new);
1331 fclose(rrd_file);
1332 return(-1);
1333 }
1335 if(version >= 3) {
1336 if(fwrite( rrd.live_head,
1337 sizeof(live_head_t), 1, rrd_file) != 1){
1338 rrd_set_error("fwrite live_head to rrd");
1339 free(updvals);
1340 rrd_free(&rrd);
1341 free(tmpl_idx);
1342 free(pdp_temp);
1343 free(pdp_new);
1344 fclose(rrd_file);
1345 return(-1);
1346 }
1347 }
1348 else {
1349 if(fwrite( &rrd.live_head->last_up,
1350 sizeof(time_t), 1, rrd_file) != 1){
1351 rrd_set_error("fwrite live_head to rrd");
1352 free(updvals);
1353 rrd_free(&rrd);
1354 free(tmpl_idx);
1355 free(pdp_temp);
1356 free(pdp_new);
1357 fclose(rrd_file);
1358 return(-1);
1359 }
1360 }
1363 if(fwrite( rrd.pdp_prep,
1364 sizeof(pdp_prep_t),
1365 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1366 rrd_set_error("ftwrite pdp_prep to rrd");
1367 free(updvals);
1368 rrd_free(&rrd);
1369 free(tmpl_idx);
1370 free(pdp_temp);
1371 free(pdp_new);
1372 fclose(rrd_file);
1373 return(-1);
1374 }
1376 if(fwrite( rrd.cdp_prep,
1377 sizeof(cdp_prep_t),
1378 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1379 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1381 rrd_set_error("ftwrite cdp_prep to rrd");
1382 free(updvals);
1383 free(tmpl_idx);
1384 rrd_free(&rrd);
1385 free(pdp_temp);
1386 free(pdp_new);
1387 fclose(rrd_file);
1388 return(-1);
1389 }
1391 if(fwrite( rrd.rra_ptr,
1392 sizeof(rra_ptr_t),
1393 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1394 rrd_set_error("fwrite rra_ptr to rrd");
1395 free(updvals);
1396 free(tmpl_idx);
1397 rrd_free(&rrd);
1398 free(pdp_temp);
1399 free(pdp_new);
1400 fclose(rrd_file);
1401 return(-1);
1402 }
1404 /* OK now close the files and free the memory */
1405 if(fclose(rrd_file) != 0){
1406 rrd_set_error("closing rrd");
1407 free(updvals);
1408 free(tmpl_idx);
1409 rrd_free(&rrd);
1410 free(pdp_temp);
1411 free(pdp_new);
1412 return(-1);
1413 }
1415 /* calling the smoothing code here guarantees at most
1416 * one smoothing operation per rrd_update call. Unfortunately,
1417 * it is possible with bulk updates, or a long-delayed update
1418 * for smoothing to occur off-schedule. This really isn't
1419 * critical except during the burning cycles. */
1420 if (schedule_smooth)
1421 {
1422 rrd_file = fopen(filename,"rb+");
1425 rra_start = rra_begin;
1426 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1427 {
1428 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1429 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1430 {
1431 #ifdef DEBUG
1432 fprintf(stderr,"Running smoother for rra %ld\n",i);
1433 #endif
1434 apply_smoother(&rrd,i,rra_start,rrd_file);
1435 if (rrd_test_error())
1436 break;
1437 }
1438 rra_start += rrd.rra_def[i].row_cnt
1439 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1440 }
1441 fclose(rrd_file);
1442 }
1443 rrd_free(&rrd);
1444 free(updvals);
1445 free(tmpl_idx);
1446 free(pdp_new);
1447 free(pdp_temp);
1448 return(0);
1449 }
1451 /*
1452 * get exclusive lock to whole file.
1453 * lock gets removed when we close the file
1454 *
1455 * returns 0 on success
1456 */
1457 int
1458 LockRRD(FILE *rrdfile)
1459 {
1460 int rrd_fd; /* File descriptor for RRD */
1461 int rcstat;
1463 rrd_fd = fileno(rrdfile);
1465 {
1466 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1467 struct _stat st;
1469 if ( _fstat( rrd_fd, &st ) == 0 ) {
1470 rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1471 } else {
1472 rcstat = -1;
1473 }
1474 #else
1475 struct flock lock;
1476 lock.l_type = F_WRLCK; /* exclusive write lock */
1477 lock.l_len = 0; /* whole file */
1478 lock.l_start = 0; /* start of file */
1479 lock.l_whence = SEEK_SET; /* end of file */
1481 rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1482 #endif
1483 }
1485 return(rcstat);
1486 }
1489 #ifdef HAVE_MMAP
1490 info_t
1491 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1492 unsigned short CDP_scratch_idx,
1493 #ifndef DEBUG
1494 FILE UNUSED(*rrd_file),
1495 #else
1496 FILE *rrd_file,
1497 #endif
1498 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1499 #else
1500 info_t
1501 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1502 unsigned short CDP_scratch_idx, FILE *rrd_file,
1503 info_t *pcdp_summary, time_t *rra_time)
1504 #endif
1505 {
1506 unsigned long ds_idx, cdp_idx;
1507 infoval iv;
1509 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1510 {
1511 /* compute the cdp index */
1512 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1513 #ifdef DEBUG
1514 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1515 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1516 rrd -> rra_def[rra_idx].cf_nam);
1517 #endif
1518 if (pcdp_summary != NULL)
1519 {
1520 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1521 /* append info to the return hash */
1522 pcdp_summary = info_push(pcdp_summary,
1523 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1524 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1525 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1526 RD_I_VAL, iv);
1527 }
1528 #ifdef HAVE_MMAP
1529 memcpy((char *)rrd_mmaped_file + *rra_current,
1530 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1531 sizeof(rrd_value_t));
1532 #else
1533 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1534 sizeof(rrd_value_t),1,rrd_file) != 1)
1535 {
1536 rrd_set_error("writing rrd");
1537 return 0;
1538 }
1539 #endif
1540 *rra_current += sizeof(rrd_value_t);
1541 }
1542 return (pcdp_summary);
1543 }