95e927163e280aa23b3ea8b899354537bbb8adef
1 /*****************************************************************************
2 * RRDtool 1.2.9 Copyright by Tobi Oetiker, 1997-2005
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 *****************************************************************************/
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13 #include <sys/mman.h>
14 #endif
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
18 #include <sys/stat.h>
19 #include <io.h>
20 #endif
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
31 * replacement.
32 */
33 #include <sys/timeb.h>
/* Stand-in for struct timeval, used by the gettimeofday() replacement
 * below on WIN32 builds. */
struct timeval {
    time_t tv_sec;  /* whole seconds */
    long   tv_usec; /* microseconds */
};
/* Time-zone descriptor taken as the second parameter of the WIN32
 * gettimeofday() replacement. */
struct __timezone {
    int tz_minuteswest; /* minutes west of Greenwich */
    int tz_dsttime;     /* type of DST correction */
};
45 static gettimeofday(struct timeval *t, struct __timezone *tz) {
47 struct timeb current_time;
49 _ftime(¤t_time);
51 t->tv_sec = current_time.time;
52 t->tv_usec = current_time.millitm * 1000;
53 }
55 #endif
/*
 * Normalize a time value as returned by gettimeofday(): the usec part
 * must always be >= 0.
 *
 * When tv_usec is negative, one second is taken from tv_sec and added to
 * tv_usec as 1000000 microseconds. Exactly one second is moved; a time
 * whose tv_usec is already non-negative is left untouched.
 */
static void normalize_time(struct timeval *t)
{
    if (t->tv_usec >= 0)
        return;

    /* borrow a single second from the seconds part */
    t->tv_sec -= 1;
    t->tv_usec = t->tv_usec + 1000000L;
}
68 /* Local prototypes */
69 int LockRRD(FILE *rrd_file);
70 #ifdef HAVE_MMAP
71 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
72 unsigned long *rra_current,
73 unsigned short CDP_scratch_idx,
74 #ifndef DEBUG
75 FILE UNUSED(*rrd_file),
76 #else
77 FILE *rrd_file,
78 #endif
79 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
80 #else
81 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
82 unsigned long *rra_current,
83 unsigned short CDP_scratch_idx, FILE *rrd_file,
84 info_t *pcdp_summary, time_t *rra_time);
85 #endif
86 int rrd_update_r(char *filename, char *template, int argc, char **argv);
87 int _rrd_update(char *filename, char *template, int argc, char **argv,
88 info_t*);
/* Yields Y when X is NaN, otherwise X. The expansion is a plain
 * parenthesized expression (no trailing semicolon) so it can be used
 * inside larger expressions. X is evaluated twice when it is not NaN,
 * so it must not have side effects. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
93 #ifdef STANDALONE
94 int
95 main(int argc, char **argv){
96 rrd_update(argc,argv);
97 if (rrd_test_error()) {
98 printf("RRDtool " PACKAGE_VERSION " Copyright by Tobi Oetiker, 1997-2005\n\n"
99 "Usage: rrdupdate filename\n"
100 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
101 "\t\t\ttime|N:value[:value...]\n\n"
102 "\t\t\tat-time@value[:value...]\n\n"
103 "\t\t\t[ time:value[:value...] ..]\n\n");
105 printf("ERROR: %s\n",rrd_get_error());
106 rrd_clear_error();
107 return 1;
108 }
109 return 0;
110 }
111 #endif
113 info_t *rrd_update_v(int argc, char **argv)
114 {
115 char *template = NULL;
116 info_t *result = NULL;
117 infoval rc;
118 optind = 0; opterr = 0; /* initialize getopt */
120 while (1) {
121 static struct option long_options[] =
122 {
123 {"template", required_argument, 0, 't'},
124 {0,0,0,0}
125 };
126 int option_index = 0;
127 int opt;
128 opt = getopt_long(argc, argv, "t:",
129 long_options, &option_index);
131 if (opt == EOF)
132 break;
134 switch(opt) {
135 case 't':
136 template = optarg;
137 break;
139 case '?':
140 rrd_set_error("unknown option '%s'",argv[optind-1]);
141 rc.u_int = -1;
142 goto end_tag;
143 }
144 }
146 /* need at least 2 arguments: filename, data. */
147 if (argc-optind < 2) {
148 rrd_set_error("Not enough arguments");
149 rc.u_int = -1;
150 goto end_tag;
151 }
152 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
153 rc.u_int = _rrd_update(argv[optind], template,
154 argc - optind - 1, argv + optind + 1, result);
155 result->value.u_int = rc.u_int;
156 end_tag:
157 return result;
158 }
/*
 * Command-line style front end for updating an RRD.
 *
 * Recognizes the optional --template/-t option; the first non-option
 * argument is the file name and everything after it is handed to
 * rrd_update_r() as update data.
 *
 * Returns the result of rrd_update_r(), or -1 with the rrd error set when
 * an unknown option is seen or fewer than two non-option arguments
 * (filename, data) are given.
 */
int
rrd_update(int argc, char **argv)
{
    static struct option long_options[] =
    {
        {"template", required_argument, 0, 't'},
        {0,0,0,0}
    };
    char *ds_template = NULL;
    int   option_index = 0;
    int   opt;

    /* initialize getopt */
    optind = 0;
    opterr = 0;

    while ((opt = getopt_long(argc, argv, "t:",
                              long_options, &option_index)) != EOF) {
        if (opt == 't') {
            ds_template = optarg;
        } else if (opt == '?') {
            rrd_set_error("unknown option '%s'", argv[optind-1]);
            return -1;
        }
    }

    /* need at least 2 arguments: filename, data. */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], ds_template,
                        argc - optind - 1, argv + optind + 1);
}
/*
 * Update entry point that takes the file name, template and data
 * arguments directly. Forwards them unchanged to _rrd_update() with a
 * NULL pcdp_summary and returns its result.
 */
int
rrd_update_r(char *filename, char *template, int argc, char **argv)
{
    int status;

    status = _rrd_update(filename, template, argc, argv, NULL);
    return status;
}
210 int
211 _rrd_update(char *filename, char *template, int argc, char **argv,
212 info_t *pcdp_summary)
213 {
215 int arg_i = 2;
216 short j;
217 unsigned long i,ii,iii=1;
219 unsigned long rra_begin; /* byte pointer to the rra
220 * area in the rrd file. this
221 * pointer never changes value */
222 unsigned long rra_start; /* byte pointer to the rra
223 * area in the rrd file. this
224 * pointer changes as each rrd is
225 * processed. */
226 unsigned long rra_current; /* byte pointer to the current write
227 * spot in the rrd file. */
228 unsigned long rra_pos_tmp; /* temporary byte pointer. */
229 double interval,
230 pre_int,post_int; /* interval between this and
231 * the last run */
232 unsigned long proc_pdp_st; /* which pdp_st was the last
233 * to be processed */
234 unsigned long occu_pdp_st; /* when was the pdp_st
235 * before the last update
236 * time */
237 unsigned long proc_pdp_age; /* how old was the data in
238 * the pdp prep area when it
239 * was last updated */
240 unsigned long occu_pdp_age; /* how long ago was the last
241 * pdp_step time */
242 rrd_value_t *pdp_new; /* prepare the incoming data
243 * to be added the the
244 * existing entry */
245 rrd_value_t *pdp_temp; /* prepare the pdp values
246 * to be added the the
247 * cdp values */
249 long *tmpl_idx; /* index representing the settings
250 transported by the template index */
251 unsigned long tmpl_cnt = 2; /* time and data */
253 FILE *rrd_file;
254 rrd_t rrd;
255 time_t current_time = 0;
256 time_t rra_time = 0; /* time of update for a RRA */
257 unsigned long current_time_usec=0;/* microseconds part of current time */
258 struct timeval tmp_time; /* used for time conversion */
260 char **updvals;
261 int schedule_smooth = 0;
262 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
263 /* a vector of future Holt-Winters seasonal coefs */
264 unsigned long elapsed_pdp_st;
265 /* number of elapsed PDP steps since last update */
266 unsigned long *rra_step_cnt = NULL;
267 /* number of rows to be updated in an RRA for a data
268 * value. */
269 unsigned long start_pdp_offset;
270 /* number of PDP steps since the last update that
271 * are assigned to the first CDP to be generated
272 * since the last update. */
273 unsigned short scratch_idx;
274 /* index into the CDP scratch array */
275 enum cf_en current_cf;
276 /* numeric id of the current consolidation function */
277 rpnstack_t rpnstack; /* used for COMPUTE DS */
278 int version; /* rrd version */
279 char *endptr; /* used in the conversion */
280 #ifdef HAVE_MMAP
281 void *rrd_mmaped_file;
282 unsigned long rrd_filesize;
283 #endif
285 rpnstack_init(&rpnstack);
287 /* need at least 1 arguments: data. */
288 if (argc < 1) {
289 rrd_set_error("Not enough arguments");
290 return -1;
291 }
295 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
296 return -1;
297 }
298 /* initialize time */
299 version = atoi(rrd.stat_head->version);
300 gettimeofday(&tmp_time, 0);
301 normalize_time(&tmp_time);
302 current_time = tmp_time.tv_sec;
303 if(version >= 3) {
304 current_time_usec = tmp_time.tv_usec;
305 }
306 else {
307 current_time_usec = 0;
308 }
310 rra_current = rra_start = rra_begin = ftell(rrd_file);
311 /* This is defined in the ANSI C standard, section 7.9.5.3:
313 When a file is opened with update mode ('+' as the second
314 or third character in the ... list of mode argument
315 variables), both input and output may be performed on the
316 associated stream. However, ... input may not be directly
317 followed by output without an intervening call to a file
318 positioning function, unless the input operation encounters
319 end-of-file. */
320 #ifdef HAVE_MMAP
321 fseek(rrd_file, 0, SEEK_END);
322 rrd_filesize = ftell(rrd_file);
323 fseek(rrd_file, rra_current, SEEK_SET);
324 #else
325 fseek(rrd_file, 0, SEEK_CUR);
326 #endif
329 /* get exclusive lock to whole file.
330 * lock gets removed when we close the file.
331 */
332 if (LockRRD(rrd_file) != 0) {
333 rrd_set_error("could not lock RRD");
334 rrd_free(&rrd);
335 fclose(rrd_file);
336 return(-1);
337 }
339 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
340 rrd_set_error("allocating updvals pointer array");
341 rrd_free(&rrd);
342 fclose(rrd_file);
343 return(-1);
344 }
346 if ((pdp_temp = malloc(sizeof(rrd_value_t)
347 *rrd.stat_head->ds_cnt))==NULL){
348 rrd_set_error("allocating pdp_temp ...");
349 free(updvals);
350 rrd_free(&rrd);
351 fclose(rrd_file);
352 return(-1);
353 }
355 if ((tmpl_idx = malloc(sizeof(unsigned long)
356 *(rrd.stat_head->ds_cnt+1)))==NULL){
357 rrd_set_error("allocating tmpl_idx ...");
358 free(pdp_temp);
359 free(updvals);
360 rrd_free(&rrd);
361 fclose(rrd_file);
362 return(-1);
363 }
364 /* initialize template redirector */
365 /* default config example (assume DS 1 is a CDEF DS)
366 tmpl_idx[0] -> 0; (time)
367 tmpl_idx[1] -> 1; (DS 0)
368 tmpl_idx[2] -> 3; (DS 2)
369 tmpl_idx[3] -> 4; (DS 3) */
370 tmpl_idx[0] = 0; /* time */
371 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
372 {
373 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
374 tmpl_idx[ii++]=i;
375 }
376 tmpl_cnt= ii;
378 if (template) {
379 char *dsname;
380 unsigned int tmpl_len;
381 dsname = template;
382 tmpl_cnt = 1; /* the first entry is the time */
383 tmpl_len = strlen(template);
384 for(i=0;i<=tmpl_len ;i++) {
385 if (template[i] == ':' || template[i] == '\0') {
386 template[i] = '\0';
387 if (tmpl_cnt>rrd.stat_head->ds_cnt){
388 rrd_set_error("Template contains more DS definitions than RRD");
389 free(updvals); free(pdp_temp);
390 free(tmpl_idx); rrd_free(&rrd);
391 fclose(rrd_file); return(-1);
392 }
393 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
394 rrd_set_error("unknown DS name '%s'",dsname);
395 free(updvals); free(pdp_temp);
396 free(tmpl_idx); rrd_free(&rrd);
397 fclose(rrd_file); return(-1);
398 } else {
399 /* the first element is always the time */
400 tmpl_idx[tmpl_cnt-1]++;
401 /* go to the next entry on the template */
402 dsname = &template[i+1];
403 /* fix the damage we did before */
404 if (i<tmpl_len) {
405 template[i]=':';
406 }
408 }
409 }
410 }
411 }
412 if ((pdp_new = malloc(sizeof(rrd_value_t)
413 *rrd.stat_head->ds_cnt))==NULL){
414 rrd_set_error("allocating pdp_new ...");
415 free(updvals);
416 free(pdp_temp);
417 free(tmpl_idx);
418 rrd_free(&rrd);
419 fclose(rrd_file);
420 return(-1);
421 }
423 #ifdef HAVE_MMAP
424 rrd_mmaped_file = mmap(0,
425 rrd_filesize,
426 PROT_READ | PROT_WRITE,
427 MAP_SHARED,
428 fileno(rrd_file),
429 0);
430 if (rrd_mmaped_file == MAP_FAILED) {
431 rrd_set_error("error mmapping file %s", filename);
432 free(updvals);
433 free(pdp_temp);
434 free(tmpl_idx);
435 rrd_free(&rrd);
436 fclose(rrd_file);
437 return(-1);
438 }
439 #endif
440 /* loop through the arguments. */
441 for(arg_i=0; arg_i<argc;arg_i++) {
442 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
443 char *step_start = stepper;
444 char *p;
445 char *parsetime_error = NULL;
446 enum {atstyle, normal} timesyntax;
447 struct rrd_time_value ds_tv;
448 if (stepper == NULL){
449 rrd_set_error("failed duplication argv entry");
450 free(updvals);
451 free(pdp_temp);
452 free(tmpl_idx);
453 rrd_free(&rrd);
454 #ifdef HAVE_MMAP
455 munmap(rrd_mmaped_file, rrd_filesize);
456 #endif
457 fclose(rrd_file);
458 return(-1);
459 }
460 /* initialize all ds input to unknown except the first one
461 which has always got to be set */
462 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
463 strcpy(stepper,argv[arg_i]);
464 updvals[0]=stepper;
465 /* separate all ds elements; first must be examined separately
466 due to alternate time syntax */
467 if ((p=strchr(stepper,'@'))!=NULL) {
468 timesyntax = atstyle;
469 *p = '\0';
470 stepper = p+1;
471 } else if ((p=strchr(stepper,':'))!=NULL) {
472 timesyntax = normal;
473 *p = '\0';
474 stepper = p+1;
475 } else {
476 rrd_set_error("expected timestamp not found in data source from %s:...",
477 argv[arg_i]);
478 free(step_start);
479 break;
480 }
481 ii=1;
482 updvals[tmpl_idx[ii]] = stepper;
483 while (*stepper) {
484 if (*stepper == ':') {
485 *stepper = '\0';
486 ii++;
487 if (ii<tmpl_cnt){
488 updvals[tmpl_idx[ii]] = stepper+1;
489 }
490 }
491 stepper++;
492 }
494 if (ii != tmpl_cnt-1) {
495 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
496 tmpl_cnt-1, ii, argv[arg_i]);
497 free(step_start);
498 break;
499 }
501 /* get the time from the reading ... handle N */
502 if (timesyntax == atstyle) {
503 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
504 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
505 free(step_start);
506 break;
507 }
508 if (ds_tv.type == RELATIVE_TO_END_TIME ||
509 ds_tv.type == RELATIVE_TO_START_TIME) {
510 rrd_set_error("specifying time relative to the 'start' "
511 "or 'end' makes no sense here: %s",
512 updvals[0]);
513 free(step_start);
514 break;
515 }
517 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
518 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
520 } else if (strcmp(updvals[0],"N")==0){
521 gettimeofday(&tmp_time, 0);
522 normalize_time(&tmp_time);
523 current_time = tmp_time.tv_sec;
524 current_time_usec = tmp_time.tv_usec;
525 } else {
526 double tmp;
527 tmp = strtod(updvals[0], 0);
528 current_time = floor(tmp);
529 current_time_usec = (long)((tmp - current_time) * 1000000L);
530 }
531 /* dont do any correction for old version RRDs */
532 if(version < 3)
533 current_time_usec = 0;
535 if(current_time <= rrd.live_head->last_up){
536 rrd_set_error("illegal attempt to update using time %ld when "
537 "last update time is %ld (minimum one second step)",
538 current_time, rrd.live_head->last_up);
539 free(step_start);
540 break;
541 }
544 /* seek to the beginning of the rra's */
545 if (rra_current != rra_begin) {
546 #ifndef HAVE_MMAP
547 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
548 rrd_set_error("seek error in rrd");
549 free(step_start);
550 break;
551 }
552 #endif
553 rra_current = rra_begin;
554 }
555 rra_start = rra_begin;
557 /* when was the current pdp started */
558 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
559 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
561 /* when did the last pdp_st occur */
562 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
563 occu_pdp_st = current_time - occu_pdp_age;
564 /* interval = current_time - rrd.live_head->last_up; */
565 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
567 if (occu_pdp_st > proc_pdp_st){
568 /* OK we passed the pdp_st moment*/
569 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
570 * occurred before the latest
571 * pdp_st moment*/
572 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
573 post_int = occu_pdp_age; /* how much after it */
574 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
575 } else {
576 pre_int = interval;
577 post_int = 0;
578 }
580 #ifdef DEBUG
581 printf(
582 "proc_pdp_age %lu\t"
583 "proc_pdp_st %lu\t"
584 "occu_pfp_age %lu\t"
585 "occu_pdp_st %lu\t"
586 "int %lf\t"
587 "pre_int %lf\t"
588 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
589 occu_pdp_age, occu_pdp_st,
590 interval, pre_int, post_int);
591 #endif
593 /* process the data sources and update the pdp_prep
594 * area accordingly */
595 for(i=0;i<rrd.stat_head->ds_cnt;i++){
596 enum dst_en dst_idx;
597 dst_idx= dst_conv(rrd.ds_def[i].dst);
598 /* NOTE: DST_CDEF should never enter this if block, because
599 * updvals[i+1][0] is initialized to 'U'; unless the caller
600 * accidently specified a value for the DST_CDEF. To handle
601 * this case, an extra check is required. */
602 if((updvals[i+1][0] != 'U') &&
603 (dst_idx != DST_CDEF) &&
604 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
605 double rate = DNAN;
606 /* the data source type defines how to process the data */
607 /* pdp_new contains rate * time ... eg the bytes
608 * transferred during the interval. Doing it this way saves
609 * a lot of math operations */
612 switch(dst_idx){
613 case DST_COUNTER:
614 case DST_DERIVE:
615 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
616 for(ii=0;updvals[i+1][ii] != '\0';ii++){
617 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
618 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
619 break;
620 }
621 }
622 if (rrd_test_error()){
623 break;
624 }
625 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
626 if(dst_idx == DST_COUNTER) {
627 /* simple overflow catcher suggested by Andres Kroonmaa */
628 /* this will fail terribly for non 32 or 64 bit counters ... */
629 /* are there any others in SNMP land ? */
630 if (pdp_new[i] < (double)0.0 )
631 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
632 if (pdp_new[i] < (double)0.0 )
633 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
634 }
635 rate = pdp_new[i] / interval;
636 }
637 else {
638 pdp_new[i]= DNAN;
639 }
640 break;
641 case DST_ABSOLUTE:
642 errno = 0;
643 pdp_new[i] = strtod(updvals[i+1],&endptr);
644 if (errno > 0){
645 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
646 break;
647 };
648 if (endptr[0] != '\0'){
649 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
650 break;
651 }
652 rate = pdp_new[i] / interval;
653 break;
654 case DST_GAUGE:
655 errno = 0;
656 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
657 if (errno > 0){
658 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
659 break;
660 };
661 if (endptr[0] != '\0'){
662 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
663 break;
664 }
665 rate = pdp_new[i] / interval;
666 break;
667 default:
668 rrd_set_error("rrd contains unknown DS type : '%s'",
669 rrd.ds_def[i].dst);
670 break;
671 }
672 /* break out of this for loop if the error string is set */
673 if (rrd_test_error()){
674 break;
675 }
676 /* make sure pdp_temp is neither too large or too small
677 * if any of these occur it becomes unknown ...
678 * sorry folks ... */
679 if ( ! isnan(rate) &&
680 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
681 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
682 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
683 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
684 pdp_new[i] = DNAN;
685 }
686 } else {
687 /* no news is news all the same */
688 pdp_new[i] = DNAN;
689 }
691 /* make a copy of the command line argument for the next run */
692 #ifdef DEBUG
693 fprintf(stderr,
694 "prep ds[%lu]\t"
695 "last_arg '%s'\t"
696 "this_arg '%s'\t"
697 "pdp_new %10.2f\n",
698 i,
699 rrd.pdp_prep[i].last_ds,
700 updvals[i+1], pdp_new[i]);
701 #endif
702 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
703 strncpy(rrd.pdp_prep[i].last_ds,
704 updvals[i+1],LAST_DS_LEN-1);
705 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
706 }
707 }
708 /* break out of the argument parsing loop if the error_string is set */
709 if (rrd_test_error()){
710 free(step_start);
711 break;
712 }
713 /* has a pdp_st moment occurred since the last run ? */
715 if (proc_pdp_st == occu_pdp_st){
716 /* no we have not passed a pdp_st moment. therefore update is simple */
718 for(i=0;i<rrd.stat_head->ds_cnt;i++){
719 if(isnan(pdp_new[i]))
720 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
721 else
722 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
723 #ifdef DEBUG
724 fprintf(stderr,
725 "NO PDP ds[%lu]\t"
726 "value %10.2f\t"
727 "unkn_sec %5lu\n",
728 i,
729 rrd.pdp_prep[i].scratch[PDP_val].u_val,
730 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
731 #endif
732 }
733 } else {
734 /* an pdp_st has occurred. */
736 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
737 * occurred up to the last run.
738 pdp_new[] contains rate*seconds from the latest run.
739 pdp_temp[] will contain the rate for cdp */
741 for(i=0;i<rrd.stat_head->ds_cnt;i++){
742 /* update pdp_prep to the current pdp_st */
743 if(isnan(pdp_new[i]))
744 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
745 else
746 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
747 pdp_new[i]/(double)interval*(double)pre_int;
749 /* if too much of the pdp_prep is unknown we dump it */
750 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
751 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
752 (occu_pdp_st-proc_pdp_st <=
753 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
754 pdp_temp[i] = DNAN;
755 } else {
756 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
757 / (double)( occu_pdp_st
758 - proc_pdp_st
759 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
760 }
762 /* process CDEF data sources; remember each CDEF DS can
763 * only reference other DS with a lower index number */
764 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
765 rpnp_t *rpnp;
766 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
767 /* substitue data values for OP_VARIABLE nodes */
768 for (ii = 0; rpnp[ii].op != OP_END; ii++)
769 {
770 if (rpnp[ii].op == OP_VARIABLE) {
771 rpnp[ii].op = OP_NUMBER;
772 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
773 }
774 }
775 /* run the rpn calculator */
776 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
777 free(rpnp);
778 break; /* exits the data sources pdp_temp loop */
779 }
780 }
782 /* make pdp_prep ready for the next run */
783 if(isnan(pdp_new[i])){
784 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
785 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
786 } else {
787 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
788 rrd.pdp_prep[i].scratch[PDP_val].u_val =
789 pdp_new[i]/(double)interval*(double)post_int;
790 }
792 #ifdef DEBUG
793 fprintf(stderr,
794 "PDP UPD ds[%lu]\t"
795 "pdp_temp %10.2f\t"
796 "new_prep %10.2f\t"
797 "new_unkn_sec %5lu\n",
798 i, pdp_temp[i],
799 rrd.pdp_prep[i].scratch[PDP_val].u_val,
800 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
801 #endif
802 }
804 /* if there were errors during the last loop, bail out here */
805 if (rrd_test_error()){
806 free(step_start);
807 break;
808 }
810 /* compute the number of elapsed pdp_st moments */
811 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
812 #ifdef DEBUG
813 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
814 #endif
815 if (rra_step_cnt == NULL)
816 {
817 rra_step_cnt = (unsigned long *)
818 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
819 }
821 for(i = 0, rra_start = rra_begin;
822 i < rrd.stat_head->rra_cnt;
823 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
824 i++)
825 {
826 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
827 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
828 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
829 if (start_pdp_offset <= elapsed_pdp_st) {
830 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
831 rrd.rra_def[i].pdp_cnt + 1;
832 } else {
833 rra_step_cnt[i] = 0;
834 }
836 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
837 {
838 /* If this is a bulk update, we need to skip ahead in the seasonal
839 * arrays so that they will be correct for the next observed value;
840 * note that for the bulk update itself, no update will occur to
841 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
842 * be set to DNAN. */
843 if (rra_step_cnt[i] > 2)
844 {
845 /* skip update by resetting rra_step_cnt[i],
846 * note that this is not data source specific; this is due
847 * to the bulk update, not a DNAN value for the specific data
848 * source. */
849 rra_step_cnt[i] = 0;
850 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
851 &last_seasonal_coef);
852 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
853 &seasonal_coef);
854 }
856 /* periodically run a smoother for seasonal effects */
857 /* Need to use first cdp parameter buffer to track
858 * burnin (burnin requires a specific smoothing schedule).
859 * The CDP_init_seasonal parameter is really an RRA level,
860 * not a data source within RRA level parameter, but the rra_def
861 * is read only for rrd_update (not flushed to disk). */
862 iii = i*(rrd.stat_head -> ds_cnt);
863 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
864 <= BURNIN_CYCLES)
865 {
866 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
867 > rrd.rra_def[i].row_cnt - 1) {
868 /* mark off one of the burnin cycles */
869 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
870 schedule_smooth = 1;
871 }
872 } else {
873 /* someone has no doubt invented a trick to deal with this
874 * wrap around, but at least this code is clear. */
875 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
876 rrd.rra_ptr[i].cur_row)
877 {
878 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
879 * mapping between PDP and CDP */
880 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
881 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
882 {
883 #ifdef DEBUG
884 fprintf(stderr,
885 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
886 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
887 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
888 #endif
889 schedule_smooth = 1;
890 }
891 } else {
892 /* can't rely on negative numbers because we are working with
893 * unsigned values */
894 /* Don't need modulus here. If we've wrapped more than once, only
895 * one smooth is executed at the end. */
896 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
897 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
898 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
899 {
900 #ifdef DEBUG
901 fprintf(stderr,
902 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
903 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
904 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
905 #endif
906 schedule_smooth = 1;
907 }
908 }
909 }
911 rra_current = ftell(rrd_file);
912 } /* if cf is DEVSEASONAL or SEASONAL */
914 if (rrd_test_error()) break;
916 /* update CDP_PREP areas */
917 /* loop over data soures within each RRA */
918 for(ii = 0;
919 ii < rrd.stat_head->ds_cnt;
920 ii++)
921 {
923 /* iii indexes the CDP prep area for this data source within the RRA */
924 iii=i*rrd.stat_head->ds_cnt+ii;
926 if (rrd.rra_def[i].pdp_cnt > 1) {
928 if (rra_step_cnt[i] > 0) {
929 /* If we are in this block, as least 1 CDP value will be written to
930 * disk, this is the CDP_primary_val entry. If more than 1 value needs
931 * to be written, then the "fill in" value is the CDP_secondary_val
932 * entry. */
933 if (isnan(pdp_temp[ii]))
934 {
935 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
936 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
937 } else {
938 /* CDP_secondary value is the RRA "fill in" value for intermediary
939 * CDP data entries. No matter the CF, the value is the same because
940 * the average, max, min, and last of a list of identical values is
941 * the same, namely, the value itself. */
942 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
943 }
945 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
946 > rrd.rra_def[i].pdp_cnt*
947 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
948 {
949 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
950 /* initialize carry over */
951 if (current_cf == CF_AVERAGE) {
952 if (isnan(pdp_temp[ii])) {
953 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
954 } else {
955 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
956 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
957 }
958 } else {
959 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
960 }
961 } else {
962 rrd_value_t cum_val, cur_val;
963 switch (current_cf) {
964 case CF_AVERAGE:
965 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
966 cur_val = IFDNAN(pdp_temp[ii],0.0);
967 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
968 (cum_val + cur_val * start_pdp_offset) /
969 (rrd.rra_def[i].pdp_cnt
970 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
971 /* initialize carry over value */
972 if (isnan(pdp_temp[ii])) {
973 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
974 } else {
975 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
976 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
977 }
978 break;
979 case CF_MAXIMUM:
980 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
981 cur_val = IFDNAN(pdp_temp[ii],-DINF);
982 #ifdef DEBUG
983 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
984 isnan(pdp_temp[ii])) {
985 fprintf(stderr,
986 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
987 i,ii);
988 exit(-1);
989 }
990 #endif
991 if (cur_val > cum_val)
992 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
993 else
994 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
995 /* initialize carry over value */
996 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
997 break;
998 case CF_MINIMUM:
999 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1000 cur_val = IFDNAN(pdp_temp[ii],DINF);
1001 #ifdef DEBUG
1002 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1003 isnan(pdp_temp[ii])) {
1004 fprintf(stderr,
1005 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1006 i,ii);
1007 exit(-1);
1008 }
1009 #endif
1010 if (cur_val < cum_val)
1011 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1012 else
1013 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1014 /* initialize carry over value */
1015 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1016 break;
1017 case CF_LAST:
1018 default:
1019 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1020 /* initialize carry over value */
1021 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1022 break;
1023 }
1024 } /* endif meets xff value requirement for a valid value */
1025 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1026 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1027 if (isnan(pdp_temp[ii]))
1028 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1029 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1030 else
1031 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1032 } else /* rra_step_cnt[i] == 0 */
1033 {
1034 #ifdef DEBUG
1035 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1036 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1037 i,ii);
1038 } else {
1039 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1040 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1041 }
1042 #endif
1043 if (isnan(pdp_temp[ii])) {
1044 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1045 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1046 {
1047 if (current_cf == CF_AVERAGE) {
1048 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1049 elapsed_pdp_st;
1050 } else {
1051 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1052 }
1053 #ifdef DEBUG
1054 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1055 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1056 #endif
1057 } else {
1058 switch (current_cf) {
1059 case CF_AVERAGE:
1060 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1061 elapsed_pdp_st;
1062 break;
1063 case CF_MINIMUM:
1064 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1065 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1066 break;
1067 case CF_MAXIMUM:
1068 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1069 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1070 break;
1071 case CF_LAST:
1072 default:
1073 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1074 break;
1075 }
1076 }
1077 }
1078 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1079 if (elapsed_pdp_st > 2)
1080 {
1081 switch (current_cf) {
1082 case CF_AVERAGE:
1083 default:
1084 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1085 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1086 break;
1087 case CF_SEASONAL:
1088 case CF_DEVSEASONAL:
1089 /* need to update cached seasonal values, so they are consistent
1090 * with the bulk update */
1091 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1092 * CDP_last_deviation are the same. */
1093 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1094 last_seasonal_coef[ii];
1095 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1096 seasonal_coef[ii];
1097 break;
1098 case CF_HWPREDICT:
1099 /* need to update the null_count and last_null_count.
1100 * even do this for non-DNAN pdp_temp because the
1101 * algorithm is not learning from batch updates. */
1102 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1103 elapsed_pdp_st;
1104 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1105 elapsed_pdp_st - 1;
1106 /* fall through */
1107 case CF_DEVPREDICT:
1108 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1109 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1110 break;
1111 case CF_FAILURES:
1112 /* do not count missed bulk values as failures */
1113 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1114 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1115 /* need to reset violations buffer.
1116 * could do this more carefully, but for now, just
1117 * assume a bulk update wipes away all violations. */
1118 erase_violations(&rrd, iii, i);
1119 break;
1120 }
1121 }
1122 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1124 if (rrd_test_error()) break;
1126 } /* endif data sources loop */
1127 } /* end RRA Loop */
1129 /* this loop is only entered if elapsed_pdp_st < 3 */
1130 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1131 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1132 {
1133 for(i = 0, rra_start = rra_begin;
1134 i < rrd.stat_head->rra_cnt;
1135 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1136 i++)
1137 {
1138 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1140 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1141 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1142 {
1143 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1144 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1145 &seasonal_coef);
1146 rra_current = ftell(rrd_file);
1147 }
1148 if (rrd_test_error()) break;
1149 /* loop over data soures within each RRA */
1150 for(ii = 0;
1151 ii < rrd.stat_head->ds_cnt;
1152 ii++)
1153 {
1154 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1155 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1156 scratch_idx, seasonal_coef);
1157 }
1158 } /* end RRA Loop */
1159 if (rrd_test_error()) break;
1160 } /* end elapsed_pdp_st loop */
1162 if (rrd_test_error()) break;
1164 /* Ready to write to disk */
1165 /* Move sequentially through the file, writing one RRA at a time.
1166 * Note this architecture divorces the computation of CDP with
1167 * flushing updated RRA entries to disk. */
1168 for(i = 0, rra_start = rra_begin;
1169 i < rrd.stat_head->rra_cnt;
1170 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1171 i++) {
1172 /* is there anything to write for this RRA? If not, continue. */
1173 if (rra_step_cnt[i] == 0) continue;
1175 /* write the first row */
1176 #ifdef DEBUG
1177 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1178 #endif
1179 rrd.rra_ptr[i].cur_row++;
1180 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1181 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1182 /* positition on the first row */
1183 rra_pos_tmp = rra_start +
1184 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1185 if(rra_pos_tmp != rra_current) {
1186 #ifndef HAVE_MMAP
1187 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1188 rrd_set_error("seek error in rrd");
1189 break;
1190 }
1191 #endif
1192 rra_current = rra_pos_tmp;
1193 }
1195 #ifdef DEBUG
1196 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1197 #endif
1198 scratch_idx = CDP_primary_val;
1199 if (pcdp_summary != NULL)
1200 {
1201 rra_time = (current_time - current_time
1202 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1203 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1204 }
1205 #ifdef HAVE_MMAP
1206 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1207 pcdp_summary, &rra_time, rrd_mmaped_file);
1208 #else
1209 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1210 pcdp_summary, &rra_time);
1211 #endif
1212 if (rrd_test_error()) break;
1214 /* write other rows of the bulk update, if any */
1215 scratch_idx = CDP_secondary_val;
1216 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1217 {
1218 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1219 {
1220 #ifdef DEBUG
1221 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1222 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1223 #endif
1224 /* wrap */
1225 rrd.rra_ptr[i].cur_row = 0;
1226 /* seek back to beginning of current rra */
1227 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1228 {
1229 rrd_set_error("seek error in rrd");
1230 break;
1231 }
1232 #ifdef DEBUG
1233 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1234 #endif
1235 rra_current = rra_start;
1236 }
1237 if (pcdp_summary != NULL)
1238 {
1239 rra_time = (current_time - current_time
1240 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1241 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1242 }
1243 #ifdef HAVE_MMAP
1244 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1245 pcdp_summary, &rra_time, rrd_mmaped_file);
1246 #else
1247 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1248 pcdp_summary, &rra_time);
1249 #endif
1250 }
1252 if (rrd_test_error())
1253 break;
1254 } /* RRA LOOP */
1256 /* break out of the argument parsing loop if error_string is set */
1257 if (rrd_test_error()){
1258 free(step_start);
1259 break;
1260 }
1262 } /* endif a pdp_st has occurred */
1263 rrd.live_head->last_up = current_time;
1264 rrd.live_head->last_up_usec = current_time_usec;
1265 free(step_start);
1266 } /* function argument loop */
1268 if (seasonal_coef != NULL) free(seasonal_coef);
1269 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1270 if (rra_step_cnt != NULL) free(rra_step_cnt);
1271 rpnstack_free(&rpnstack);
1273 #ifdef HAVE_MMAP
1274 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1275 rrd_set_error("error writing(unmapping) file: %s", filename);
1276 }
1277 #endif
1278 /* if we got here and if there is an error and if the file has not been
1279 * written to, then close things up and return. */
1280 if (rrd_test_error()) {
1281 free(updvals);
1282 free(tmpl_idx);
1283 rrd_free(&rrd);
1284 free(pdp_temp);
1285 free(pdp_new);
1286 fclose(rrd_file);
1287 return(-1);
1288 }
1290 /* aargh ... that was tough ... so many loops ... anyway, its done.
1291 * we just need to write back the live header portion now*/
1293 if (fseek(rrd_file, (sizeof(stat_head_t)
1294 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1295 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1296 SEEK_SET) != 0) {
1297 rrd_set_error("seek rrd for live header writeback");
1298 free(updvals);
1299 free(tmpl_idx);
1300 rrd_free(&rrd);
1301 free(pdp_temp);
1302 free(pdp_new);
1303 fclose(rrd_file);
1304 return(-1);
1305 }
1307 if(version >= 3) {
1308 if(fwrite( rrd.live_head,
1309 sizeof(live_head_t), 1, rrd_file) != 1){
1310 rrd_set_error("fwrite live_head to rrd");
1311 free(updvals);
1312 rrd_free(&rrd);
1313 free(tmpl_idx);
1314 free(pdp_temp);
1315 free(pdp_new);
1316 fclose(rrd_file);
1317 return(-1);
1318 }
1319 }
1320 else {
1321 if(fwrite( &rrd.live_head->last_up,
1322 sizeof(time_t), 1, rrd_file) != 1){
1323 rrd_set_error("fwrite live_head to rrd");
1324 free(updvals);
1325 rrd_free(&rrd);
1326 free(tmpl_idx);
1327 free(pdp_temp);
1328 free(pdp_new);
1329 fclose(rrd_file);
1330 return(-1);
1331 }
1332 }
1335 if(fwrite( rrd.pdp_prep,
1336 sizeof(pdp_prep_t),
1337 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1338 rrd_set_error("ftwrite pdp_prep to rrd");
1339 free(updvals);
1340 rrd_free(&rrd);
1341 free(tmpl_idx);
1342 free(pdp_temp);
1343 free(pdp_new);
1344 fclose(rrd_file);
1345 return(-1);
1346 }
1348 if(fwrite( rrd.cdp_prep,
1349 sizeof(cdp_prep_t),
1350 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1351 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1353 rrd_set_error("ftwrite cdp_prep to rrd");
1354 free(updvals);
1355 free(tmpl_idx);
1356 rrd_free(&rrd);
1357 free(pdp_temp);
1358 free(pdp_new);
1359 fclose(rrd_file);
1360 return(-1);
1361 }
1363 if(fwrite( rrd.rra_ptr,
1364 sizeof(rra_ptr_t),
1365 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1366 rrd_set_error("fwrite rra_ptr to rrd");
1367 free(updvals);
1368 free(tmpl_idx);
1369 rrd_free(&rrd);
1370 free(pdp_temp);
1371 free(pdp_new);
1372 fclose(rrd_file);
1373 return(-1);
1374 }
1376 /* OK now close the files and free the memory */
1377 if(fclose(rrd_file) != 0){
1378 rrd_set_error("closing rrd");
1379 free(updvals);
1380 free(tmpl_idx);
1381 rrd_free(&rrd);
1382 free(pdp_temp);
1383 free(pdp_new);
1384 return(-1);
1385 }
1387 /* calling the smoothing code here guarantees at most
1388 * one smoothing operation per rrd_update call. Unfortunately,
1389 * it is possible with bulk updates, or a long-delayed update
1390 * for smoothing to occur off-schedule. This really isn't
1391 * critical except during the burning cycles. */
1392 if (schedule_smooth)
1393 {
1394 rrd_file = fopen(filename,"rb+");
1395 rra_start = rra_begin;
1396 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1397 {
1398 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1399 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1400 {
1401 #ifdef DEBUG
1402 fprintf(stderr,"Running smoother for rra %ld\n",i);
1403 #endif
1404 apply_smoother(&rrd,i,rra_start,rrd_file);
1405 if (rrd_test_error())
1406 break;
1407 }
1408 rra_start += rrd.rra_def[i].row_cnt
1409 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1410 }
1411 fclose(rrd_file);
1412 }
1413 rrd_free(&rrd);
1414 free(updvals);
1415 free(tmpl_idx);
1416 free(pdp_new);
1417 free(pdp_temp);
1418 return(0);
1419 }
/*
 * Try to take an exclusive lock covering the whole RRD file.
 *
 * The request is non-blocking: F_SETLK on POSIX systems, _LK_NBLCK on
 * native Windows builds.  Nothing in here releases the lock again.
 *
 * rrdfile  open stdio stream of the RRD file
 *
 * returns 0 on success, -1 when the lock could not be obtained
 * (on Windows also when the file size cannot be determined)
 */
int
LockRRD(FILE *rrdfile)
{
    /* descriptor behind the stdio stream; the locking calls need it */
    const int fd = fileno(rrdfile);
    int       status = -1;
#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat info;

    /* _locking() wants a byte count, so ask for the file size first */
    if (_fstat(fd, &info) == 0)
        status = _locking(fd, _LK_NBLCK, info.st_size);
#else
    struct flock request;

    request.l_whence = SEEK_SET;   /* l_start is relative to the file start */
    request.l_start  = 0;          /* begin at the first byte ... */
    request.l_len    = 0;          /* ... and cover the whole file */
    request.l_type   = F_WRLCK;    /* exclusive write lock */

    status = fcntl(fd, F_SETLK, &request);
#endif

    return status;
}
1459 #ifdef HAVE_MMAP
1460 info_t
1461 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1462 unsigned short CDP_scratch_idx,
1463 #ifndef DEBUG
1464 FILE UNUSED(*rrd_file),
1465 #else
1466 FILE *rrd_file,
1467 #endif
1468 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1469 #else
1470 info_t
1471 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1472 unsigned short CDP_scratch_idx, FILE *rrd_file,
1473 info_t *pcdp_summary, time_t *rra_time)
1474 #endif
1475 {
1476 unsigned long ds_idx, cdp_idx;
1477 infoval iv;
1479 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1480 {
1481 /* compute the cdp index */
1482 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1483 #ifdef DEBUG
1484 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1485 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1486 rrd -> rra_def[rra_idx].cf_nam);
1487 #endif
1488 if (pcdp_summary != NULL)
1489 {
1490 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1491 /* append info to the return hash */
1492 pcdp_summary = info_push(pcdp_summary,
1493 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1494 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1495 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1496 RD_I_VAL, iv);
1497 }
1498 #ifdef HAVE_MMAP
1499 memcpy((char *)rrd_mmaped_file + *rra_current,
1500 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1501 sizeof(rrd_value_t));
1502 #else
1503 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1504 sizeof(rrd_value_t),1,rrd_file) != 1)
1505 {
1506 rrd_set_error("writing rrd");
1507 return 0;
1508 }
1509 #endif
1510 *rra_current += sizeof(rrd_value_t);
1511 }
1512 return (pcdp_summary);
1513 }