1b0cd9aa32d4fc8880873536df503f8b19bdf238
1 /*****************************************************************************
2 * RRDtool 1.2.11 Copyright by Tobi Oetiker, 1997-2005
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 *****************************************************************************/
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13 #include <sys/mman.h>
14 #endif
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
18 #include <sys/stat.h>
19 #include <io.h>
20 #endif
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
31 * replacement.
32 */
33 #include <sys/timeb.h>
35 struct timeval {
36 time_t tv_sec; /* seconds */
37 long tv_usec; /* microseconds */
38 };
40 struct __timezone {
41 int tz_minuteswest; /* minutes W of Greenwich */
42 int tz_dsttime; /* type of dst correction */
43 };
45 static gettimeofday(struct timeval *t, struct __timezone *tz) {
47 struct timeb current_time;
49 _ftime(¤t_time);
51 t->tv_sec = current_time.time;
52 t->tv_usec = current_time.millitm * 1000;
53 }
55 #endif
56 /*
57 * normilize time as returned by gettimeofday. usec part must
58 * be always >= 0
59 */
60 static void normalize_time(struct timeval *t)
61 {
62 if(t->tv_usec < 0) {
63 t->tv_sec--;
64 t->tv_usec += 1000000L;
65 }
66 }
68 /* Local prototypes */
69 int LockRRD(FILE *rrd_file);
70 #ifdef HAVE_MMAP
71 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
72 unsigned long *rra_current,
73 unsigned short CDP_scratch_idx,
74 #ifndef DEBUG
75 FILE UNUSED(*rrd_file),
76 #else
77 FILE *rrd_file,
78 #endif
79 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
80 #else
81 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
82 unsigned long *rra_current,
83 unsigned short CDP_scratch_idx, FILE *rrd_file,
84 info_t *pcdp_summary, time_t *rra_time);
85 #endif
86 int rrd_update_r(char *filename, char *template, int argc, char **argv);
87 int _rrd_update(char *filename, char *template, int argc, char **argv,
88 info_t*);
90 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
93 #ifdef STANDALONE
94 int
95 main(int argc, char **argv){
96 rrd_update(argc,argv);
97 if (rrd_test_error()) {
98 printf("RRDtool " PACKAGE_VERSION " Copyright by Tobi Oetiker, 1997-2005\n\n"
99 "Usage: rrdupdate filename\n"
100 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
101 "\t\t\ttime|N:value[:value...]\n\n"
102 "\t\t\tat-time@value[:value...]\n\n"
103 "\t\t\t[ time:value[:value...] ..]\n\n");
105 printf("ERROR: %s\n",rrd_get_error());
106 rrd_clear_error();
107 return 1;
108 }
109 return 0;
110 }
111 #endif
113 info_t *rrd_update_v(int argc, char **argv)
114 {
115 char *template = NULL;
116 info_t *result = NULL;
117 infoval rc;
118 optind = 0; opterr = 0; /* initialize getopt */
120 while (1) {
121 static struct option long_options[] =
122 {
123 {"template", required_argument, 0, 't'},
124 {0,0,0,0}
125 };
126 int option_index = 0;
127 int opt;
128 opt = getopt_long(argc, argv, "t:",
129 long_options, &option_index);
131 if (opt == EOF)
132 break;
134 switch(opt) {
135 case 't':
136 template = optarg;
137 break;
139 case '?':
140 rrd_set_error("unknown option '%s'",argv[optind-1]);
141 rc.u_int = -1;
142 goto end_tag;
143 }
144 }
146 /* need at least 2 arguments: filename, data. */
147 if (argc-optind < 2) {
148 rrd_set_error("Not enough arguments");
149 rc.u_int = -1;
150 goto end_tag;
151 }
152 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
153 rc.u_int = _rrd_update(argv[optind], template,
154 argc - optind - 1, argv + optind + 1, result);
155 result->value.u_int = rc.u_int;
156 end_tag:
157 return result;
158 }
160 int
161 rrd_update(int argc, char **argv)
162 {
163 char *template = NULL;
164 int rc;
165 optind = 0; opterr = 0; /* initialize getopt */
167 while (1) {
168 static struct option long_options[] =
169 {
170 {"template", required_argument, 0, 't'},
171 {0,0,0,0}
172 };
173 int option_index = 0;
174 int opt;
175 opt = getopt_long(argc, argv, "t:",
176 long_options, &option_index);
178 if (opt == EOF)
179 break;
181 switch(opt) {
182 case 't':
183 template = optarg;
184 break;
186 case '?':
187 rrd_set_error("unknown option '%s'",argv[optind-1]);
188 return(-1);
189 }
190 }
192 /* need at least 2 arguments: filename, data. */
193 if (argc-optind < 2) {
194 rrd_set_error("Not enough arguments");
196 return -1;
197 }
199 rc = rrd_update_r(argv[optind], template,
200 argc - optind - 1, argv + optind + 1);
201 return rc;
202 }
204 int
205 rrd_update_r(char *filename, char *template, int argc, char **argv)
206 {
207 return _rrd_update(filename, template, argc, argv, NULL);
208 }
210 int
211 _rrd_update(char *filename, char *template, int argc, char **argv,
212 info_t *pcdp_summary)
213 {
215 int arg_i = 2;
216 short j;
217 unsigned long i,ii,iii=1;
219 unsigned long rra_begin; /* byte pointer to the rra
220 * area in the rrd file. this
221 * pointer never changes value */
222 unsigned long rra_start; /* byte pointer to the rra
223 * area in the rrd file. this
224 * pointer changes as each rrd is
225 * processed. */
226 unsigned long rra_current; /* byte pointer to the current write
227 * spot in the rrd file. */
228 unsigned long rra_pos_tmp; /* temporary byte pointer. */
229 double interval,
230 pre_int,post_int; /* interval between this and
231 * the last run */
232 unsigned long proc_pdp_st; /* which pdp_st was the last
233 * to be processed */
234 unsigned long occu_pdp_st; /* when was the pdp_st
235 * before the last update
236 * time */
237 unsigned long proc_pdp_age; /* how old was the data in
238 * the pdp prep area when it
239 * was last updated */
240 unsigned long occu_pdp_age; /* how long ago was the last
241 * pdp_step time */
242 rrd_value_t *pdp_new; /* prepare the incoming data
243 * to be added the the
244 * existing entry */
245 rrd_value_t *pdp_temp; /* prepare the pdp values
246 * to be added the the
247 * cdp values */
249 long *tmpl_idx; /* index representing the settings
250 transported by the template index */
251 unsigned long tmpl_cnt = 2; /* time and data */
253 FILE *rrd_file;
254 rrd_t rrd;
255 time_t current_time = 0;
256 time_t rra_time = 0; /* time of update for a RRA */
257 unsigned long current_time_usec=0;/* microseconds part of current time */
258 struct timeval tmp_time; /* used for time conversion */
260 char **updvals;
261 int schedule_smooth = 0;
262 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
263 /* a vector of future Holt-Winters seasonal coefs */
264 unsigned long elapsed_pdp_st;
265 /* number of elapsed PDP steps since last update */
266 unsigned long *rra_step_cnt = NULL;
267 /* number of rows to be updated in an RRA for a data
268 * value. */
269 unsigned long start_pdp_offset;
270 /* number of PDP steps since the last update that
271 * are assigned to the first CDP to be generated
272 * since the last update. */
273 unsigned short scratch_idx;
274 /* index into the CDP scratch array */
275 enum cf_en current_cf;
276 /* numeric id of the current consolidation function */
277 rpnstack_t rpnstack; /* used for COMPUTE DS */
278 int version; /* rrd version */
279 char *endptr; /* used in the conversion */
280 #ifdef HAVE_MMAP
281 void *rrd_mmaped_file;
282 unsigned long rrd_filesize;
283 #endif
285 rpnstack_init(&rpnstack);
287 /* need at least 1 arguments: data. */
288 if (argc < 1) {
289 rrd_set_error("Not enough arguments");
290 return -1;
291 }
295 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
296 return -1;
297 }
298 /* initialize time */
299 version = atoi(rrd.stat_head->version);
300 gettimeofday(&tmp_time, 0);
301 normalize_time(&tmp_time);
302 current_time = tmp_time.tv_sec;
303 if(version >= 3) {
304 current_time_usec = tmp_time.tv_usec;
305 }
306 else {
307 current_time_usec = 0;
308 }
310 rra_current = rra_start = rra_begin = ftell(rrd_file);
311 /* This is defined in the ANSI C standard, section 7.9.5.3:
313 When a file is opened with udpate mode ('+' as the second
314 or third character in the ... list of mode argument
315 variables), both input and ouptut may be performed on the
316 associated stream. However, ... input may not be directly
317 followed by output without an intervening call to a file
318 positioning function, unless the input oepration encounters
319 end-of-file. */
320 #ifdef HAVE_MMAP
321 fseek(rrd_file, 0, SEEK_END);
322 rrd_filesize = ftell(rrd_file);
323 fseek(rrd_file, rra_current, SEEK_SET);
324 #else
325 fseek(rrd_file, 0, SEEK_CUR);
326 #endif
329 /* get exclusive lock to whole file.
330 * lock gets removed when we close the file.
331 */
332 if (LockRRD(rrd_file) != 0) {
333 rrd_set_error("could not lock RRD");
334 rrd_free(&rrd);
335 fclose(rrd_file);
336 return(-1);
337 }
339 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
340 rrd_set_error("allocating updvals pointer array");
341 rrd_free(&rrd);
342 fclose(rrd_file);
343 return(-1);
344 }
346 if ((pdp_temp = malloc(sizeof(rrd_value_t)
347 *rrd.stat_head->ds_cnt))==NULL){
348 rrd_set_error("allocating pdp_temp ...");
349 free(updvals);
350 rrd_free(&rrd);
351 fclose(rrd_file);
352 return(-1);
353 }
355 if ((tmpl_idx = malloc(sizeof(unsigned long)
356 *(rrd.stat_head->ds_cnt+1)))==NULL){
357 rrd_set_error("allocating tmpl_idx ...");
358 free(pdp_temp);
359 free(updvals);
360 rrd_free(&rrd);
361 fclose(rrd_file);
362 return(-1);
363 }
364 /* initialize template redirector */
365 /* default config example (assume DS 1 is a CDEF DS)
366 tmpl_idx[0] -> 0; (time)
367 tmpl_idx[1] -> 1; (DS 0)
368 tmpl_idx[2] -> 3; (DS 2)
369 tmpl_idx[3] -> 4; (DS 3) */
370 tmpl_idx[0] = 0; /* time */
371 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
372 {
373 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
374 tmpl_idx[ii++]=i;
375 }
376 tmpl_cnt= ii;
378 if (template) {
379 char *dsname;
380 unsigned int tmpl_len;
381 dsname = template;
382 tmpl_cnt = 1; /* the first entry is the time */
383 tmpl_len = strlen(template);
384 for(i=0;i<=tmpl_len ;i++) {
385 if (template[i] == ':' || template[i] == '\0') {
386 template[i] = '\0';
387 if (tmpl_cnt>rrd.stat_head->ds_cnt){
388 rrd_set_error("Template contains more DS definitions than RRD");
389 free(updvals); free(pdp_temp);
390 free(tmpl_idx); rrd_free(&rrd);
391 fclose(rrd_file); return(-1);
392 }
393 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
394 rrd_set_error("unknown DS name '%s'",dsname);
395 free(updvals); free(pdp_temp);
396 free(tmpl_idx); rrd_free(&rrd);
397 fclose(rrd_file); return(-1);
398 } else {
399 /* the first element is always the time */
400 tmpl_idx[tmpl_cnt-1]++;
401 /* go to the next entry on the template */
402 dsname = &template[i+1];
403 /* fix the damage we did before */
404 if (i<tmpl_len) {
405 template[i]=':';
406 }
408 }
409 }
410 }
411 }
412 if ((pdp_new = malloc(sizeof(rrd_value_t)
413 *rrd.stat_head->ds_cnt))==NULL){
414 rrd_set_error("allocating pdp_new ...");
415 free(updvals);
416 free(pdp_temp);
417 free(tmpl_idx);
418 rrd_free(&rrd);
419 fclose(rrd_file);
420 return(-1);
421 }
423 #ifdef HAVE_MMAP
424 rrd_mmaped_file = mmap(0,
425 rrd_filesize,
426 PROT_READ | PROT_WRITE,
427 MAP_SHARED,
428 fileno(rrd_file),
429 0);
430 if (rrd_mmaped_file == MAP_FAILED) {
431 rrd_set_error("error mmapping file %s", filename);
432 free(updvals);
433 free(pdp_temp);
434 free(tmpl_idx);
435 rrd_free(&rrd);
436 fclose(rrd_file);
437 return(-1);
438 }
439 #endif
440 /* loop through the arguments. */
441 for(arg_i=0; arg_i<argc;arg_i++) {
442 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
443 char *step_start = stepper;
444 char *p;
445 char *parsetime_error = NULL;
446 enum {atstyle, normal} timesyntax;
447 struct rrd_time_value ds_tv;
448 if (stepper == NULL){
449 rrd_set_error("failed duplication argv entry");
450 free(updvals);
451 free(pdp_temp);
452 free(tmpl_idx);
453 rrd_free(&rrd);
454 #ifdef HAVE_MMAP
455 munmap(rrd_mmaped_file, rrd_filesize);
456 #endif
457 fclose(rrd_file);
458 return(-1);
459 }
460 /* initialize all ds input to unknown except the first one
461 which has always got to be set */
462 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
463 strcpy(stepper,argv[arg_i]);
464 updvals[0]=stepper;
465 /* separate all ds elements; first must be examined separately
466 due to alternate time syntax */
467 if ((p=strchr(stepper,'@'))!=NULL) {
468 timesyntax = atstyle;
469 *p = '\0';
470 stepper = p+1;
471 } else if ((p=strchr(stepper,':'))!=NULL) {
472 timesyntax = normal;
473 *p = '\0';
474 stepper = p+1;
475 } else {
476 rrd_set_error("expected timestamp not found in data source from %s:...",
477 argv[arg_i]);
478 free(step_start);
479 break;
480 }
481 ii=1;
482 updvals[tmpl_idx[ii]] = stepper;
483 while (*stepper) {
484 if (*stepper == ':') {
485 *stepper = '\0';
486 ii++;
487 if (ii<tmpl_cnt){
488 updvals[tmpl_idx[ii]] = stepper+1;
489 }
490 }
491 stepper++;
492 }
494 if (ii != tmpl_cnt-1) {
495 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
496 tmpl_cnt-1, ii, argv[arg_i]);
497 free(step_start);
498 break;
499 }
501 /* get the time from the reading ... handle N */
502 if (timesyntax == atstyle) {
503 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
504 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
505 free(step_start);
506 break;
507 }
508 if (ds_tv.type == RELATIVE_TO_END_TIME ||
509 ds_tv.type == RELATIVE_TO_START_TIME) {
510 rrd_set_error("specifying time relative to the 'start' "
511 "or 'end' makes no sense here: %s",
512 updvals[0]);
513 free(step_start);
514 break;
515 }
517 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
518 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
520 } else if (strcmp(updvals[0],"N")==0){
521 gettimeofday(&tmp_time, 0);
522 normalize_time(&tmp_time);
523 current_time = tmp_time.tv_sec;
524 current_time_usec = tmp_time.tv_usec;
525 } else {
526 double tmp;
527 tmp = strtod(updvals[0], 0);
528 current_time = floor(tmp);
529 current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
530 }
531 /* dont do any correction for old version RRDs */
532 if(version < 3)
533 current_time_usec = 0;
535 if(current_time < rrd.live_head->last_up ||
536 (current_time == rrd.live_head->last_up &&
537 (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
538 rrd_set_error("illegal attempt to update using time %ld when "
539 "last update time is %ld (minimum one second step)",
540 current_time, rrd.live_head->last_up);
541 free(step_start);
542 break;
543 }
546 /* seek to the beginning of the rra's */
547 if (rra_current != rra_begin) {
548 #ifndef HAVE_MMAP
549 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
550 rrd_set_error("seek error in rrd");
551 free(step_start);
552 break;
553 }
554 #endif
555 rra_current = rra_begin;
556 }
557 rra_start = rra_begin;
559 /* when was the current pdp started */
560 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
561 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
563 /* when did the last pdp_st occur */
564 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
565 occu_pdp_st = current_time - occu_pdp_age;
567 /* interval = current_time - rrd.live_head->last_up; */
568 interval = (double)(current_time - rrd.live_head->last_up)
569 + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
571 if (occu_pdp_st > proc_pdp_st){
572 /* OK we passed the pdp_st moment*/
573 pre_int = (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
574 * occurred before the latest
575 * pdp_st moment*/
576 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
577 post_int = occu_pdp_age; /* how much after it */
578 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
579 } else {
580 pre_int = interval;
581 post_int = 0;
582 }
584 #ifdef DEBUG
585 printf(
586 "proc_pdp_age %lu\t"
587 "proc_pdp_st %lu\t"
588 "occu_pfp_age %lu\t"
589 "occu_pdp_st %lu\t"
590 "int %lf\t"
591 "pre_int %lf\t"
592 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
593 occu_pdp_age, occu_pdp_st,
594 interval, pre_int, post_int);
595 #endif
597 /* process the data sources and update the pdp_prep
598 * area accordingly */
599 for(i=0;i<rrd.stat_head->ds_cnt;i++){
600 enum dst_en dst_idx;
601 dst_idx= dst_conv(rrd.ds_def[i].dst);
603 /* make sure we do not build diffs with old last_ds values */
604 if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval
605 && ( dst_idx == DST_COUNTER || dst_idx == DST_DERIVE)){
606 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
607 }
609 /* NOTE: DST_CDEF should never enter this if block, because
610 * updvals[i+1][0] is initialized to 'U'; unless the caller
611 * accidently specified a value for the DST_CDEF. To handle
612 * this case, an extra check is required. */
614 if((updvals[i+1][0] != 'U') &&
615 (dst_idx != DST_CDEF) &&
616 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
617 double rate = DNAN;
618 /* the data source type defines how to process the data */
619 /* pdp_new contains rate * time ... eg the bytes
620 * transferred during the interval. Doing it this way saves
621 * a lot of math operations */
624 switch(dst_idx){
625 case DST_COUNTER:
626 case DST_DERIVE:
627 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
628 for(ii=0;updvals[i+1][ii] != '\0';ii++){
629 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
630 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
631 break;
632 }
633 }
634 if (rrd_test_error()){
635 break;
636 }
637 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
638 if(dst_idx == DST_COUNTER) {
639 /* simple overflow catcher suggested by Andres Kroonmaa */
640 /* this will fail terribly for non 32 or 64 bit counters ... */
641 /* are there any others in SNMP land ? */
642 if (pdp_new[i] < (double)0.0 )
643 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
644 if (pdp_new[i] < (double)0.0 )
645 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
646 }
647 rate = pdp_new[i] / interval;
648 }
649 else {
650 pdp_new[i]= DNAN;
651 }
652 break;
653 case DST_ABSOLUTE:
654 errno = 0;
655 pdp_new[i] = strtod(updvals[i+1],&endptr);
656 if (errno > 0){
657 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
658 break;
659 };
660 if (endptr[0] != '\0'){
661 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
662 break;
663 }
664 rate = pdp_new[i] / interval;
665 break;
666 case DST_GAUGE:
667 errno = 0;
668 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
669 if (errno > 0){
670 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
671 break;
672 };
673 if (endptr[0] != '\0'){
674 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
675 break;
676 }
677 rate = pdp_new[i] / interval;
678 break;
679 default:
680 rrd_set_error("rrd contains unknown DS type : '%s'",
681 rrd.ds_def[i].dst);
682 break;
683 }
684 /* break out of this for loop if the error string is set */
685 if (rrd_test_error()){
686 break;
687 }
688 /* make sure pdp_temp is neither too large or too small
689 * if any of these occur it becomes unknown ...
690 * sorry folks ... */
691 if ( ! isnan(rate) &&
692 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
693 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
694 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
695 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
696 pdp_new[i] = DNAN;
697 }
698 } else {
699 /* no news is news all the same */
700 pdp_new[i] = DNAN;
701 }
703 /* make a copy of the command line argument for the next run */
704 #ifdef DEBUG
705 fprintf(stderr,
706 "prep ds[%lu]\t"
707 "last_arg '%s'\t"
708 "this_arg '%s'\t"
709 "pdp_new %10.2f\n",
710 i,
711 rrd.pdp_prep[i].last_ds,
712 updvals[i+1], pdp_new[i]);
713 #endif
714 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
715 strncpy(rrd.pdp_prep[i].last_ds,
716 updvals[i+1],LAST_DS_LEN-1);
717 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
718 }
719 }
720 /* break out of the argument parsing loop if the error_string is set */
721 if (rrd_test_error()){
722 free(step_start);
723 break;
724 }
725 /* has a pdp_st moment occurred since the last run ? */
727 if (proc_pdp_st == occu_pdp_st){
728 /* no we have not passed a pdp_st moment. therefore update is simple */
730 for(i=0;i<rrd.stat_head->ds_cnt;i++){
731 if(isnan(pdp_new[i]))
732 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval-0.5);
733 else
734 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
735 #ifdef DEBUG
736 fprintf(stderr,
737 "NO PDP ds[%lu]\t"
738 "value %10.2f\t"
739 "unkn_sec %5lu\n",
740 i,
741 rrd.pdp_prep[i].scratch[PDP_val].u_val,
742 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
743 #endif
744 }
745 } else {
746 /* an pdp_st has occurred. */
748 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
749 * occurred up to the last run.
750 pdp_new[] contains rate*seconds from the latest run.
751 pdp_temp[] will contain the rate for cdp */
753 for(i=0;i<rrd.stat_head->ds_cnt;i++){
754 /* update pdp_prep to the current pdp_st */
755 if(isnan(pdp_new[i]))
756 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(pre_int+0.5);
757 else
758 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
759 pdp_new[i]/interval*pre_int;
761 /* if too much of the pdp_prep is unknown we dump it */
762 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
763 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
764 (occu_pdp_st-proc_pdp_st <=
765 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
766 pdp_temp[i] = DNAN;
767 } else {
768 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
769 / (double)( occu_pdp_st
770 - proc_pdp_st
771 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
772 }
774 /* process CDEF data sources; remember each CDEF DS can
775 * only reference other DS with a lower index number */
776 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
777 rpnp_t *rpnp;
778 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
779 /* substitue data values for OP_VARIABLE nodes */
780 for (ii = 0; rpnp[ii].op != OP_END; ii++)
781 {
782 if (rpnp[ii].op == OP_VARIABLE) {
783 rpnp[ii].op = OP_NUMBER;
784 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
785 }
786 }
787 /* run the rpn calculator */
788 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
789 free(rpnp);
790 break; /* exits the data sources pdp_temp loop */
791 }
792 }
794 /* make pdp_prep ready for the next run */
795 if(isnan(pdp_new[i])){
796 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int + 0.5);
797 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
798 } else {
799 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
800 rrd.pdp_prep[i].scratch[PDP_val].u_val =
801 pdp_new[i]/interval*post_int;
802 }
804 #ifdef DEBUG
805 fprintf(stderr,
806 "PDP UPD ds[%lu]\t"
807 "pdp_temp %10.2f\t"
808 "new_prep %10.2f\t"
809 "new_unkn_sec %5lu\n",
810 i, pdp_temp[i],
811 rrd.pdp_prep[i].scratch[PDP_val].u_val,
812 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
813 #endif
814 }
816 /* if there were errors during the last loop, bail out here */
817 if (rrd_test_error()){
818 free(step_start);
819 break;
820 }
822 /* compute the number of elapsed pdp_st moments */
823 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
824 #ifdef DEBUG
825 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
826 #endif
827 if (rra_step_cnt == NULL)
828 {
829 rra_step_cnt = (unsigned long *)
830 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
831 }
833 for(i = 0, rra_start = rra_begin;
834 i < rrd.stat_head->rra_cnt;
835 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
836 i++)
837 {
838 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
839 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
840 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
841 if (start_pdp_offset <= elapsed_pdp_st) {
842 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
843 rrd.rra_def[i].pdp_cnt + 1;
844 } else {
845 rra_step_cnt[i] = 0;
846 }
848 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
849 {
850 /* If this is a bulk update, we need to skip ahead in the seasonal
851 * arrays so that they will be correct for the next observed value;
852 * note that for the bulk update itself, no update will occur to
853 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
854 * be set to DNAN. */
855 if (rra_step_cnt[i] > 2)
856 {
857 /* skip update by resetting rra_step_cnt[i],
858 * note that this is not data source specific; this is due
859 * to the bulk update, not a DNAN value for the specific data
860 * source. */
861 rra_step_cnt[i] = 0;
862 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
863 &last_seasonal_coef);
864 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
865 &seasonal_coef);
866 }
868 /* periodically run a smoother for seasonal effects */
869 /* Need to use first cdp parameter buffer to track
870 * burnin (burnin requires a specific smoothing schedule).
871 * The CDP_init_seasonal parameter is really an RRA level,
872 * not a data source within RRA level parameter, but the rra_def
873 * is read only for rrd_update (not flushed to disk). */
874 iii = i*(rrd.stat_head -> ds_cnt);
875 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
876 <= BURNIN_CYCLES)
877 {
878 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
879 > rrd.rra_def[i].row_cnt - 1) {
880 /* mark off one of the burnin cycles */
881 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
882 schedule_smooth = 1;
883 }
884 } else {
885 /* someone has no doubt invented a trick to deal with this
886 * wrap around, but at least this code is clear. */
887 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
888 rrd.rra_ptr[i].cur_row)
889 {
890 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
891 * mapping between PDP and CDP */
892 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
893 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
894 {
895 #ifdef DEBUG
896 fprintf(stderr,
897 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
898 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
899 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
900 #endif
901 schedule_smooth = 1;
902 }
903 } else {
904 /* can't rely on negative numbers because we are working with
905 * unsigned values */
906 /* Don't need modulus here. If we've wrapped more than once, only
907 * one smooth is executed at the end. */
908 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
909 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
910 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
911 {
912 #ifdef DEBUG
913 fprintf(stderr,
914 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
915 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
916 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
917 #endif
918 schedule_smooth = 1;
919 }
920 }
921 }
923 rra_current = ftell(rrd_file);
924 } /* if cf is DEVSEASONAL or SEASONAL */
926 if (rrd_test_error()) break;
928 /* update CDP_PREP areas */
929 /* loop over data soures within each RRA */
930 for(ii = 0;
931 ii < rrd.stat_head->ds_cnt;
932 ii++)
933 {
935 /* iii indexes the CDP prep area for this data source within the RRA */
936 iii=i*rrd.stat_head->ds_cnt+ii;
938 if (rrd.rra_def[i].pdp_cnt > 1) {
940 if (rra_step_cnt[i] > 0) {
941 /* If we are in this block, as least 1 CDP value will be written to
942 * disk, this is the CDP_primary_val entry. If more than 1 value needs
943 * to be written, then the "fill in" value is the CDP_secondary_val
944 * entry. */
945 if (isnan(pdp_temp[ii]))
946 {
947 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
948 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
949 } else {
950 /* CDP_secondary value is the RRA "fill in" value for intermediary
951 * CDP data entries. No matter the CF, the value is the same because
952 * the average, max, min, and last of a list of identical values is
953 * the same, namely, the value itself. */
954 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
955 }
957 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
958 > rrd.rra_def[i].pdp_cnt*
959 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
960 {
961 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
962 /* initialize carry over */
963 if (current_cf == CF_AVERAGE) {
964 if (isnan(pdp_temp[ii])) {
965 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
966 } else {
967 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
968 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
969 }
970 } else {
971 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
972 }
973 } else {
974 rrd_value_t cum_val, cur_val;
975 switch (current_cf) {
976 case CF_AVERAGE:
977 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
978 cur_val = IFDNAN(pdp_temp[ii],0.0);
979 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
980 (cum_val + cur_val * start_pdp_offset) /
981 (rrd.rra_def[i].pdp_cnt
982 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
983 /* initialize carry over value */
984 if (isnan(pdp_temp[ii])) {
985 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
986 } else {
987 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
988 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
989 }
990 break;
991 case CF_MAXIMUM:
992 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
993 cur_val = IFDNAN(pdp_temp[ii],-DINF);
994 #ifdef DEBUG
995 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
996 isnan(pdp_temp[ii])) {
997 fprintf(stderr,
998 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
999 i,ii);
1000 exit(-1);
1001 }
1002 #endif
1003 if (cur_val > cum_val)
1004 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1005 else
1006 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1007 /* initialize carry over value */
1008 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1009 break;
1010 case CF_MINIMUM:
1011 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1012 cur_val = IFDNAN(pdp_temp[ii],DINF);
1013 #ifdef DEBUG
1014 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1015 isnan(pdp_temp[ii])) {
1016 fprintf(stderr,
1017 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1018 i,ii);
1019 exit(-1);
1020 }
1021 #endif
1022 if (cur_val < cum_val)
1023 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1024 else
1025 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1026 /* initialize carry over value */
1027 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1028 break;
1029 case CF_LAST:
1030 default:
1031 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1032 /* initialize carry over value */
1033 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1034 break;
1035 }
1036 } /* endif meets xff value requirement for a valid value */
1037 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1038 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1039 if (isnan(pdp_temp[ii]))
1040 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1041 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1042 else
1043 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1044 } else /* rra_step_cnt[i] == 0 */
1045 {
1046 #ifdef DEBUG
1047 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1048 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1049 i,ii);
1050 } else {
1051 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1052 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1053 }
1054 #endif
1055 if (isnan(pdp_temp[ii])) {
1056 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1057 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1058 {
1059 if (current_cf == CF_AVERAGE) {
1060 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1061 elapsed_pdp_st;
1062 } else {
1063 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1064 }
1065 #ifdef DEBUG
1066 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1067 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1068 #endif
1069 } else {
1070 switch (current_cf) {
1071 case CF_AVERAGE:
1072 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1073 elapsed_pdp_st;
1074 break;
1075 case CF_MINIMUM:
1076 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1077 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1078 break;
1079 case CF_MAXIMUM:
1080 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1081 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1082 break;
1083 case CF_LAST:
1084 default:
1085 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1086 break;
1087 }
1088 }
1089 }
1090 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1091 if (elapsed_pdp_st > 2)
1092 {
1093 switch (current_cf) {
1094 case CF_AVERAGE:
1095 default:
1096 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1097 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1098 break;
1099 case CF_SEASONAL:
1100 case CF_DEVSEASONAL:
1101 /* need to update cached seasonal values, so they are consistent
1102 * with the bulk update */
1103 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1104 * CDP_last_deviation are the same. */
1105 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1106 last_seasonal_coef[ii];
1107 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1108 seasonal_coef[ii];
1109 break;
1110 case CF_HWPREDICT:
1111 /* need to update the null_count and last_null_count.
1112 * even do this for non-DNAN pdp_temp because the
1113 * algorithm is not learning from batch updates. */
1114 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1115 elapsed_pdp_st;
1116 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1117 elapsed_pdp_st - 1;
1118 /* fall through */
1119 case CF_DEVPREDICT:
1120 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1121 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1122 break;
1123 case CF_FAILURES:
1124 /* do not count missed bulk values as failures */
1125 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1126 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1127 /* need to reset violations buffer.
1128 * could do this more carefully, but for now, just
1129 * assume a bulk update wipes away all violations. */
1130 erase_violations(&rrd, iii, i);
1131 break;
1132 }
1133 }
1134 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1136 if (rrd_test_error()) break;
1138 } /* endif data sources loop */
1139 } /* end RRA Loop */
1141 /* this loop is only entered if elapsed_pdp_st < 3 */
1142 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1143 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1144 {
1145 for(i = 0, rra_start = rra_begin;
1146 i < rrd.stat_head->rra_cnt;
1147 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1148 i++)
1149 {
1150 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1152 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1153 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1154 {
1155 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1156 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1157 &seasonal_coef);
1158 rra_current = ftell(rrd_file);
1159 }
1160 if (rrd_test_error()) break;
1161 /* loop over data soures within each RRA */
1162 for(ii = 0;
1163 ii < rrd.stat_head->ds_cnt;
1164 ii++)
1165 {
1166 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1167 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1168 scratch_idx, seasonal_coef);
1169 }
1170 } /* end RRA Loop */
1171 if (rrd_test_error()) break;
1172 } /* end elapsed_pdp_st loop */
1174 if (rrd_test_error()) break;
1176 /* Ready to write to disk */
1177 /* Move sequentially through the file, writing one RRA at a time.
1178 * Note this architecture divorces the computation of CDP with
1179 * flushing updated RRA entries to disk. */
1180 for(i = 0, rra_start = rra_begin;
1181 i < rrd.stat_head->rra_cnt;
1182 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1183 i++) {
1184 /* is there anything to write for this RRA? If not, continue. */
1185 if (rra_step_cnt[i] == 0) continue;
1187 /* write the first row */
1188 #ifdef DEBUG
1189 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1190 #endif
1191 rrd.rra_ptr[i].cur_row++;
1192 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1193 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1194 /* positition on the first row */
1195 rra_pos_tmp = rra_start +
1196 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1197 if(rra_pos_tmp != rra_current) {
1198 #ifndef HAVE_MMAP
1199 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1200 rrd_set_error("seek error in rrd");
1201 break;
1202 }
1203 #endif
1204 rra_current = rra_pos_tmp;
1205 }
1207 #ifdef DEBUG
1208 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1209 #endif
1210 scratch_idx = CDP_primary_val;
1211 if (pcdp_summary != NULL)
1212 {
1213 rra_time = (current_time - current_time
1214 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1215 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1216 }
1217 #ifdef HAVE_MMAP
1218 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1219 pcdp_summary, &rra_time, rrd_mmaped_file);
1220 #else
1221 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1222 pcdp_summary, &rra_time);
1223 #endif
1224 if (rrd_test_error()) break;
1226 /* write other rows of the bulk update, if any */
1227 scratch_idx = CDP_secondary_val;
1228 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1229 {
1230 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1231 {
1232 #ifdef DEBUG
1233 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1234 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1235 #endif
1236 /* wrap */
1237 rrd.rra_ptr[i].cur_row = 0;
1238 /* seek back to beginning of current rra */
1239 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1240 {
1241 rrd_set_error("seek error in rrd");
1242 break;
1243 }
1244 #ifdef DEBUG
1245 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1246 #endif
1247 rra_current = rra_start;
1248 }
1249 if (pcdp_summary != NULL)
1250 {
1251 rra_time = (current_time - current_time
1252 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1253 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1254 }
1255 #ifdef HAVE_MMAP
1256 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1257 pcdp_summary, &rra_time, rrd_mmaped_file);
1258 #else
1259 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1260 pcdp_summary, &rra_time);
1261 #endif
1262 }
1264 if (rrd_test_error())
1265 break;
1266 } /* RRA LOOP */
1268 /* break out of the argument parsing loop if error_string is set */
1269 if (rrd_test_error()){
1270 free(step_start);
1271 break;
1272 }
1274 } /* endif a pdp_st has occurred */
1275 rrd.live_head->last_up = current_time;
1276 rrd.live_head->last_up_usec = current_time_usec;
1277 free(step_start);
1278 } /* function argument loop */
1280 if (seasonal_coef != NULL) free(seasonal_coef);
1281 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1282 if (rra_step_cnt != NULL) free(rra_step_cnt);
1283 rpnstack_free(&rpnstack);
1285 #ifdef HAVE_MMAP
1286 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1287 rrd_set_error("error writing(unmapping) file: %s", filename);
1288 }
1289 #endif
1290 /* if we got here and if there is an error and if the file has not been
1291 * written to, then close things up and return. */
1292 if (rrd_test_error()) {
1293 free(updvals);
1294 free(tmpl_idx);
1295 rrd_free(&rrd);
1296 free(pdp_temp);
1297 free(pdp_new);
1298 fclose(rrd_file);
1299 return(-1);
1300 }
1302 /* aargh ... that was tough ... so many loops ... anyway, its done.
1303 * we just need to write back the live header portion now*/
1305 if (fseek(rrd_file, (sizeof(stat_head_t)
1306 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1307 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1308 SEEK_SET) != 0) {
1309 rrd_set_error("seek rrd for live header writeback");
1310 free(updvals);
1311 free(tmpl_idx);
1312 rrd_free(&rrd);
1313 free(pdp_temp);
1314 free(pdp_new);
1315 fclose(rrd_file);
1316 return(-1);
1317 }
1319 if(version >= 3) {
1320 if(fwrite( rrd.live_head,
1321 sizeof(live_head_t), 1, rrd_file) != 1){
1322 rrd_set_error("fwrite live_head to rrd");
1323 free(updvals);
1324 rrd_free(&rrd);
1325 free(tmpl_idx);
1326 free(pdp_temp);
1327 free(pdp_new);
1328 fclose(rrd_file);
1329 return(-1);
1330 }
1331 }
1332 else {
1333 if(fwrite( &rrd.live_head->last_up,
1334 sizeof(time_t), 1, rrd_file) != 1){
1335 rrd_set_error("fwrite live_head to rrd");
1336 free(updvals);
1337 rrd_free(&rrd);
1338 free(tmpl_idx);
1339 free(pdp_temp);
1340 free(pdp_new);
1341 fclose(rrd_file);
1342 return(-1);
1343 }
1344 }
1347 if(fwrite( rrd.pdp_prep,
1348 sizeof(pdp_prep_t),
1349 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1350 rrd_set_error("ftwrite pdp_prep to rrd");
1351 free(updvals);
1352 rrd_free(&rrd);
1353 free(tmpl_idx);
1354 free(pdp_temp);
1355 free(pdp_new);
1356 fclose(rrd_file);
1357 return(-1);
1358 }
1360 if(fwrite( rrd.cdp_prep,
1361 sizeof(cdp_prep_t),
1362 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1363 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1365 rrd_set_error("ftwrite cdp_prep to rrd");
1366 free(updvals);
1367 free(tmpl_idx);
1368 rrd_free(&rrd);
1369 free(pdp_temp);
1370 free(pdp_new);
1371 fclose(rrd_file);
1372 return(-1);
1373 }
1375 if(fwrite( rrd.rra_ptr,
1376 sizeof(rra_ptr_t),
1377 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1378 rrd_set_error("fwrite rra_ptr to rrd");
1379 free(updvals);
1380 free(tmpl_idx);
1381 rrd_free(&rrd);
1382 free(pdp_temp);
1383 free(pdp_new);
1384 fclose(rrd_file);
1385 return(-1);
1386 }
1388 /* OK now close the files and free the memory */
1389 if(fclose(rrd_file) != 0){
1390 rrd_set_error("closing rrd");
1391 free(updvals);
1392 free(tmpl_idx);
1393 rrd_free(&rrd);
1394 free(pdp_temp);
1395 free(pdp_new);
1396 return(-1);
1397 }
1399 /* calling the smoothing code here guarantees at most
1400 * one smoothing operation per rrd_update call. Unfortunately,
1401 * it is possible with bulk updates, or a long-delayed update
1402 * for smoothing to occur off-schedule. This really isn't
1403 * critical except during the burning cycles. */
1404 if (schedule_smooth)
1405 {
1406 rrd_file = fopen(filename,"rb+");
1407 rra_start = rra_begin;
1408 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1409 {
1410 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1411 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1412 {
1413 #ifdef DEBUG
1414 fprintf(stderr,"Running smoother for rra %ld\n",i);
1415 #endif
1416 apply_smoother(&rrd,i,rra_start,rrd_file);
1417 if (rrd_test_error())
1418 break;
1419 }
1420 rra_start += rrd.rra_def[i].row_cnt
1421 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1422 }
1423 fclose(rrd_file);
1424 }
1425 rrd_free(&rrd);
1426 free(updvals);
1427 free(tmpl_idx);
1428 free(pdp_new);
1429 free(pdp_temp);
1430 return(0);
1431 }
1433 /*
1434 * get exclusive lock to whole file.
1435 * lock gets removed when we close the file
1436 *
1437 * returns 0 on success
1438 */
1439 int
1440 LockRRD(FILE *rrdfile)
1441 {
1442 int rrd_fd; /* File descriptor for RRD */
1443 int rcstat;
1445 rrd_fd = fileno(rrdfile);
1447 {
1448 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1449 struct _stat st;
1451 if ( _fstat( rrd_fd, &st ) == 0 ) {
1452 rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1453 } else {
1454 rcstat = -1;
1455 }
1456 #else
1457 struct flock lock;
1458 lock.l_type = F_WRLCK; /* exclusive write lock */
1459 lock.l_len = 0; /* whole file */
1460 lock.l_start = 0; /* start of file */
1461 lock.l_whence = SEEK_SET; /* end of file */
1463 rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1464 #endif
1465 }
1467 return(rcstat);
1468 }
1471 #ifdef HAVE_MMAP
1472 info_t
1473 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1474 unsigned short CDP_scratch_idx,
1475 #ifndef DEBUG
1476 FILE UNUSED(*rrd_file),
1477 #else
1478 FILE *rrd_file,
1479 #endif
1480 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1481 #else
1482 info_t
1483 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1484 unsigned short CDP_scratch_idx, FILE *rrd_file,
1485 info_t *pcdp_summary, time_t *rra_time)
1486 #endif
1487 {
1488 unsigned long ds_idx, cdp_idx;
1489 infoval iv;
1491 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1492 {
1493 /* compute the cdp index */
1494 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1495 #ifdef DEBUG
1496 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1497 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1498 rrd -> rra_def[rra_idx].cf_nam);
1499 #endif
1500 if (pcdp_summary != NULL)
1501 {
1502 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1503 /* append info to the return hash */
1504 pcdp_summary = info_push(pcdp_summary,
1505 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1506 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1507 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1508 RD_I_VAL, iv);
1509 }
1510 #ifdef HAVE_MMAP
1511 memcpy((char *)rrd_mmaped_file + *rra_current,
1512 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1513 sizeof(rrd_value_t));
1514 #else
1515 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1516 sizeof(rrd_value_t),1,rrd_file) != 1)
1517 {
1518 rrd_set_error("writing rrd");
1519 return 0;
1520 }
1521 #endif
1522 *rra_current += sizeof(rrd_value_t);
1523 }
1524 return (pcdp_summary);
1525 }