1 /*****************************************************************************
2 * RRDtool 1.2.11 Copyright by Tobi Oetiker, 1997-2005
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 *****************************************************************************/
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13 #include <sys/mman.h>
14 #endif
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
18 #include <sys/stat.h>
19 #include <io.h>
20 #endif
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
31 * replacement.
32 */
33 #include <sys/timeb.h>
35 #ifndef __MINGW32__
36 struct timeval {
37 time_t tv_sec; /* seconds */
38 long tv_usec; /* microseconds */
39 };
40 #endif
42 struct __timezone {
43 int tz_minuteswest; /* minutes W of Greenwich */
44 int tz_dsttime; /* type of dst correction */
45 };
47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
49 struct _timeb current_time;
51 _ftime(¤t_time);
53 t->tv_sec = current_time.time;
54 t->tv_usec = current_time.millitm * 1000;
56 return 0;
57 }
59 #endif
/*
 * normalize time as returned by gettimeofday. usec part must
 * always be >= 0
 */
static void normalize_time(struct timeval *t)
{
    /* Nothing to do when the microsecond part is already non-negative. */
    if (t->tv_usec >= 0)
        return;

    /* Borrow one second and move it into the microsecond field. */
    t->tv_usec += 1000000L;
    t->tv_sec -= 1;
}
72 /* Local prototypes */
73 int LockRRD(FILE *rrd_file);
74 #ifdef HAVE_MMAP
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
76 unsigned long *rra_current,
77 unsigned short CDP_scratch_idx,
78 #ifndef DEBUG
79 FILE UNUSED(*rrd_file),
80 #else
81 FILE *rrd_file,
82 #endif
83 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
84 #else
85 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
86 unsigned long *rra_current,
87 unsigned short CDP_scratch_idx, FILE *rrd_file,
88 info_t *pcdp_summary, time_t *rra_time);
89 #endif
90 int rrd_update_r(char *filename, char *template, int argc, char **argv);
91 int _rrd_update(char *filename, char *template, int argc, char **argv,
92 info_t*);
/* Yield X unless it is NaN, in which case yield Y.  X is evaluated twice, so
 * it must be free of side effects.  The macro is a plain expression: it has
 * no trailing semicolon, the caller terminates the statement. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
97 #ifdef STANDALONE
98 int
99 main(int argc, char **argv){
100 rrd_update(argc,argv);
101 if (rrd_test_error()) {
102 printf("RRDtool " PACKAGE_VERSION " Copyright by Tobi Oetiker, 1997-2005\n\n"
103 "Usage: rrdupdate filename\n"
104 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
105 "\t\t\ttime|N:value[:value...]\n\n"
106 "\t\t\tat-time@value[:value...]\n\n"
107 "\t\t\t[ time:value[:value...] ..]\n\n");
109 printf("ERROR: %s\n",rrd_get_error());
110 rrd_clear_error();
111 return 1;
112 }
113 return 0;
114 }
115 #endif
117 info_t *rrd_update_v(int argc, char **argv)
118 {
119 char *template = NULL;
120 info_t *result = NULL;
121 infoval rc;
122 optind = 0; opterr = 0; /* initialize getopt */
124 while (1) {
125 static struct option long_options[] =
126 {
127 {"template", required_argument, 0, 't'},
128 {0,0,0,0}
129 };
130 int option_index = 0;
131 int opt;
132 opt = getopt_long(argc, argv, "t:",
133 long_options, &option_index);
135 if (opt == EOF)
136 break;
138 switch(opt) {
139 case 't':
140 template = optarg;
141 break;
143 case '?':
144 rrd_set_error("unknown option '%s'",argv[optind-1]);
145 rc.u_int = -1;
146 goto end_tag;
147 }
148 }
150 /* need at least 2 arguments: filename, data. */
151 if (argc-optind < 2) {
152 rrd_set_error("Not enough arguments");
153 rc.u_int = -1;
154 goto end_tag;
155 }
156 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
157 rc.u_int = _rrd_update(argv[optind], template,
158 argc - optind - 1, argv + optind + 1, result);
159 result->value.u_int = rc.u_int;
160 end_tag:
161 return result;
162 }
/*
 * Command-line front end for updating an RRD.
 *
 * Accepts the option --template|-t <arg>; the remaining arguments must be
 * the filename followed by at least one data argument.  Returns -1 with the
 * rrd error set when an unknown option is found or too few arguments
 * remain; otherwise returns whatever rrd_update_r() returns.
 */
int
rrd_update(int argc, char **argv)
{
    static struct option update_options[] =
    {
        {"template", required_argument, 0, 't'},
        {0,0,0,0}
    };
    char *tmpl = NULL;

    /* initialize getopt */
    optind = 0;
    opterr = 0;

    for (;;) {
        int opt_idx = 0;
        int c = getopt_long(argc, argv, "t:", update_options, &opt_idx);

        if (c == EOF)
            break;

        if (c == 't') {
            tmpl = optarg;
        } else if (c == '?') {
            rrd_set_error("unknown option '%s'",argv[optind-1]);
            return -1;
        }
    }

    /* filename plus at least one data argument are mandatory */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], tmpl,
                        argc - optind - 1, argv + optind + 1);
}
/*
 * Variant of the update call that takes the filename, template and data
 * arguments directly.  Forwards everything to _rrd_update() with a NULL
 * info list and returns its status unchanged.
 */
int
rrd_update_r(char *filename, char *template, int argc, char **argv)
{
    int status;

    status = _rrd_update(filename, template, argc, argv, NULL);
    return status;
}
214 int
215 _rrd_update(char *filename, char *template, int argc, char **argv,
216 info_t *pcdp_summary)
217 {
219 int arg_i = 2;
220 short j;
221 unsigned long i,ii,iii=1;
223 unsigned long rra_begin; /* byte pointer to the rra
224 * area in the rrd file. this
225 * pointer never changes value */
226 unsigned long rra_start; /* byte pointer to the rra
227 * area in the rrd file. this
228 * pointer changes as each rrd is
229 * processed. */
230 unsigned long rra_current; /* byte pointer to the current write
231 * spot in the rrd file. */
232 unsigned long rra_pos_tmp; /* temporary byte pointer. */
233 double interval,
234 pre_int,post_int; /* interval between this and
235 * the last run */
236 unsigned long proc_pdp_st; /* which pdp_st was the last
237 * to be processed */
238 unsigned long occu_pdp_st; /* when was the pdp_st
239 * before the last update
240 * time */
241 unsigned long proc_pdp_age; /* how old was the data in
242 * the pdp prep area when it
243 * was last updated */
244 unsigned long occu_pdp_age; /* how long ago was the last
245 * pdp_step time */
246 rrd_value_t *pdp_new; /* prepare the incoming data
247 * to be added the the
248 * existing entry */
249 rrd_value_t *pdp_temp; /* prepare the pdp values
250 * to be added the the
251 * cdp values */
253 long *tmpl_idx; /* index representing the settings
254 transported by the template index */
255 unsigned long tmpl_cnt = 2; /* time and data */
257 FILE *rrd_file;
258 rrd_t rrd;
259 time_t current_time = 0;
260 time_t rra_time = 0; /* time of update for a RRA */
261 unsigned long current_time_usec=0;/* microseconds part of current time */
262 struct timeval tmp_time; /* used for time conversion */
264 char **updvals;
265 int schedule_smooth = 0;
266 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
267 /* a vector of future Holt-Winters seasonal coefs */
268 unsigned long elapsed_pdp_st;
269 /* number of elapsed PDP steps since last update */
270 unsigned long *rra_step_cnt = NULL;
271 /* number of rows to be updated in an RRA for a data
272 * value. */
273 unsigned long start_pdp_offset;
274 /* number of PDP steps since the last update that
275 * are assigned to the first CDP to be generated
276 * since the last update. */
277 unsigned short scratch_idx;
278 /* index into the CDP scratch array */
279 enum cf_en current_cf;
280 /* numeric id of the current consolidation function */
281 rpnstack_t rpnstack; /* used for COMPUTE DS */
282 int version; /* rrd version */
283 char *endptr; /* used in the conversion */
284 #ifdef HAVE_MMAP
285 void *rrd_mmaped_file;
286 unsigned long rrd_filesize;
287 #endif
289 rpnstack_init(&rpnstack);
291 /* need at least 1 arguments: data. */
292 if (argc < 1) {
293 rrd_set_error("Not enough arguments");
294 return -1;
295 }
299 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
300 return -1;
301 }
302 /* initialize time */
303 version = atoi(rrd.stat_head->version);
304 gettimeofday(&tmp_time, 0);
305 normalize_time(&tmp_time);
306 current_time = tmp_time.tv_sec;
307 if(version >= 3) {
308 current_time_usec = tmp_time.tv_usec;
309 }
310 else {
311 current_time_usec = 0;
312 }
314 rra_current = rra_start = rra_begin = ftell(rrd_file);
    /* This is defined in the ANSI C standard, section 7.9.5.3:
       When a file is opened with update mode ('+' as the second
       or third character in the ... list of mode argument
       variables), both input and output may be performed on the
       associated stream.  However, ...  input may not be directly
       followed by output without an intervening call to a file
       positioning function, unless the input operation encounters
       end-of-file. */
324 #ifdef HAVE_MMAP
325 fseek(rrd_file, 0, SEEK_END);
326 rrd_filesize = ftell(rrd_file);
327 fseek(rrd_file, rra_current, SEEK_SET);
328 #else
329 fseek(rrd_file, 0, SEEK_CUR);
330 #endif
333 /* get exclusive lock to whole file.
334 * lock gets removed when we close the file.
335 */
336 if (LockRRD(rrd_file) != 0) {
337 rrd_set_error("could not lock RRD");
338 rrd_free(&rrd);
339 fclose(rrd_file);
340 return(-1);
341 }
343 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
344 rrd_set_error("allocating updvals pointer array");
345 rrd_free(&rrd);
346 fclose(rrd_file);
347 return(-1);
348 }
350 if ((pdp_temp = malloc(sizeof(rrd_value_t)
351 *rrd.stat_head->ds_cnt))==NULL){
352 rrd_set_error("allocating pdp_temp ...");
353 free(updvals);
354 rrd_free(&rrd);
355 fclose(rrd_file);
356 return(-1);
357 }
359 if ((tmpl_idx = malloc(sizeof(unsigned long)
360 *(rrd.stat_head->ds_cnt+1)))==NULL){
361 rrd_set_error("allocating tmpl_idx ...");
362 free(pdp_temp);
363 free(updvals);
364 rrd_free(&rrd);
365 fclose(rrd_file);
366 return(-1);
367 }
368 /* initialize template redirector */
369 /* default config example (assume DS 1 is a CDEF DS)
370 tmpl_idx[0] -> 0; (time)
371 tmpl_idx[1] -> 1; (DS 0)
372 tmpl_idx[2] -> 3; (DS 2)
373 tmpl_idx[3] -> 4; (DS 3) */
374 tmpl_idx[0] = 0; /* time */
375 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
376 {
377 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
378 tmpl_idx[ii++]=i;
379 }
380 tmpl_cnt= ii;
382 if (template) {
383 char *dsname;
384 unsigned int tmpl_len;
385 dsname = template;
386 tmpl_cnt = 1; /* the first entry is the time */
387 tmpl_len = strlen(template);
388 for(i=0;i<=tmpl_len ;i++) {
389 if (template[i] == ':' || template[i] == '\0') {
390 template[i] = '\0';
391 if (tmpl_cnt>rrd.stat_head->ds_cnt){
392 rrd_set_error("Template contains more DS definitions than RRD");
393 free(updvals); free(pdp_temp);
394 free(tmpl_idx); rrd_free(&rrd);
395 fclose(rrd_file); return(-1);
396 }
397 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
398 rrd_set_error("unknown DS name '%s'",dsname);
399 free(updvals); free(pdp_temp);
400 free(tmpl_idx); rrd_free(&rrd);
401 fclose(rrd_file); return(-1);
402 } else {
403 /* the first element is always the time */
404 tmpl_idx[tmpl_cnt-1]++;
405 /* go to the next entry on the template */
406 dsname = &template[i+1];
407 /* fix the damage we did before */
408 if (i<tmpl_len) {
409 template[i]=':';
410 }
412 }
413 }
414 }
415 }
416 if ((pdp_new = malloc(sizeof(rrd_value_t)
417 *rrd.stat_head->ds_cnt))==NULL){
418 rrd_set_error("allocating pdp_new ...");
419 free(updvals);
420 free(pdp_temp);
421 free(tmpl_idx);
422 rrd_free(&rrd);
423 fclose(rrd_file);
424 return(-1);
425 }
427 #ifdef HAVE_MMAP
428 rrd_mmaped_file = mmap(0,
429 rrd_filesize,
430 PROT_READ | PROT_WRITE,
431 MAP_SHARED,
432 fileno(rrd_file),
433 0);
434 if (rrd_mmaped_file == MAP_FAILED) {
435 rrd_set_error("error mmapping file %s", filename);
436 free(updvals);
437 free(pdp_temp);
438 free(tmpl_idx);
439 rrd_free(&rrd);
440 fclose(rrd_file);
441 return(-1);
442 }
443 #endif
444 /* loop through the arguments. */
445 for(arg_i=0; arg_i<argc;arg_i++) {
446 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
447 char *step_start = stepper;
448 char *p;
449 char *parsetime_error = NULL;
450 enum {atstyle, normal} timesyntax;
451 struct rrd_time_value ds_tv;
452 if (stepper == NULL){
453 rrd_set_error("failed duplication argv entry");
454 free(updvals);
455 free(pdp_temp);
456 free(tmpl_idx);
457 rrd_free(&rrd);
458 #ifdef HAVE_MMAP
459 munmap(rrd_mmaped_file, rrd_filesize);
460 #endif
461 fclose(rrd_file);
462 return(-1);
463 }
464 /* initialize all ds input to unknown except the first one
465 which has always got to be set */
466 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
467 strcpy(stepper,argv[arg_i]);
468 updvals[0]=stepper;
469 /* separate all ds elements; first must be examined separately
470 due to alternate time syntax */
471 if ((p=strchr(stepper,'@'))!=NULL) {
472 timesyntax = atstyle;
473 *p = '\0';
474 stepper = p+1;
475 } else if ((p=strchr(stepper,':'))!=NULL) {
476 timesyntax = normal;
477 *p = '\0';
478 stepper = p+1;
479 } else {
480 rrd_set_error("expected timestamp not found in data source from %s:...",
481 argv[arg_i]);
482 free(step_start);
483 break;
484 }
485 ii=1;
486 updvals[tmpl_idx[ii]] = stepper;
487 while (*stepper) {
488 if (*stepper == ':') {
489 *stepper = '\0';
490 ii++;
491 if (ii<tmpl_cnt){
492 updvals[tmpl_idx[ii]] = stepper+1;
493 }
494 }
495 stepper++;
496 }
498 if (ii != tmpl_cnt-1) {
499 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
500 tmpl_cnt-1, ii, argv[arg_i]);
501 free(step_start);
502 break;
503 }
505 /* get the time from the reading ... handle N */
506 if (timesyntax == atstyle) {
507 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
508 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
509 free(step_start);
510 break;
511 }
512 if (ds_tv.type == RELATIVE_TO_END_TIME ||
513 ds_tv.type == RELATIVE_TO_START_TIME) {
514 rrd_set_error("specifying time relative to the 'start' "
515 "or 'end' makes no sense here: %s",
516 updvals[0]);
517 free(step_start);
518 break;
519 }
521 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
522 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
524 } else if (strcmp(updvals[0],"N")==0){
525 gettimeofday(&tmp_time, 0);
526 normalize_time(&tmp_time);
527 current_time = tmp_time.tv_sec;
528 current_time_usec = tmp_time.tv_usec;
529 } else {
530 double tmp;
531 tmp = strtod(updvals[0], 0);
532 current_time = floor(tmp);
533 current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
534 }
535 /* dont do any correction for old version RRDs */
536 if(version < 3)
537 current_time_usec = 0;
539 if(current_time < rrd.live_head->last_up ||
540 (current_time == rrd.live_head->last_up &&
541 (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
542 rrd_set_error("illegal attempt to update using time %ld when "
543 "last update time is %ld (minimum one second step)",
544 current_time, rrd.live_head->last_up);
545 free(step_start);
546 break;
547 }
550 /* seek to the beginning of the rra's */
551 if (rra_current != rra_begin) {
552 #ifndef HAVE_MMAP
553 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
554 rrd_set_error("seek error in rrd");
555 free(step_start);
556 break;
557 }
558 #endif
559 rra_current = rra_begin;
560 }
561 rra_start = rra_begin;
563 /* when was the current pdp started */
564 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
565 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
567 /* when did the last pdp_st occur */
568 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
569 occu_pdp_st = current_time - occu_pdp_age;
571 /* interval = current_time - rrd.live_head->last_up; */
572 interval = (double)(current_time - rrd.live_head->last_up)
573 + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
575 if (occu_pdp_st > proc_pdp_st){
576 /* OK we passed the pdp_st moment*/
577 pre_int = (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
578 * occurred before the latest
579 * pdp_st moment*/
580 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
581 post_int = occu_pdp_age; /* how much after it */
582 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
583 } else {
584 pre_int = interval;
585 post_int = 0;
586 }
588 #ifdef DEBUG
589 printf(
590 "proc_pdp_age %lu\t"
591 "proc_pdp_st %lu\t"
592 "occu_pfp_age %lu\t"
593 "occu_pdp_st %lu\t"
594 "int %lf\t"
595 "pre_int %lf\t"
596 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
597 occu_pdp_age, occu_pdp_st,
598 interval, pre_int, post_int);
599 #endif
601 /* process the data sources and update the pdp_prep
602 * area accordingly */
603 for(i=0;i<rrd.stat_head->ds_cnt;i++){
604 enum dst_en dst_idx;
605 dst_idx= dst_conv(rrd.ds_def[i].dst);
607 /* make sure we do not build diffs with old last_ds values */
608 if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval
609 && ( dst_idx == DST_COUNTER || dst_idx == DST_DERIVE)){
610 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
611 }
613 /* NOTE: DST_CDEF should never enter this if block, because
614 * updvals[i+1][0] is initialized to 'U'; unless the caller
615 * accidently specified a value for the DST_CDEF. To handle
616 * this case, an extra check is required. */
618 if((updvals[i+1][0] != 'U') &&
619 (dst_idx != DST_CDEF) &&
620 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
621 double rate = DNAN;
622 /* the data source type defines how to process the data */
623 /* pdp_new contains rate * time ... eg the bytes
624 * transferred during the interval. Doing it this way saves
625 * a lot of math operations */
628 switch(dst_idx){
629 case DST_COUNTER:
630 case DST_DERIVE:
631 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
632 for(ii=0;updvals[i+1][ii] != '\0';ii++){
633 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
634 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
635 break;
636 }
637 }
638 if (rrd_test_error()){
639 break;
640 }
641 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
642 if(dst_idx == DST_COUNTER) {
643 /* simple overflow catcher suggested by Andres Kroonmaa */
644 /* this will fail terribly for non 32 or 64 bit counters ... */
645 /* are there any others in SNMP land ? */
646 if (pdp_new[i] < (double)0.0 )
647 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
648 if (pdp_new[i] < (double)0.0 )
649 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
650 }
651 rate = pdp_new[i] / interval;
652 }
653 else {
654 pdp_new[i]= DNAN;
655 }
656 break;
657 case DST_ABSOLUTE:
658 errno = 0;
659 pdp_new[i] = strtod(updvals[i+1],&endptr);
660 if (errno > 0){
661 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
662 break;
663 };
664 if (endptr[0] != '\0'){
665 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
666 break;
667 }
668 rate = pdp_new[i] / interval;
669 break;
670 case DST_GAUGE:
671 errno = 0;
672 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
673 if (errno > 0){
674 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
675 break;
676 };
677 if (endptr[0] != '\0'){
678 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
679 break;
680 }
681 rate = pdp_new[i] / interval;
682 break;
683 default:
684 rrd_set_error("rrd contains unknown DS type : '%s'",
685 rrd.ds_def[i].dst);
686 break;
687 }
688 /* break out of this for loop if the error string is set */
689 if (rrd_test_error()){
690 break;
691 }
692 /* make sure pdp_temp is neither too large or too small
693 * if any of these occur it becomes unknown ...
694 * sorry folks ... */
695 if ( ! isnan(rate) &&
696 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
697 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
698 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
699 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
700 pdp_new[i] = DNAN;
701 }
702 } else {
703 /* no news is news all the same */
704 pdp_new[i] = DNAN;
705 }
707 /* make a copy of the command line argument for the next run */
708 #ifdef DEBUG
709 fprintf(stderr,
710 "prep ds[%lu]\t"
711 "last_arg '%s'\t"
712 "this_arg '%s'\t"
713 "pdp_new %10.2f\n",
714 i,
715 rrd.pdp_prep[i].last_ds,
716 updvals[i+1], pdp_new[i]);
717 #endif
718 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
719 strncpy(rrd.pdp_prep[i].last_ds,
720 updvals[i+1],LAST_DS_LEN-1);
721 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
722 }
723 }
724 /* break out of the argument parsing loop if the error_string is set */
725 if (rrd_test_error()){
726 free(step_start);
727 break;
728 }
729 /* has a pdp_st moment occurred since the last run ? */
731 if (proc_pdp_st == occu_pdp_st){
732 /* no we have not passed a pdp_st moment. therefore update is simple */
734 for(i=0;i<rrd.stat_head->ds_cnt;i++){
735 if(isnan(pdp_new[i]))
736 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval+0.5);
737 else {
738 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
739 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
740 } else {
741 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
742 }
743 }
744 #ifdef DEBUG
745 fprintf(stderr,
746 "NO PDP ds[%lu]\t"
747 "value %10.2f\t"
748 "unkn_sec %5lu\n",
749 i,
750 rrd.pdp_prep[i].scratch[PDP_val].u_val,
751 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
752 #endif
753 }
754 } else {
755 /* an pdp_st has occurred. */
757 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
758 * occurred up to the last run.
759 pdp_new[] contains rate*seconds from the latest run.
760 pdp_temp[] will contain the rate for cdp */
762 for(i=0;i<rrd.stat_head->ds_cnt;i++){
763 /* update pdp_prep to the current pdp_st. */
765 if(isnan(pdp_new[i]))
766 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(pre_int+0.5);
767 else {
768 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
769 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i]/interval*pre_int;
770 } else {
771 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
772 }
773 }
776 /* if too much of the pdp_prep is unknown we dump it */
777 if (
778 /* removed because this does not agree with the definition
779 a heart beat can be unknown */
780 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
781 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
782 (occu_pdp_st-proc_pdp_st <=
783 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
784 pdp_temp[i] = DNAN;
785 } else {
786 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
787 / (double)( occu_pdp_st
788 - proc_pdp_st
789 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
790 }
792 /* process CDEF data sources; remember each CDEF DS can
793 * only reference other DS with a lower index number */
794 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
795 rpnp_t *rpnp;
796 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
797 /* substitue data values for OP_VARIABLE nodes */
798 for (ii = 0; rpnp[ii].op != OP_END; ii++)
799 {
800 if (rpnp[ii].op == OP_VARIABLE) {
801 rpnp[ii].op = OP_NUMBER;
802 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
803 }
804 }
805 /* run the rpn calculator */
806 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
807 free(rpnp);
808 break; /* exits the data sources pdp_temp loop */
809 }
810 }
812 /* make pdp_prep ready for the next run */
813 if(isnan(pdp_new[i])){
814 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int + 0.5);
815 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
816 } else {
817 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
818 rrd.pdp_prep[i].scratch[PDP_val].u_val =
819 pdp_new[i]/interval*post_int;
820 }
822 #ifdef DEBUG
823 fprintf(stderr,
824 "PDP UPD ds[%lu]\t"
825 "pdp_temp %10.2f\t"
826 "new_prep %10.2f\t"
827 "new_unkn_sec %5lu\n",
828 i, pdp_temp[i],
829 rrd.pdp_prep[i].scratch[PDP_val].u_val,
830 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
831 #endif
832 }
834 /* if there were errors during the last loop, bail out here */
835 if (rrd_test_error()){
836 free(step_start);
837 break;
838 }
840 /* compute the number of elapsed pdp_st moments */
841 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
842 #ifdef DEBUG
843 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
844 #endif
845 if (rra_step_cnt == NULL)
846 {
847 rra_step_cnt = (unsigned long *)
848 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
849 }
851 for(i = 0, rra_start = rra_begin;
852 i < rrd.stat_head->rra_cnt;
853 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
854 i++)
855 {
856 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
857 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
858 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
859 if (start_pdp_offset <= elapsed_pdp_st) {
860 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
861 rrd.rra_def[i].pdp_cnt + 1;
862 } else {
863 rra_step_cnt[i] = 0;
864 }
866 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
867 {
868 /* If this is a bulk update, we need to skip ahead in the seasonal
869 * arrays so that they will be correct for the next observed value;
870 * note that for the bulk update itself, no update will occur to
871 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
872 * be set to DNAN. */
873 if (rra_step_cnt[i] > 2)
874 {
875 /* skip update by resetting rra_step_cnt[i],
876 * note that this is not data source specific; this is due
877 * to the bulk update, not a DNAN value for the specific data
878 * source. */
879 rra_step_cnt[i] = 0;
880 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
881 &last_seasonal_coef);
882 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
883 &seasonal_coef);
884 }
886 /* periodically run a smoother for seasonal effects */
887 /* Need to use first cdp parameter buffer to track
888 * burnin (burnin requires a specific smoothing schedule).
889 * The CDP_init_seasonal parameter is really an RRA level,
890 * not a data source within RRA level parameter, but the rra_def
891 * is read only for rrd_update (not flushed to disk). */
892 iii = i*(rrd.stat_head -> ds_cnt);
893 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
894 <= BURNIN_CYCLES)
895 {
896 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
897 > rrd.rra_def[i].row_cnt - 1) {
898 /* mark off one of the burnin cycles */
899 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
900 schedule_smooth = 1;
901 }
902 } else {
903 /* someone has no doubt invented a trick to deal with this
904 * wrap around, but at least this code is clear. */
905 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
906 rrd.rra_ptr[i].cur_row)
907 {
908 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
909 * mapping between PDP and CDP */
910 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
911 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
912 {
913 #ifdef DEBUG
914 fprintf(stderr,
915 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
916 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
917 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
918 #endif
919 schedule_smooth = 1;
920 }
921 } else {
922 /* can't rely on negative numbers because we are working with
923 * unsigned values */
924 /* Don't need modulus here. If we've wrapped more than once, only
925 * one smooth is executed at the end. */
926 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
927 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
928 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
929 {
930 #ifdef DEBUG
931 fprintf(stderr,
932 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
933 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
934 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
935 #endif
936 schedule_smooth = 1;
937 }
938 }
939 }
941 rra_current = ftell(rrd_file);
942 } /* if cf is DEVSEASONAL or SEASONAL */
944 if (rrd_test_error()) break;
946 /* update CDP_PREP areas */
947 /* loop over data soures within each RRA */
948 for(ii = 0;
949 ii < rrd.stat_head->ds_cnt;
950 ii++)
951 {
953 /* iii indexes the CDP prep area for this data source within the RRA */
954 iii=i*rrd.stat_head->ds_cnt+ii;
956 if (rrd.rra_def[i].pdp_cnt > 1) {
958 if (rra_step_cnt[i] > 0) {
959 /* If we are in this block, as least 1 CDP value will be written to
960 * disk, this is the CDP_primary_val entry. If more than 1 value needs
961 * to be written, then the "fill in" value is the CDP_secondary_val
962 * entry. */
963 if (isnan(pdp_temp[ii]))
964 {
965 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
966 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
967 } else {
968 /* CDP_secondary value is the RRA "fill in" value for intermediary
969 * CDP data entries. No matter the CF, the value is the same because
970 * the average, max, min, and last of a list of identical values is
971 * the same, namely, the value itself. */
972 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
973 }
975 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
976 > rrd.rra_def[i].pdp_cnt*
977 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
978 {
979 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
980 /* initialize carry over */
981 if (current_cf == CF_AVERAGE) {
982 if (isnan(pdp_temp[ii])) {
983 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
984 } else {
985 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
986 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
987 }
988 } else {
989 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
990 }
991 } else {
992 rrd_value_t cum_val, cur_val;
993 switch (current_cf) {
994 case CF_AVERAGE:
995 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
996 cur_val = IFDNAN(pdp_temp[ii],0.0);
997 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
998 (cum_val + cur_val * start_pdp_offset) /
999 (rrd.rra_def[i].pdp_cnt
1000 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
1001 /* initialize carry over value */
1002 if (isnan(pdp_temp[ii])) {
1003 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1004 } else {
1005 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1006 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1007 }
1008 break;
1009 case CF_MAXIMUM:
1010 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1011 cur_val = IFDNAN(pdp_temp[ii],-DINF);
1012 #ifdef DEBUG
1013 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1014 isnan(pdp_temp[ii])) {
1015 fprintf(stderr,
1016 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1017 i,ii);
1018 exit(-1);
1019 }
1020 #endif
1021 if (cur_val > cum_val)
1022 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1023 else
1024 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1025 /* initialize carry over value */
1026 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1027 break;
1028 case CF_MINIMUM:
1029 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1030 cur_val = IFDNAN(pdp_temp[ii],DINF);
1031 #ifdef DEBUG
1032 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1033 isnan(pdp_temp[ii])) {
1034 fprintf(stderr,
1035 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1036 i,ii);
1037 exit(-1);
1038 }
1039 #endif
1040 if (cur_val < cum_val)
1041 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1042 else
1043 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1044 /* initialize carry over value */
1045 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1046 break;
1047 case CF_LAST:
1048 default:
1049 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1050 /* initialize carry over value */
1051 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1052 break;
1053 }
1054 } /* endif meets xff value requirement for a valid value */
1055 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1056 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1057 if (isnan(pdp_temp[ii]))
1058 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1059 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1060 else
1061 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1062 } else /* rra_step_cnt[i] == 0 */
1063 {
1064 #ifdef DEBUG
1065 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1066 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1067 i,ii);
1068 } else {
1069 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1070 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1071 }
1072 #endif
1073 if (isnan(pdp_temp[ii])) {
1074 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1075 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1076 {
1077 if (current_cf == CF_AVERAGE) {
1078 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1079 elapsed_pdp_st;
1080 } else {
1081 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1082 }
1083 #ifdef DEBUG
1084 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1085 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1086 #endif
1087 } else {
1088 switch (current_cf) {
1089 case CF_AVERAGE:
1090 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1091 elapsed_pdp_st;
1092 break;
1093 case CF_MINIMUM:
1094 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1095 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1096 break;
1097 case CF_MAXIMUM:
1098 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1099 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1100 break;
1101 case CF_LAST:
1102 default:
1103 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1104 break;
1105 }
1106 }
1107 }
1108 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1109 if (elapsed_pdp_st > 2)
1110 {
1111 switch (current_cf) {
1112 case CF_AVERAGE:
1113 default:
1114 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1115 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1116 break;
1117 case CF_SEASONAL:
1118 case CF_DEVSEASONAL:
1119 /* need to update cached seasonal values, so they are consistent
1120 * with the bulk update */
1121 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1122 * CDP_last_deviation are the same. */
1123 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1124 last_seasonal_coef[ii];
1125 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1126 seasonal_coef[ii];
1127 break;
1128 case CF_HWPREDICT:
1129 /* need to update the null_count and last_null_count.
1130 * even do this for non-DNAN pdp_temp because the
1131 * algorithm is not learning from batch updates. */
1132 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1133 elapsed_pdp_st;
1134 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1135 elapsed_pdp_st - 1;
1136 /* fall through */
1137 case CF_DEVPREDICT:
1138 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1139 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1140 break;
1141 case CF_FAILURES:
1142 /* do not count missed bulk values as failures */
1143 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1144 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1145 /* need to reset violations buffer.
1146 * could do this more carefully, but for now, just
1147 * assume a bulk update wipes away all violations. */
1148 erase_violations(&rrd, iii, i);
1149 break;
1150 }
1151 }
1152 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1154 if (rrd_test_error()) break;
1156 } /* endif data sources loop */
1157 } /* end RRA Loop */
1159 /* this loop is only entered if elapsed_pdp_st < 3 */
1160 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1161 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1162 {
1163 for(i = 0, rra_start = rra_begin;
1164 i < rrd.stat_head->rra_cnt;
1165 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1166 i++)
1167 {
1168 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1170 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1171 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1172 {
1173 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1174 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1175 &seasonal_coef);
1176 rra_current = ftell(rrd_file);
1177 }
1178 if (rrd_test_error()) break;
1179 /* loop over data soures within each RRA */
1180 for(ii = 0;
1181 ii < rrd.stat_head->ds_cnt;
1182 ii++)
1183 {
1184 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1185 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1186 scratch_idx, seasonal_coef);
1187 }
1188 } /* end RRA Loop */
1189 if (rrd_test_error()) break;
1190 } /* end elapsed_pdp_st loop */
1192 if (rrd_test_error()) break;
1194 /* Ready to write to disk */
1195 /* Move sequentially through the file, writing one RRA at a time.
1196 * Note this architecture divorces the computation of CDP with
1197 * flushing updated RRA entries to disk. */
1198 for(i = 0, rra_start = rra_begin;
1199 i < rrd.stat_head->rra_cnt;
1200 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1201 i++) {
1202 /* is there anything to write for this RRA? If not, continue. */
1203 if (rra_step_cnt[i] == 0) continue;
1205 /* write the first row */
1206 #ifdef DEBUG
1207 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1208 #endif
1209 rrd.rra_ptr[i].cur_row++;
1210 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1211 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1212 /* positition on the first row */
1213 rra_pos_tmp = rra_start +
1214 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1215 if(rra_pos_tmp != rra_current) {
1216 #ifndef HAVE_MMAP
1217 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1218 rrd_set_error("seek error in rrd");
1219 break;
1220 }
1221 #endif
1222 rra_current = rra_pos_tmp;
1223 }
1225 #ifdef DEBUG
1226 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1227 #endif
1228 scratch_idx = CDP_primary_val;
1229 if (pcdp_summary != NULL)
1230 {
1231 rra_time = (current_time - current_time
1232 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1233 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1234 }
1235 #ifdef HAVE_MMAP
1236 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1237 pcdp_summary, &rra_time, rrd_mmaped_file);
1238 #else
1239 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1240 pcdp_summary, &rra_time);
1241 #endif
1242 if (rrd_test_error()) break;
1244 /* write other rows of the bulk update, if any */
1245 scratch_idx = CDP_secondary_val;
1246 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1247 {
1248 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1249 {
1250 #ifdef DEBUG
1251 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1252 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1253 #endif
1254 /* wrap */
1255 rrd.rra_ptr[i].cur_row = 0;
1256 /* seek back to beginning of current rra */
1257 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1258 {
1259 rrd_set_error("seek error in rrd");
1260 break;
1261 }
1262 #ifdef DEBUG
1263 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1264 #endif
1265 rra_current = rra_start;
1266 }
1267 if (pcdp_summary != NULL)
1268 {
1269 rra_time = (current_time - current_time
1270 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1271 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1272 }
1273 #ifdef HAVE_MMAP
1274 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1275 pcdp_summary, &rra_time, rrd_mmaped_file);
1276 #else
1277 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1278 pcdp_summary, &rra_time);
1279 #endif
1280 }
1282 if (rrd_test_error())
1283 break;
1284 } /* RRA LOOP */
1286 /* break out of the argument parsing loop if error_string is set */
1287 if (rrd_test_error()){
1288 free(step_start);
1289 break;
1290 }
1292 } /* endif a pdp_st has occurred */
1293 rrd.live_head->last_up = current_time;
1294 rrd.live_head->last_up_usec = current_time_usec;
1295 free(step_start);
1296 } /* function argument loop */
1298 if (seasonal_coef != NULL) free(seasonal_coef);
1299 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1300 if (rra_step_cnt != NULL) free(rra_step_cnt);
1301 rpnstack_free(&rpnstack);
1303 #ifdef HAVE_MMAP
1304 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1305 rrd_set_error("error writing(unmapping) file: %s", filename);
1306 }
1307 #endif
1308 /* if we got here and if there is an error and if the file has not been
1309 * written to, then close things up and return. */
1310 if (rrd_test_error()) {
1311 free(updvals);
1312 free(tmpl_idx);
1313 rrd_free(&rrd);
1314 free(pdp_temp);
1315 free(pdp_new);
1316 fclose(rrd_file);
1317 return(-1);
1318 }
1320 /* aargh ... that was tough ... so many loops ... anyway, its done.
1321 * we just need to write back the live header portion now*/
1323 if (fseek(rrd_file, (sizeof(stat_head_t)
1324 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1325 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1326 SEEK_SET) != 0) {
1327 rrd_set_error("seek rrd for live header writeback");
1328 free(updvals);
1329 free(tmpl_idx);
1330 rrd_free(&rrd);
1331 free(pdp_temp);
1332 free(pdp_new);
1333 fclose(rrd_file);
1334 return(-1);
1335 }
1337 if(version >= 3) {
1338 if(fwrite( rrd.live_head,
1339 sizeof(live_head_t), 1, rrd_file) != 1){
1340 rrd_set_error("fwrite live_head to rrd");
1341 free(updvals);
1342 rrd_free(&rrd);
1343 free(tmpl_idx);
1344 free(pdp_temp);
1345 free(pdp_new);
1346 fclose(rrd_file);
1347 return(-1);
1348 }
1349 }
1350 else {
1351 if(fwrite( &rrd.live_head->last_up,
1352 sizeof(time_t), 1, rrd_file) != 1){
1353 rrd_set_error("fwrite live_head to rrd");
1354 free(updvals);
1355 rrd_free(&rrd);
1356 free(tmpl_idx);
1357 free(pdp_temp);
1358 free(pdp_new);
1359 fclose(rrd_file);
1360 return(-1);
1361 }
1362 }
1365 if(fwrite( rrd.pdp_prep,
1366 sizeof(pdp_prep_t),
1367 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1368 rrd_set_error("ftwrite pdp_prep to rrd");
1369 free(updvals);
1370 rrd_free(&rrd);
1371 free(tmpl_idx);
1372 free(pdp_temp);
1373 free(pdp_new);
1374 fclose(rrd_file);
1375 return(-1);
1376 }
1378 if(fwrite( rrd.cdp_prep,
1379 sizeof(cdp_prep_t),
1380 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1381 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1383 rrd_set_error("ftwrite cdp_prep to rrd");
1384 free(updvals);
1385 free(tmpl_idx);
1386 rrd_free(&rrd);
1387 free(pdp_temp);
1388 free(pdp_new);
1389 fclose(rrd_file);
1390 return(-1);
1391 }
1393 if(fwrite( rrd.rra_ptr,
1394 sizeof(rra_ptr_t),
1395 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1396 rrd_set_error("fwrite rra_ptr to rrd");
1397 free(updvals);
1398 free(tmpl_idx);
1399 rrd_free(&rrd);
1400 free(pdp_temp);
1401 free(pdp_new);
1402 fclose(rrd_file);
1403 return(-1);
1404 }
1406 /* OK now close the files and free the memory */
1407 if(fclose(rrd_file) != 0){
1408 rrd_set_error("closing rrd");
1409 free(updvals);
1410 free(tmpl_idx);
1411 rrd_free(&rrd);
1412 free(pdp_temp);
1413 free(pdp_new);
1414 return(-1);
1415 }
1417 /* calling the smoothing code here guarantees at most
1418 * one smoothing operation per rrd_update call. Unfortunately,
1419 * it is possible with bulk updates, or a long-delayed update
1420 * for smoothing to occur off-schedule. This really isn't
1421 * critical except during the burning cycles. */
1422 if (schedule_smooth)
1423 {
1424 rrd_file = fopen(filename,"rb+");
1425 rra_start = rra_begin;
1426 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1427 {
1428 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1429 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1430 {
1431 #ifdef DEBUG
1432 fprintf(stderr,"Running smoother for rra %ld\n",i);
1433 #endif
1434 apply_smoother(&rrd,i,rra_start,rrd_file);
1435 if (rrd_test_error())
1436 break;
1437 }
1438 rra_start += rrd.rra_def[i].row_cnt
1439 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1440 }
1441 fclose(rrd_file);
1442 }
1443 rrd_free(&rrd);
1444 free(updvals);
1445 free(tmpl_idx);
1446 free(pdp_new);
1447 free(pdp_temp);
1448 return(0);
1449 }
/*
 * Request an exclusive lock covering the whole RRD file, without waiting
 * if the lock cannot be granted right away.
 *
 * rrdfile  open stream of the RRD file; its descriptor is obtained with
 *          fileno().
 *
 * Returns the status reported by the platform locking call: 0 on success,
 * non-zero (-1) when the lock could not be obtained. On Windows -1 is also
 * returned when the file size cannot be determined.
 *
 * The lock goes away when the file is closed.
 */
int
LockRRD(FILE *rrdfile)
{
    int status;
    int fd = fileno(rrdfile);
#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat info;

    /* _locking() needs a byte count, so look up the file size first */
    if (_fstat(fd, &info) != 0)
        return (-1);
    status = _locking(fd, _LK_NBLCK, info.st_size);
#else
    struct flock whole_file;

    whole_file.l_whence = SEEK_SET;   /* l_start is relative to the start of the file */
    whole_file.l_start  = 0;          /* begin at the first byte */
    whole_file.l_len    = 0;          /* 0 = through to the end of the file */
    whole_file.l_type   = F_WRLCK;    /* exclusive write lock */

    /* F_SETLK returns immediately instead of waiting for the lock */
    status = fcntl(fd, F_SETLK, &whole_file);
#endif
    return (status);
}
1489 #ifdef HAVE_MMAP
1490 info_t
1491 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1492 unsigned short CDP_scratch_idx,
1493 #ifndef DEBUG
1494 FILE UNUSED(*rrd_file),
1495 #else
1496 FILE *rrd_file,
1497 #endif
1498 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1499 #else
1500 info_t
1501 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1502 unsigned short CDP_scratch_idx, FILE *rrd_file,
1503 info_t *pcdp_summary, time_t *rra_time)
1504 #endif
1505 {
1506 unsigned long ds_idx, cdp_idx;
1507 infoval iv;
1509 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1510 {
1511 /* compute the cdp index */
1512 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1513 #ifdef DEBUG
1514 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1515 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1516 rrd -> rra_def[rra_idx].cf_nam);
1517 #endif
1518 if (pcdp_summary != NULL)
1519 {
1520 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1521 /* append info to the return hash */
1522 pcdp_summary = info_push(pcdp_summary,
1523 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1524 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1525 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1526 RD_I_VAL, iv);
1527 }
1528 #ifdef HAVE_MMAP
1529 memcpy((char *)rrd_mmaped_file + *rra_current,
1530 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1531 sizeof(rrd_value_t));
1532 #else
1533 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1534 sizeof(rrd_value_t),1,rrd_file) != 1)
1535 {
1536 rrd_set_error("writing rrd");
1537 return 0;
1538 }
1539 #endif
1540 *rra_current += sizeof(rrd_value_t);
1541 }
1542 return (pcdp_summary);
1543 }