1 /*****************************************************************************
2 * RRDtool 1.2rc2 Copyright by Tobi Oetiker, 1997-2005
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 *****************************************************************************/
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13 #include <sys/mman.h>
14 #endif
16 #ifdef WIN32
17 #include <sys/locking.h>
18 #include <sys/stat.h>
19 #include <io.h>
20 #endif
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
27 #ifdef WIN32
28 /*
29 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
30 * replacement.
31 */
32 #include <sys/timeb.h>
/* Stand-in for struct timeval, which the WIN32 build does not provide. */
struct timeval {
    /* whole seconds */
    time_t tv_sec;
    /* microseconds */
    long   tv_usec;
};
/* Time-zone companion of the WIN32 gettimeofday() replacement. */
struct __timezone {
    /* minutes W of Greenwich */
    int tz_minuteswest;
    /* type of dst correction */
    int tz_dsttime;
};
44 static gettimeofday(struct timeval *t, struct __timezone *tz) {
46 struct timeb current_time;
48 _ftime(¤t_time);
50 t->tv_sec = current_time.time;
51 t->tv_usec = current_time.millitm * 1000;
52 }
54 #endif
/*
 * Normalize a time value as returned by gettimeofday: the usec part
 * must never be negative. A negative tv_usec is repaired by borrowing
 * one second from tv_sec; only a single borrow is performed.
 */
static void normalize_time(struct timeval *t)
{
    if (t->tv_usec >= 0) {
        return;                 /* already in canonical form */
    }
    t->tv_usec += 1000000L;
    t->tv_sec  -= 1;
}
67 /* Local prototypes */
68 int LockRRD(FILE *rrd_file);
69 #ifdef HAVE_MMAP
70 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
71 unsigned long *rra_current,
72 unsigned short CDP_scratch_idx, FILE *rrd_file,
73 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
74 #else
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
76 unsigned long *rra_current,
77 unsigned short CDP_scratch_idx, FILE *rrd_file,
78 info_t *pcdp_summary, time_t *rra_time);
79 #endif
80 int rrd_update_r(char *filename, char *template, int argc, char **argv);
81 int _rrd_update(char *filename, char *template, int argc, char **argv,
82 info_t*);
/*
 * IFDNAN(X,Y): yields Y when X is NaN, otherwise X.
 *
 * The expansion is a single parenthesized expression with no trailing
 * semicolon, so the macro can be used inside larger expressions as
 * well as in plain assignments. X is evaluated twice when it is not
 * NaN, so do not pass arguments with side effects.
 */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
#ifdef STANDALONE
/*
 * Stand-alone rrdupdate front end.
 *
 * Hands the command line to rrd_update(). If the RRD error state is
 * set afterwards, prints the usage text and the error message, clears
 * the error and exits with status 1; otherwise exits with status 0.
 */
int
main(int argc, char **argv){
    rrd_update(argc,argv);

    if (!rrd_test_error()) {
        return 0;
    }

    printf("RRDtool 1.2rc2 Copyright by Tobi Oetiker, 1997-2005\n\n"
           "Usage: rrdupdate filename\n"
           "\t\t\t[--template|-t ds-name:ds-name:...]\n"
           "\t\t\ttime|N:value[:value...]\n\n"
           "\t\t\tat-time@value[:value...]\n\n"
           "\t\t\t[ time:value[:value...] ..]\n\n");

    printf("ERROR: %s\n",rrd_get_error());
    rrd_clear_error();
    return 1;
}
#endif
107 info_t *rrd_update_v(int argc, char **argv)
108 {
109 char *template = NULL;
110 info_t *result = NULL;
111 infoval rc;
113 while (1) {
114 static struct option long_options[] =
115 {
116 {"template", required_argument, 0, 't'},
117 {0,0,0,0}
118 };
119 int option_index = 0;
120 int opt;
121 opt = getopt_long(argc, argv, "t:",
122 long_options, &option_index);
124 if (opt == EOF)
125 break;
127 switch(opt) {
128 case 't':
129 template = optarg;
130 break;
132 case '?':
133 rrd_set_error("unknown option '%s'",argv[optind-1]);
134 rc.u_int = -1;
135 goto end_tag;
136 }
137 }
139 /* need at least 2 arguments: filename, data. */
140 if (argc-optind < 2) {
141 rrd_set_error("Not enough arguments");
142 rc.u_int = -1;
143 goto end_tag;
144 }
145 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
146 rc.u_int = _rrd_update(argv[optind], template,
147 argc - optind - 1, argv + optind + 1, result);
148 result->value.u_int = rc.u_int;
149 end_tag:
150 return result;
151 }
/*
 * Command-line style update entry point.
 *
 * Accepts an optional --template/-t option followed by the file name
 * and at least one data argument, then delegates to rrd_update_r().
 *
 * Returns -1 with the RRD error set when an unknown option is seen or
 * when fewer than two non-option arguments remain; otherwise returns
 * whatever rrd_update_r() returns.
 */
int
rrd_update(int argc, char **argv)
{
    static struct option long_options[] =
    {
        {"template", required_argument, 0, 't'},
        {0,0,0,0}
    };
    char *tmpl_arg = NULL;
    int   remaining;

    for (;;) {
        int option_index = 0;
        int c = getopt_long(argc, argv, "t:",
                            long_options, &option_index);

        if (c == EOF) {
            break;
        }

        if (c == 't') {
            tmpl_arg = optarg;
        } else if (c == '?') {
            rrd_set_error("unknown option '%s'",argv[optind-1]);
            return(-1);
        }
        /* any other value is ignored and parsing continues */
    }

    /* need at least 2 arguments: filename, data. */
    remaining = argc - optind;
    if (remaining < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], tmpl_arg,
                        remaining - 1, argv + optind + 1);
}
/*
 * Update entry point that takes the file name and template directly
 * instead of parsing options. Forwards to _rrd_update() with a NULL
 * summary list and returns its status unchanged.
 */
int
rrd_update_r(char *filename, char *template, int argc, char **argv)
{
    int status;

    status = _rrd_update(filename, template, argc, argv, NULL);
    return status;
}
202 int
203 _rrd_update(char *filename, char *template, int argc, char **argv,
204 info_t *pcdp_summary)
205 {
207 int arg_i = 2;
208 short j;
209 unsigned long i,ii,iii=1;
211 unsigned long rra_begin; /* byte pointer to the rra
212 * area in the rrd file. this
213 * pointer never changes value */
214 unsigned long rra_start; /* byte pointer to the rra
215 * area in the rrd file. this
216 * pointer changes as each rrd is
217 * processed. */
218 unsigned long rra_current; /* byte pointer to the current write
219 * spot in the rrd file. */
220 unsigned long rra_pos_tmp; /* temporary byte pointer. */
221 double interval,
222 pre_int,post_int; /* interval between this and
223 * the last run */
224 unsigned long proc_pdp_st; /* which pdp_st was the last
225 * to be processed */
226 unsigned long occu_pdp_st; /* when was the pdp_st
227 * before the last update
228 * time */
229 unsigned long proc_pdp_age; /* how old was the data in
230 * the pdp prep area when it
231 * was last updated */
232 unsigned long occu_pdp_age; /* how long ago was the last
233 * pdp_step time */
234 rrd_value_t *pdp_new; /* prepare the incoming data
235 * to be added the the
236 * existing entry */
237 rrd_value_t *pdp_temp; /* prepare the pdp values
238 * to be added the the
239 * cdp values */
241 long *tmpl_idx; /* index representing the settings
242 transported by the template index */
243 unsigned long tmpl_cnt = 2; /* time and data */
245 FILE *rrd_file;
246 rrd_t rrd;
247 time_t current_time;
248 time_t rra_time; /* time of update for a RRA */
249 unsigned long current_time_usec; /* microseconds part of current time */
250 struct timeval tmp_time; /* used for time conversion */
252 char **updvals;
253 int schedule_smooth = 0;
254 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
255 /* a vector of future Holt-Winters seasonal coefs */
256 unsigned long elapsed_pdp_st;
257 /* number of elapsed PDP steps since last update */
258 unsigned long *rra_step_cnt = NULL;
259 /* number of rows to be updated in an RRA for a data
260 * value. */
261 unsigned long start_pdp_offset;
262 /* number of PDP steps since the last update that
263 * are assigned to the first CDP to be generated
264 * since the last update. */
265 unsigned short scratch_idx;
266 /* index into the CDP scratch array */
267 enum cf_en current_cf;
268 /* numeric id of the current consolidation function */
269 rpnstack_t rpnstack; /* used for COMPUTE DS */
270 int version; /* rrd version */
271 char *endptr; /* used in the conversion */
272 #ifdef HAVE_MMAP
273 void *rrd_mmaped_file;
274 unsigned long rrd_filesize;
275 #endif
277 rpnstack_init(&rpnstack);
279 /* need at least 1 arguments: data. */
280 if (argc < 1) {
281 rrd_set_error("Not enough arguments");
282 return -1;
283 }
287 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
288 return -1;
289 }
290 /* initialize time */
291 version = atoi(rrd.stat_head->version);
292 gettimeofday(&tmp_time, 0);
293 normalize_time(&tmp_time);
294 current_time = tmp_time.tv_sec;
295 if(version >= 3) {
296 current_time_usec = tmp_time.tv_usec;
297 }
298 else {
299 current_time_usec = 0;
300 }
302 rra_current = rra_start = rra_begin = ftell(rrd_file);
303 /* This is defined in the ANSI C standard, section 7.9.5.3:
305 When a file is opened with udpate mode ('+' as the second
306 or third character in the ... list of mode argument
307 variables), both input and ouptut may be performed on the
308 associated stream. However, ... input may not be directly
309 followed by output without an intervening call to a file
310 positioning function, unless the input oepration encounters
311 end-of-file. */
312 #ifdef HAVE_MMAP
313 fseek(rrd_file, 0, SEEK_END);
314 rrd_filesize = ftell(rrd_file);
315 fseek(rrd_file, rra_current, SEEK_SET);
316 #else
317 fseek(rrd_file, 0, SEEK_CUR);
318 #endif
321 /* get exclusive lock to whole file.
322 * lock gets removed when we close the file.
323 */
324 if (LockRRD(rrd_file) != 0) {
325 rrd_set_error("could not lock RRD");
326 rrd_free(&rrd);
327 fclose(rrd_file);
328 return(-1);
329 }
331 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
332 rrd_set_error("allocating updvals pointer array");
333 rrd_free(&rrd);
334 fclose(rrd_file);
335 return(-1);
336 }
338 if ((pdp_temp = malloc(sizeof(rrd_value_t)
339 *rrd.stat_head->ds_cnt))==NULL){
340 rrd_set_error("allocating pdp_temp ...");
341 free(updvals);
342 rrd_free(&rrd);
343 fclose(rrd_file);
344 return(-1);
345 }
347 if ((tmpl_idx = malloc(sizeof(unsigned long)
348 *(rrd.stat_head->ds_cnt+1)))==NULL){
349 rrd_set_error("allocating tmpl_idx ...");
350 free(pdp_temp);
351 free(updvals);
352 rrd_free(&rrd);
353 fclose(rrd_file);
354 return(-1);
355 }
356 /* initialize template redirector */
357 /* default config example (assume DS 1 is a CDEF DS)
358 tmpl_idx[0] -> 0; (time)
359 tmpl_idx[1] -> 1; (DS 0)
360 tmpl_idx[2] -> 3; (DS 2)
361 tmpl_idx[3] -> 4; (DS 3) */
362 tmpl_idx[0] = 0; /* time */
363 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
364 {
365 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
366 tmpl_idx[ii++]=i;
367 }
368 tmpl_cnt= ii;
370 if (template) {
371 char *dsname;
372 unsigned int tmpl_len;
373 dsname = template;
374 tmpl_cnt = 1; /* the first entry is the time */
375 tmpl_len = strlen(template);
376 for(i=0;i<=tmpl_len ;i++) {
377 if (template[i] == ':' || template[i] == '\0') {
378 template[i] = '\0';
379 if (tmpl_cnt>rrd.stat_head->ds_cnt){
380 rrd_set_error("Template contains more DS definitions than RRD");
381 free(updvals); free(pdp_temp);
382 free(tmpl_idx); rrd_free(&rrd);
383 fclose(rrd_file); return(-1);
384 }
385 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
386 rrd_set_error("unknown DS name '%s'",dsname);
387 free(updvals); free(pdp_temp);
388 free(tmpl_idx); rrd_free(&rrd);
389 fclose(rrd_file); return(-1);
390 } else {
391 /* the first element is always the time */
392 tmpl_idx[tmpl_cnt-1]++;
393 /* go to the next entry on the template */
394 dsname = &template[i+1];
395 /* fix the damage we did before */
396 if (i<tmpl_len) {
397 template[i]=':';
398 }
400 }
401 }
402 }
403 }
404 if ((pdp_new = malloc(sizeof(rrd_value_t)
405 *rrd.stat_head->ds_cnt))==NULL){
406 rrd_set_error("allocating pdp_new ...");
407 free(updvals);
408 free(pdp_temp);
409 free(tmpl_idx);
410 rrd_free(&rrd);
411 fclose(rrd_file);
412 return(-1);
413 }
415 #ifdef HAVE_MMAP
416 rrd_mmaped_file = mmap(0,
417 rrd_filesize,
418 PROT_READ | PROT_WRITE,
419 MAP_SHARED,
420 fileno(rrd_file),
421 0);
422 if (rrd_mmaped_file == MAP_FAILED) {
423 rrd_set_error("error mmapping file %s", filename);
424 free(updvals);
425 free(pdp_temp);
426 free(tmpl_idx);
427 rrd_free(&rrd);
428 fclose(rrd_file);
429 return(-1);
430 }
431 #endif
432 /* loop through the arguments. */
433 for(arg_i=0; arg_i<argc;arg_i++) {
434 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
435 char *step_start = stepper;
436 char *p;
437 char *parsetime_error = NULL;
438 enum {atstyle, normal} timesyntax;
439 struct rrd_time_value ds_tv;
440 if (stepper == NULL){
441 rrd_set_error("failed duplication argv entry");
442 free(updvals);
443 free(pdp_temp);
444 free(tmpl_idx);
445 rrd_free(&rrd);
446 #ifdef HAVE_MMAP
447 munmap(rrd_mmaped_file, rrd_filesize);
448 #endif
449 fclose(rrd_file);
450 return(-1);
451 }
452 /* initialize all ds input to unknown except the first one
453 which has always got to be set */
454 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
455 strcpy(stepper,argv[arg_i]);
456 updvals[0]=stepper;
457 /* separate all ds elements; first must be examined separately
458 due to alternate time syntax */
459 if ((p=strchr(stepper,'@'))!=NULL) {
460 timesyntax = atstyle;
461 *p = '\0';
462 stepper = p+1;
463 } else if ((p=strchr(stepper,':'))!=NULL) {
464 timesyntax = normal;
465 *p = '\0';
466 stepper = p+1;
467 } else {
468 rrd_set_error("expected timestamp not found in data source from %s:...",
469 argv[arg_i]);
470 free(step_start);
471 break;
472 }
473 ii=1;
474 updvals[tmpl_idx[ii]] = stepper;
475 while (*stepper) {
476 if (*stepper == ':') {
477 *stepper = '\0';
478 ii++;
479 if (ii<tmpl_cnt){
480 updvals[tmpl_idx[ii]] = stepper+1;
481 }
482 }
483 stepper++;
484 }
486 if (ii != tmpl_cnt-1) {
487 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
488 tmpl_cnt-1, ii, argv[arg_i]);
489 free(step_start);
490 break;
491 }
493 /* get the time from the reading ... handle N */
494 if (timesyntax == atstyle) {
495 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
496 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
497 free(step_start);
498 break;
499 }
500 if (ds_tv.type == RELATIVE_TO_END_TIME ||
501 ds_tv.type == RELATIVE_TO_START_TIME) {
502 rrd_set_error("specifying time relative to the 'start' "
503 "or 'end' makes no sense here: %s",
504 updvals[0]);
505 free(step_start);
506 break;
507 }
509 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
510 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
512 } else if (strcmp(updvals[0],"N")==0){
513 gettimeofday(&tmp_time, 0);
514 normalize_time(&tmp_time);
515 current_time = tmp_time.tv_sec;
516 current_time_usec = tmp_time.tv_usec;
517 } else {
518 double tmp;
519 tmp = strtod(updvals[0], 0);
520 current_time = floor(tmp);
521 current_time_usec = (long)((tmp - current_time) * 1000000L);
522 }
523 /* dont do any correction for old version RRDs */
524 if(version < 3)
525 current_time_usec = 0;
527 if(current_time <= rrd.live_head->last_up){
528 rrd_set_error("illegal attempt to update using time %ld when "
529 "last update time is %ld (minimum one second step)",
530 current_time, rrd.live_head->last_up);
531 free(step_start);
532 break;
533 }
536 /* seek to the beginning of the rra's */
537 if (rra_current != rra_begin) {
538 #ifndef HAVE_MMAP
539 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
540 rrd_set_error("seek error in rrd");
541 free(step_start);
542 break;
543 }
544 #endif
545 rra_current = rra_begin;
546 }
547 rra_start = rra_begin;
549 /* when was the current pdp started */
550 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
551 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
553 /* when did the last pdp_st occur */
554 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
555 occu_pdp_st = current_time - occu_pdp_age;
556 /* interval = current_time - rrd.live_head->last_up; */
557 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
559 if (occu_pdp_st > proc_pdp_st){
560 /* OK we passed the pdp_st moment*/
561 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
562 * occurred before the latest
563 * pdp_st moment*/
564 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
565 post_int = occu_pdp_age; /* how much after it */
566 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
567 } else {
568 pre_int = interval;
569 post_int = 0;
570 }
572 #ifdef DEBUG
573 printf(
574 "proc_pdp_age %lu\t"
575 "proc_pdp_st %lu\t"
576 "occu_pfp_age %lu\t"
577 "occu_pdp_st %lu\t"
578 "int %lf\t"
579 "pre_int %lf\t"
580 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
581 occu_pdp_age, occu_pdp_st,
582 interval, pre_int, post_int);
583 #endif
585 /* process the data sources and update the pdp_prep
586 * area accordingly */
587 for(i=0;i<rrd.stat_head->ds_cnt;i++){
588 enum dst_en dst_idx;
589 dst_idx= dst_conv(rrd.ds_def[i].dst);
590 /* NOTE: DST_CDEF should never enter this if block, because
591 * updvals[i+1][0] is initialized to 'U'; unless the caller
592 * accidently specified a value for the DST_CDEF. To handle
593 * this case, an extra check is required. */
594 if((updvals[i+1][0] != 'U') &&
595 (dst_idx != DST_CDEF) &&
596 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
597 double rate = DNAN;
598 /* the data source type defines how to process the data */
599 /* pdp_new contains rate * time ... eg the bytes
600 * transferred during the interval. Doing it this way saves
601 * a lot of math operations */
604 switch(dst_idx){
605 case DST_COUNTER:
606 case DST_DERIVE:
607 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
608 for(ii=0;updvals[i+1][ii] != '\0';ii++){
609 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
610 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
611 break;
612 }
613 }
614 if (rrd_test_error()){
615 break;
616 }
617 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
618 if(dst_idx == DST_COUNTER) {
619 /* simple overflow catcher suggested by Andres Kroonmaa */
620 /* this will fail terribly for non 32 or 64 bit counters ... */
621 /* are there any others in SNMP land ? */
622 if (pdp_new[i] < (double)0.0 )
623 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
624 if (pdp_new[i] < (double)0.0 )
625 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
626 }
627 rate = pdp_new[i] / interval;
628 }
629 else {
630 pdp_new[i]= DNAN;
631 }
632 break;
633 case DST_ABSOLUTE:
634 errno = 0;
635 pdp_new[i] = strtod(updvals[i+1],&endptr);
636 if (errno > 0){
637 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
638 break;
639 };
640 if (endptr[0] != '\0'){
641 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
642 break;
643 }
644 rate = pdp_new[i] / interval;
645 break;
646 case DST_GAUGE:
647 errno = 0;
648 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
649 if (errno > 0){
650 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
651 break;
652 };
653 if (endptr[0] != '\0'){
654 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
655 break;
656 }
657 rate = pdp_new[i] / interval;
658 break;
659 default:
660 rrd_set_error("rrd contains unknown DS type : '%s'",
661 rrd.ds_def[i].dst);
662 break;
663 }
664 /* break out of this for loop if the error string is set */
665 if (rrd_test_error()){
666 break;
667 }
668 /* make sure pdp_temp is neither too large or too small
669 * if any of these occur it becomes unknown ...
670 * sorry folks ... */
671 if ( ! isnan(rate) &&
672 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
673 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
674 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
675 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
676 pdp_new[i] = DNAN;
677 }
678 } else {
679 /* no news is news all the same */
680 pdp_new[i] = DNAN;
681 }
683 /* make a copy of the command line argument for the next run */
684 #ifdef DEBUG
685 fprintf(stderr,
686 "prep ds[%lu]\t"
687 "last_arg '%s'\t"
688 "this_arg '%s'\t"
689 "pdp_new %10.2f\n",
690 i,
691 rrd.pdp_prep[i].last_ds,
692 updvals[i+1], pdp_new[i]);
693 #endif
694 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
695 strncpy(rrd.pdp_prep[i].last_ds,
696 updvals[i+1],LAST_DS_LEN-1);
697 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
698 }
699 }
700 /* break out of the argument parsing loop if the error_string is set */
701 if (rrd_test_error()){
702 free(step_start);
703 break;
704 }
705 /* has a pdp_st moment occurred since the last run ? */
707 if (proc_pdp_st == occu_pdp_st){
708 /* no we have not passed a pdp_st moment. therefore update is simple */
710 for(i=0;i<rrd.stat_head->ds_cnt;i++){
711 if(isnan(pdp_new[i]))
712 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
713 else
714 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
715 #ifdef DEBUG
716 fprintf(stderr,
717 "NO PDP ds[%lu]\t"
718 "value %10.2f\t"
719 "unkn_sec %5lu\n",
720 i,
721 rrd.pdp_prep[i].scratch[PDP_val].u_val,
722 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
723 #endif
724 }
725 } else {
726 /* an pdp_st has occurred. */
728 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
729 * occurred up to the last run.
730 pdp_new[] contains rate*seconds from the latest run.
731 pdp_temp[] will contain the rate for cdp */
733 for(i=0;i<rrd.stat_head->ds_cnt;i++){
734 /* update pdp_prep to the current pdp_st */
735 if(isnan(pdp_new[i]))
736 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
737 else
738 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
739 pdp_new[i]/(double)interval*(double)pre_int;
741 /* if too much of the pdp_prep is unknown we dump it */
742 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
743 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
744 (occu_pdp_st-proc_pdp_st <=
745 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
746 pdp_temp[i] = DNAN;
747 } else {
748 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
749 / (double)( occu_pdp_st
750 - proc_pdp_st
751 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
752 }
754 /* process CDEF data sources; remember each CDEF DS can
755 * only reference other DS with a lower index number */
756 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
757 rpnp_t *rpnp;
758 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
759 /* substitue data values for OP_VARIABLE nodes */
760 for (ii = 0; rpnp[ii].op != OP_END; ii++)
761 {
762 if (rpnp[ii].op == OP_VARIABLE) {
763 rpnp[ii].op = OP_NUMBER;
764 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
765 }
766 }
767 /* run the rpn calculator */
768 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
769 free(rpnp);
770 break; /* exits the data sources pdp_temp loop */
771 }
772 }
774 /* make pdp_prep ready for the next run */
775 if(isnan(pdp_new[i])){
776 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
777 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
778 } else {
779 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
780 rrd.pdp_prep[i].scratch[PDP_val].u_val =
781 pdp_new[i]/(double)interval*(double)post_int;
782 }
784 #ifdef DEBUG
785 fprintf(stderr,
786 "PDP UPD ds[%lu]\t"
787 "pdp_temp %10.2f\t"
788 "new_prep %10.2f\t"
789 "new_unkn_sec %5lu\n",
790 i, pdp_temp[i],
791 rrd.pdp_prep[i].scratch[PDP_val].u_val,
792 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
793 #endif
794 }
796 /* if there were errors during the last loop, bail out here */
797 if (rrd_test_error()){
798 free(step_start);
799 break;
800 }
802 /* compute the number of elapsed pdp_st moments */
803 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
804 #ifdef DEBUG
805 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
806 #endif
807 if (rra_step_cnt == NULL)
808 {
809 rra_step_cnt = (unsigned long *)
810 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
811 }
813 for(i = 0, rra_start = rra_begin;
814 i < rrd.stat_head->rra_cnt;
815 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
816 i++)
817 {
818 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
819 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
820 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
821 if (start_pdp_offset <= elapsed_pdp_st) {
822 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
823 rrd.rra_def[i].pdp_cnt + 1;
824 } else {
825 rra_step_cnt[i] = 0;
826 }
828 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
829 {
830 /* If this is a bulk update, we need to skip ahead in the seasonal
831 * arrays so that they will be correct for the next observed value;
832 * note that for the bulk update itself, no update will occur to
833 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
834 * be set to DNAN. */
835 if (rra_step_cnt[i] > 2)
836 {
837 /* skip update by resetting rra_step_cnt[i],
838 * note that this is not data source specific; this is due
839 * to the bulk update, not a DNAN value for the specific data
840 * source. */
841 rra_step_cnt[i] = 0;
842 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
843 &last_seasonal_coef);
844 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
845 &seasonal_coef);
846 }
848 /* periodically run a smoother for seasonal effects */
849 /* Need to use first cdp parameter buffer to track
850 * burnin (burnin requires a specific smoothing schedule).
851 * The CDP_init_seasonal parameter is really an RRA level,
852 * not a data source within RRA level parameter, but the rra_def
853 * is read only for rrd_update (not flushed to disk). */
854 iii = i*(rrd.stat_head -> ds_cnt);
855 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
856 <= BURNIN_CYCLES)
857 {
858 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
859 > rrd.rra_def[i].row_cnt - 1) {
860 /* mark off one of the burnin cycles */
861 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
862 schedule_smooth = 1;
863 }
864 } else {
865 /* someone has no doubt invented a trick to deal with this
866 * wrap around, but at least this code is clear. */
867 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
868 rrd.rra_ptr[i].cur_row)
869 {
870 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
871 * mapping between PDP and CDP */
872 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
873 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
874 {
875 #ifdef DEBUG
876 fprintf(stderr,
877 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
878 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
879 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
880 #endif
881 schedule_smooth = 1;
882 }
883 } else {
884 /* can't rely on negative numbers because we are working with
885 * unsigned values */
886 /* Don't need modulus here. If we've wrapped more than once, only
887 * one smooth is executed at the end. */
888 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
889 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
890 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
891 {
892 #ifdef DEBUG
893 fprintf(stderr,
894 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
895 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
896 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
897 #endif
898 schedule_smooth = 1;
899 }
900 }
901 }
903 rra_current = ftell(rrd_file);
904 } /* if cf is DEVSEASONAL or SEASONAL */
906 if (rrd_test_error()) break;
908 /* update CDP_PREP areas */
909 /* loop over data soures within each RRA */
910 for(ii = 0;
911 ii < rrd.stat_head->ds_cnt;
912 ii++)
913 {
915 /* iii indexes the CDP prep area for this data source within the RRA */
916 iii=i*rrd.stat_head->ds_cnt+ii;
918 if (rrd.rra_def[i].pdp_cnt > 1) {
920 if (rra_step_cnt[i] > 0) {
921 /* If we are in this block, as least 1 CDP value will be written to
922 * disk, this is the CDP_primary_val entry. If more than 1 value needs
923 * to be written, then the "fill in" value is the CDP_secondary_val
924 * entry. */
925 if (isnan(pdp_temp[ii]))
926 {
927 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
928 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
929 } else {
930 /* CDP_secondary value is the RRA "fill in" value for intermediary
931 * CDP data entries. No matter the CF, the value is the same because
932 * the average, max, min, and last of a list of identical values is
933 * the same, namely, the value itself. */
934 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
935 }
937 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
938 > rrd.rra_def[i].pdp_cnt*
939 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
940 {
941 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
942 /* initialize carry over */
943 if (current_cf == CF_AVERAGE) {
944 if (isnan(pdp_temp[ii])) {
945 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
946 } else {
947 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
948 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
949 }
950 } else {
951 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
952 }
953 } else {
954 rrd_value_t cum_val, cur_val;
955 switch (current_cf) {
956 case CF_AVERAGE:
957 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
958 cur_val = IFDNAN(pdp_temp[ii],0.0);
959 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
960 (cum_val + cur_val * start_pdp_offset) /
961 (rrd.rra_def[i].pdp_cnt
962 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
963 /* initialize carry over value */
964 if (isnan(pdp_temp[ii])) {
965 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
966 } else {
967 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
968 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
969 }
970 break;
971 case CF_MAXIMUM:
972 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
973 cur_val = IFDNAN(pdp_temp[ii],-DINF);
974 #ifdef DEBUG
975 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
976 isnan(pdp_temp[ii])) {
977 fprintf(stderr,
978 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
979 i,ii);
980 exit(-1);
981 }
982 #endif
983 if (cur_val > cum_val)
984 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
985 else
986 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
987 /* initialize carry over value */
988 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
989 break;
990 case CF_MINIMUM:
991 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
992 cur_val = IFDNAN(pdp_temp[ii],DINF);
993 #ifdef DEBUG
994 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
995 isnan(pdp_temp[ii])) {
996 fprintf(stderr,
997 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
998 i,ii);
999 exit(-1);
1000 }
1001 #endif
1002 if (cur_val < cum_val)
1003 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1004 else
1005 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1006 /* initialize carry over value */
1007 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1008 break;
1009 case CF_LAST:
1010 default:
1011 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1012 /* initialize carry over value */
1013 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1014 break;
1015 }
1016 } /* endif meets xff value requirement for a valid value */
1017 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1018 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1019 if (isnan(pdp_temp[ii]))
1020 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1021 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1022 else
1023 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1024 } else /* rra_step_cnt[i] == 0 */
1025 {
1026 #ifdef DEBUG
1027 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1028 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1029 i,ii);
1030 } else {
1031 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1032 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1033 }
1034 #endif
1035 if (isnan(pdp_temp[ii])) {
1036 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1037 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1038 {
1039 if (current_cf == CF_AVERAGE) {
1040 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1041 elapsed_pdp_st;
1042 } else {
1043 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1044 }
1045 #ifdef DEBUG
1046 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1047 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1048 #endif
1049 } else {
1050 switch (current_cf) {
1051 case CF_AVERAGE:
1052 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1053 elapsed_pdp_st;
1054 break;
1055 case CF_MINIMUM:
1056 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1057 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1058 break;
1059 case CF_MAXIMUM:
1060 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1061 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1062 break;
1063 case CF_LAST:
1064 default:
1065 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1066 break;
1067 }
1068 }
1069 }
1070 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1071 if (elapsed_pdp_st > 2)
1072 {
1073 switch (current_cf) {
1074 case CF_AVERAGE:
1075 default:
1076 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1077 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1078 break;
1079 case CF_SEASONAL:
1080 case CF_DEVSEASONAL:
1081 /* need to update cached seasonal values, so they are consistent
1082 * with the bulk update */
1083 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1084 * CDP_last_deviation are the same. */
1085 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1086 last_seasonal_coef[ii];
1087 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1088 seasonal_coef[ii];
1089 break;
1090 case CF_HWPREDICT:
1091 /* need to update the null_count and last_null_count.
1092 * even do this for non-DNAN pdp_temp because the
1093 * algorithm is not learning from batch updates. */
1094 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1095 elapsed_pdp_st;
1096 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1097 elapsed_pdp_st - 1;
1098 /* fall through */
1099 case CF_DEVPREDICT:
1100 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1101 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1102 break;
1103 case CF_FAILURES:
1104 /* do not count missed bulk values as failures */
1105 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1106 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1107 /* need to reset violations buffer.
1108 * could do this more carefully, but for now, just
1109 * assume a bulk update wipes away all violations. */
1110 erase_violations(&rrd, iii, i);
1111 break;
1112 }
1113 }
1114 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1116 if (rrd_test_error()) break;
1118 } /* endif data sources loop */
1119 } /* end RRA Loop */
1121 /* this loop is only entered if elapsed_pdp_st < 3 */
1122 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1123 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1124 {
1125 for(i = 0, rra_start = rra_begin;
1126 i < rrd.stat_head->rra_cnt;
1127 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1128 i++)
1129 {
1130 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1132 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1133 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1134 {
1135 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1136 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1137 &seasonal_coef);
1138 rra_current = ftell(rrd_file);
1139 }
1140 if (rrd_test_error()) break;
/* loop over data sources within each RRA */
1142 for(ii = 0;
1143 ii < rrd.stat_head->ds_cnt;
1144 ii++)
1145 {
1146 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1147 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1148 scratch_idx, seasonal_coef);
1149 }
1150 } /* end RRA Loop */
1151 if (rrd_test_error()) break;
1152 } /* end elapsed_pdp_st loop */
1154 if (rrd_test_error()) break;
1156 /* Ready to write to disk */
1157 /* Move sequentially through the file, writing one RRA at a time.
1158 * Note this architecture divorces the computation of CDP with
1159 * flushing updated RRA entries to disk. */
1160 for(i = 0, rra_start = rra_begin;
1161 i < rrd.stat_head->rra_cnt;
1162 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1163 i++) {
1164 /* is there anything to write for this RRA? If not, continue. */
1165 if (rra_step_cnt[i] == 0) continue;
1167 /* write the first row */
1168 #ifdef DEBUG
1169 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1170 #endif
1171 rrd.rra_ptr[i].cur_row++;
1172 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1173 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
/* position on the first row */
1175 rra_pos_tmp = rra_start +
1176 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1177 if(rra_pos_tmp != rra_current) {
1178 #ifndef HAVE_MMAP
1179 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1180 rrd_set_error("seek error in rrd");
1181 break;
1182 }
1183 #endif
1184 rra_current = rra_pos_tmp;
1185 }
1187 #ifdef DEBUG
1188 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1189 #endif
1190 scratch_idx = CDP_primary_val;
1191 if (pcdp_summary != NULL)
1192 {
1193 rra_time = (current_time - current_time
1194 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1195 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1196 }
1197 #ifdef HAVE_MMAP
1198 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1199 pcdp_summary, &rra_time, rrd_mmaped_file);
1200 #else
1201 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1202 pcdp_summary, &rra_time);
1203 #endif
1204 if (rrd_test_error()) break;
1206 /* write other rows of the bulk update, if any */
1207 scratch_idx = CDP_secondary_val;
1208 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1209 {
1210 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1211 {
1212 #ifdef DEBUG
1213 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1214 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1215 #endif
1216 /* wrap */
1217 rrd.rra_ptr[i].cur_row = 0;
1218 /* seek back to beginning of current rra */
1219 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1220 {
1221 rrd_set_error("seek error in rrd");
1222 break;
1223 }
1224 #ifdef DEBUG
1225 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1226 #endif
1227 rra_current = rra_start;
1228 }
1229 if (pcdp_summary != NULL)
1230 {
1231 rra_time = (current_time - current_time
1232 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1233 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1234 }
1235 #ifdef HAVE_MMAP
1236 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1237 pcdp_summary, &rra_time, rrd_mmaped_file);
1238 #else
1239 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1240 pcdp_summary, &rra_time);
1241 #endif
1242 }
1244 if (rrd_test_error())
1245 break;
1246 } /* RRA LOOP */
1248 /* break out of the argument parsing loop if error_string is set */
1249 if (rrd_test_error()){
1250 free(step_start);
1251 break;
1252 }
1254 } /* endif a pdp_st has occurred */
1255 rrd.live_head->last_up = current_time;
1256 rrd.live_head->last_up_usec = current_time_usec;
1257 free(step_start);
1258 } /* function argument loop */
1260 if (seasonal_coef != NULL) free(seasonal_coef);
1261 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1262 if (rra_step_cnt != NULL) free(rra_step_cnt);
1263 rpnstack_free(&rpnstack);
1265 #ifdef HAVE_MMAP
1266 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1267 rrd_set_error("error writing(unmapping) file: %s", filename);
1268 }
1269 #endif
1270 /* if we got here and if there is an error and if the file has not been
1271 * written to, then close things up and return. */
1272 if (rrd_test_error()) {
1273 free(updvals);
1274 free(tmpl_idx);
1275 rrd_free(&rrd);
1276 free(pdp_temp);
1277 free(pdp_new);
1278 fclose(rrd_file);
1279 return(-1);
1280 }
1282 /* aargh ... that was tough ... so many loops ... anyway, its done.
1283 * we just need to write back the live header portion now*/
1285 if (fseek(rrd_file, (sizeof(stat_head_t)
1286 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1287 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1288 SEEK_SET) != 0) {
1289 rrd_set_error("seek rrd for live header writeback");
1290 free(updvals);
1291 free(tmpl_idx);
1292 rrd_free(&rrd);
1293 free(pdp_temp);
1294 free(pdp_new);
1295 fclose(rrd_file);
1296 return(-1);
1297 }
1299 if(version >= 3) {
1300 if(fwrite( rrd.live_head,
1301 sizeof(live_head_t), 1, rrd_file) != 1){
1302 rrd_set_error("fwrite live_head to rrd");
1303 free(updvals);
1304 rrd_free(&rrd);
1305 free(tmpl_idx);
1306 free(pdp_temp);
1307 free(pdp_new);
1308 fclose(rrd_file);
1309 return(-1);
1310 }
1311 }
1312 else {
1313 if(fwrite( &rrd.live_head->last_up,
1314 sizeof(time_t), 1, rrd_file) != 1){
1315 rrd_set_error("fwrite live_head to rrd");
1316 free(updvals);
1317 rrd_free(&rrd);
1318 free(tmpl_idx);
1319 free(pdp_temp);
1320 free(pdp_new);
1321 fclose(rrd_file);
1322 return(-1);
1323 }
1324 }
1327 if(fwrite( rrd.pdp_prep,
1328 sizeof(pdp_prep_t),
1329 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1330 rrd_set_error("ftwrite pdp_prep to rrd");
1331 free(updvals);
1332 rrd_free(&rrd);
1333 free(tmpl_idx);
1334 free(pdp_temp);
1335 free(pdp_new);
1336 fclose(rrd_file);
1337 return(-1);
1338 }
1340 if(fwrite( rrd.cdp_prep,
1341 sizeof(cdp_prep_t),
1342 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1343 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1345 rrd_set_error("ftwrite cdp_prep to rrd");
1346 free(updvals);
1347 free(tmpl_idx);
1348 rrd_free(&rrd);
1349 free(pdp_temp);
1350 free(pdp_new);
1351 fclose(rrd_file);
1352 return(-1);
1353 }
1355 if(fwrite( rrd.rra_ptr,
1356 sizeof(rra_ptr_t),
1357 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1358 rrd_set_error("fwrite rra_ptr to rrd");
1359 free(updvals);
1360 free(tmpl_idx);
1361 rrd_free(&rrd);
1362 free(pdp_temp);
1363 free(pdp_new);
1364 fclose(rrd_file);
1365 return(-1);
1366 }
1368 /* OK now close the files and free the memory */
1369 if(fclose(rrd_file) != 0){
1370 rrd_set_error("closing rrd");
1371 free(updvals);
1372 free(tmpl_idx);
1373 rrd_free(&rrd);
1374 free(pdp_temp);
1375 free(pdp_new);
1376 return(-1);
1377 }
1379 /* calling the smoothing code here guarantees at most
1380 * one smoothing operation per rrd_update call. Unfortunately,
1381 * it is possible with bulk updates, or a long-delayed update
1382 * for smoothing to occur off-schedule. This really isn't
1383 * critical except during the burning cycles. */
1384 if (schedule_smooth)
1385 {
1386 #ifndef WIN32
1387 rrd_file = fopen(filename,"r+");
1388 #else
1389 rrd_file = fopen(filename,"rb+");
1390 #endif
1391 rra_start = rra_begin;
1392 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1393 {
1394 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1395 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1396 {
1397 #ifdef DEBUG
1398 fprintf(stderr,"Running smoother for rra %ld\n",i);
1399 #endif
1400 apply_smoother(&rrd,i,rra_start,rrd_file);
1401 if (rrd_test_error())
1402 break;
1403 }
1404 rra_start += rrd.rra_def[i].row_cnt
1405 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1406 }
1407 fclose(rrd_file);
1408 }
1409 rrd_free(&rrd);
1410 free(updvals);
1411 free(tmpl_idx);
1412 free(pdp_new);
1413 free(pdp_temp);
1414 return(0);
1415 }
/*
 * Request an exclusive lock on the whole file behind rrdfile without
 * blocking (F_SETLK on POSIX, _LK_NBLCK on WIN32).
 * The lock goes away when the file is closed.
 *
 * Returns the status of the underlying locking call (0 on success);
 * on WIN32, -1 is returned if the file size cannot be obtained.
 */
int
LockRRD(FILE *rrdfile)
{
    int fd = fileno(rrdfile);   /* descriptor underlying the stream */

#ifndef WIN32
    struct flock whole_file;

    whole_file.l_whence = SEEK_SET;   /* l_start is relative to the start of the file */
    whole_file.l_start  = 0;          /* begin at offset 0 */
    whole_file.l_len    = 0;          /* length 0: whole file */
    whole_file.l_type   = F_WRLCK;    /* exclusive write lock */

    return fcntl(fd, F_SETLK, &whole_file);
#else
    struct _stat info;

    /* _locking() needs a byte count, so ask for the file size first */
    if (_fstat(fd, &info) != 0)
        return -1;

    return _locking(fd, _LK_NBLCK, info.st_size);
#endif
}
1455 #ifdef HAVE_MMAP
1456 info_t
1457 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1458 unsigned short CDP_scratch_idx, FILE *rrd_file,
1459 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1460 #else
1461 info_t
1462 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1463 unsigned short CDP_scratch_idx, FILE *rrd_file,
1464 info_t *pcdp_summary, time_t *rra_time)
1465 #endif
1466 {
1467 unsigned long ds_idx, cdp_idx;
1468 infoval iv;
1470 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1471 {
1472 /* compute the cdp index */
1473 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1474 #ifdef DEBUG
1475 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1476 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1477 rrd -> rra_def[rra_idx].cf_nam);
1478 #endif
1479 if (pcdp_summary != NULL)
1480 {
1481 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1482 /* append info to the return hash */
1483 pcdp_summary = info_push(pcdp_summary,
1484 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1485 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1486 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1487 RD_I_VAL, iv);
1488 }
1489 #ifdef HAVE_MMAP
1490 memcpy((char *)rrd_mmaped_file + *rra_current,
1491 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1492 sizeof(rrd_value_t));
1493 #else
1494 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1495 sizeof(rrd_value_t),1,rrd_file) != 1)
1496 {
1497 rrd_set_error("writing rrd");
1498 return 0;
1499 }
1500 #endif
1501 *rra_current += sizeof(rrd_value_t);
1502 }
1503 return (pcdp_summary);
1504 }