1 /*****************************************************************************
2 * RRDtool 1.2.12 Copyright by Tobi Oetiker, 1997-2005
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 *****************************************************************************/
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13 #include <sys/mman.h>
14 #endif
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
18 #include <sys/stat.h>
19 #include <io.h>
20 #endif
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
31 * replacement.
32 */
33 #include <sys/timeb.h>
35 #ifndef __MINGW32__
36 struct timeval {
37 time_t tv_sec; /* seconds */
38 long tv_usec; /* microseconds */
39 };
40 #endif
42 struct __timezone {
43 int tz_minuteswest; /* minutes W of Greenwich */
44 int tz_dsttime; /* type of dst correction */
45 };
47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
49 struct _timeb current_time;
51 _ftime(¤t_time);
53 t->tv_sec = current_time.time;
54 t->tv_usec = current_time.millitm * 1000;
56 return 0;
57 }
59 #endif
/*
 * Normalize a time value as returned by gettimeofday(): when the
 * microsecond part is negative, one second is borrowed from tv_sec and
 * added to tv_usec as 1000000 microseconds.  Values whose tv_usec is
 * already >= 0 are left untouched.
 */
static void normalize_time(struct timeval *t)
{
    if (t->tv_usec >= 0)
        return;

    t->tv_usec += 1000000L;
    t->tv_sec -= 1;
}
72 /* Local prototypes */
73 int LockRRD(FILE *rrd_file);
74 #ifdef HAVE_MMAP
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
76 unsigned long *rra_current,
77 unsigned short CDP_scratch_idx,
78 #ifndef DEBUG
79 FILE UNUSED(*rrd_file),
80 #else
81 FILE *rrd_file,
82 #endif
83 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
84 #else
85 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
86 unsigned long *rra_current,
87 unsigned short CDP_scratch_idx, FILE *rrd_file,
88 info_t *pcdp_summary, time_t *rra_time);
89 #endif
90 int rrd_update_r(char *filename, char *tmplt, int argc, char **argv);
91 int _rrd_update(char *filename, char *tmplt, int argc, char **argv,
92 info_t*);
/* IFDNAN(X,Y): yields Y when X is NaN, otherwise X.
 * X is expanded twice, so it must be free of side effects.
 * The expansion carries no trailing semicolon, so the macro can be used
 * inside larger expressions and as the body of an unbraced if/else. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
97 #ifdef STANDALONE
98 int
99 main(int argc, char **argv){
100 rrd_update(argc,argv);
101 if (rrd_test_error()) {
102 printf("RRDtool " PACKAGE_VERSION " Copyright by Tobi Oetiker, 1997-2005\n\n"
103 "Usage: rrdupdate filename\n"
104 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
105 "\t\t\ttime|N:value[:value...]\n\n"
106 "\t\t\tat-time@value[:value...]\n\n"
107 "\t\t\t[ time:value[:value...] ..]\n\n");
109 printf("ERROR: %s\n",rrd_get_error());
110 rrd_clear_error();
111 return 1;
112 }
113 return 0;
114 }
115 #endif
117 info_t *rrd_update_v(int argc, char **argv)
118 {
119 char *tmplt = NULL;
120 info_t *result = NULL;
121 infoval rc;
122 optind = 0; opterr = 0; /* initialize getopt */
124 while (1) {
125 static struct option long_options[] =
126 {
127 {"template", required_argument, 0, 't'},
128 {0,0,0,0}
129 };
130 int option_index = 0;
131 int opt;
132 opt = getopt_long(argc, argv, "t:",
133 long_options, &option_index);
135 if (opt == EOF)
136 break;
138 switch(opt) {
139 case 't':
140 tmplt = optarg;
141 break;
143 case '?':
144 rrd_set_error("unknown option '%s'",argv[optind-1]);
145 rc.u_int = -1;
146 goto end_tag;
147 }
148 }
150 /* need at least 2 arguments: filename, data. */
151 if (argc-optind < 2) {
152 rrd_set_error("Not enough arguments");
153 rc.u_int = -1;
154 goto end_tag;
155 }
156 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
157 rc.u_int = _rrd_update(argv[optind], tmplt,
158 argc - optind - 1, argv + optind + 1, result);
159 result->value.u_int = rc.u_int;
160 end_tag:
161 return result;
162 }
/*
 * Command-line front end for updating an RRD.
 *
 * Command line: [--template|-t tmplt] filename data [data ...]
 *
 * The file name, the optional template and the remaining arguments are
 * forwarded to rrd_update_r(), whose result is returned.  Returns -1 with
 * the rrd error set when an unknown option is given or fewer than two
 * non-option arguments remain.
 */
int rrd_update(int argc, char **argv)
{
    static struct option long_options[] = {
        {"template", required_argument, 0, 't'},
        {0, 0, 0, 0}
    };
    char *tmplt = NULL;

    /* initialize getopt */
    optind = 0;
    opterr = 0;

    for (;;) {
        int option_index = 0;
        int opt = getopt_long(argc, argv, "t:", long_options, &option_index);

        if (opt == EOF)
            break;

        if (opt == 't') {
            tmplt = optarg;
        } else if (opt == '?') {
            rrd_set_error("unknown option '%s'", argv[optind - 1]);
            return -1;
        }
    }

    /* filename and at least one data argument are mandatory */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], tmplt,
                        argc - optind - 1, argv + optind + 1);
}
208 int
209 rrd_update_r(char *filename, char *tmplt, int argc, char **argv)
210 {
211 return _rrd_update(filename, tmplt, argc, argv, NULL);
212 }
214 int
215 _rrd_update(char *filename, char *tmplt, int argc, char **argv,
216 info_t *pcdp_summary)
217 {
219 int arg_i = 2;
220 short j;
221 unsigned long i,ii,iii=1;
223 unsigned long rra_begin; /* byte pointer to the rra
224 * area in the rrd file. this
225 * pointer never changes value */
226 unsigned long rra_start; /* byte pointer to the rra
227 * area in the rrd file. this
228 * pointer changes as each rrd is
229 * processed. */
230 unsigned long rra_current; /* byte pointer to the current write
231 * spot in the rrd file. */
232 unsigned long rra_pos_tmp; /* temporary byte pointer. */
233 double interval,
234 pre_int,post_int; /* interval between this and
235 * the last run */
236 unsigned long proc_pdp_st; /* which pdp_st was the last
237 * to be processed */
238 unsigned long occu_pdp_st; /* when was the pdp_st
239 * before the last update
240 * time */
241 unsigned long proc_pdp_age; /* how old was the data in
242 * the pdp prep area when it
243 * was last updated */
244 unsigned long occu_pdp_age; /* how long ago was the last
245 * pdp_step time */
246 rrd_value_t *pdp_new; /* prepare the incoming data
247 * to be added the the
248 * existing entry */
249 rrd_value_t *pdp_temp; /* prepare the pdp values
250 * to be added the the
251 * cdp values */
253 long *tmpl_idx; /* index representing the settings
254 transported by the tmplt index */
255 unsigned long tmpl_cnt = 2; /* time and data */
257 FILE *rrd_file;
258 rrd_t rrd;
259 time_t current_time = 0;
260 time_t rra_time = 0; /* time of update for a RRA */
261 unsigned long current_time_usec=0;/* microseconds part of current time */
262 struct timeval tmp_time; /* used for time conversion */
264 char **updvals;
265 int schedule_smooth = 0;
266 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
267 /* a vector of future Holt-Winters seasonal coefs */
268 unsigned long elapsed_pdp_st;
269 /* number of elapsed PDP steps since last update */
270 unsigned long *rra_step_cnt = NULL;
271 /* number of rows to be updated in an RRA for a data
272 * value. */
273 unsigned long start_pdp_offset;
274 /* number of PDP steps since the last update that
275 * are assigned to the first CDP to be generated
276 * since the last update. */
277 unsigned short scratch_idx;
278 /* index into the CDP scratch array */
279 enum cf_en current_cf;
280 /* numeric id of the current consolidation function */
281 rpnstack_t rpnstack; /* used for COMPUTE DS */
282 int version; /* rrd version */
283 char *endptr; /* used in the conversion */
284 #ifdef HAVE_MMAP
285 void *rrd_mmaped_file;
286 unsigned long rrd_filesize;
287 #endif
289 rpnstack_init(&rpnstack);
291 /* need at least 1 arguments: data. */
292 if (argc < 1) {
293 rrd_set_error("Not enough arguments");
294 return -1;
295 }
299 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
300 return -1;
301 }
302 /* initialize time */
303 version = atoi(rrd.stat_head->version);
304 gettimeofday(&tmp_time, 0);
305 normalize_time(&tmp_time);
306 current_time = tmp_time.tv_sec;
307 if(version >= 3) {
308 current_time_usec = tmp_time.tv_usec;
309 }
310 else {
311 current_time_usec = 0;
312 }
314 rra_current = rra_start = rra_begin = ftell(rrd_file);
/* This is defined in the ANSI C standard, section 7.9.5.3:

When a file is opened with update mode ('+' as the second
or third character in the ... list of mode argument
variables), both input and output may be performed on the
associated stream. However, ... input may not be directly
followed by output without an intervening call to a file
positioning function, unless the input operation encounters
end-of-file. */
324 #ifdef HAVE_MMAP
325 fseek(rrd_file, 0, SEEK_END);
326 rrd_filesize = ftell(rrd_file);
327 fseek(rrd_file, rra_current, SEEK_SET);
328 #else
329 fseek(rrd_file, 0, SEEK_CUR);
330 #endif
333 /* get exclusive lock to whole file.
334 * lock gets removed when we close the file.
335 */
336 if (LockRRD(rrd_file) != 0) {
337 rrd_set_error("could not lock RRD");
338 rrd_free(&rrd);
339 fclose(rrd_file);
340 return(-1);
341 }
343 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
344 rrd_set_error("allocating updvals pointer array");
345 rrd_free(&rrd);
346 fclose(rrd_file);
347 return(-1);
348 }
350 if ((pdp_temp = malloc(sizeof(rrd_value_t)
351 *rrd.stat_head->ds_cnt))==NULL){
352 rrd_set_error("allocating pdp_temp ...");
353 free(updvals);
354 rrd_free(&rrd);
355 fclose(rrd_file);
356 return(-1);
357 }
359 if ((tmpl_idx = malloc(sizeof(unsigned long)
360 *(rrd.stat_head->ds_cnt+1)))==NULL){
361 rrd_set_error("allocating tmpl_idx ...");
362 free(pdp_temp);
363 free(updvals);
364 rrd_free(&rrd);
365 fclose(rrd_file);
366 return(-1);
367 }
368 /* initialize tmplt redirector */
369 /* default config example (assume DS 1 is a CDEF DS)
370 tmpl_idx[0] -> 0; (time)
371 tmpl_idx[1] -> 1; (DS 0)
372 tmpl_idx[2] -> 3; (DS 2)
373 tmpl_idx[3] -> 4; (DS 3) */
374 tmpl_idx[0] = 0; /* time */
375 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
376 {
377 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
378 tmpl_idx[ii++]=i;
379 }
380 tmpl_cnt= ii;
382 if (tmplt) {
383 /* we should work on a writeable copy here */
384 char *dsname;
385 unsigned int tmpl_len;
386 tmplt = strdup(tmplt);
387 dsname = tmplt;
388 tmpl_cnt = 1; /* the first entry is the time */
389 tmpl_len = strlen(tmplt);
390 for(i=0;i<=tmpl_len ;i++) {
391 if (tmplt[i] == ':' || tmplt[i] == '\0') {
392 tmplt[i] = '\0';
393 if (tmpl_cnt>rrd.stat_head->ds_cnt){
394 rrd_set_error("tmplt contains more DS definitions than RRD");
395 free(updvals); free(pdp_temp);
396 free(tmpl_idx); rrd_free(&rrd);
397 fclose(rrd_file); return(-1);
398 }
399 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
400 rrd_set_error("unknown DS name '%s'",dsname);
401 free(updvals); free(pdp_temp);
402 free(tmplt);
403 free(tmpl_idx); rrd_free(&rrd);
404 fclose(rrd_file); return(-1);
405 } else {
406 /* the first element is always the time */
407 tmpl_idx[tmpl_cnt-1]++;
408 /* go to the next entry on the tmplt */
409 dsname = &tmplt[i+1];
410 /* fix the damage we did before */
411 if (i<tmpl_len) {
412 tmplt[i]=':';
413 }
415 }
416 }
417 }
418 free(tmplt);
419 }
420 if ((pdp_new = malloc(sizeof(rrd_value_t)
421 *rrd.stat_head->ds_cnt))==NULL){
422 rrd_set_error("allocating pdp_new ...");
423 free(updvals);
424 free(pdp_temp);
425 free(tmpl_idx);
426 rrd_free(&rrd);
427 fclose(rrd_file);
428 return(-1);
429 }
431 #ifdef HAVE_MMAP
432 rrd_mmaped_file = mmap(0,
433 rrd_filesize,
434 PROT_READ | PROT_WRITE,
435 MAP_SHARED,
436 fileno(rrd_file),
437 0);
438 if (rrd_mmaped_file == MAP_FAILED) {
439 rrd_set_error("error mmapping file %s", filename);
440 free(updvals);
441 free(pdp_temp);
442 free(tmpl_idx);
443 rrd_free(&rrd);
444 fclose(rrd_file);
445 return(-1);
446 }
447 #endif
448 /* loop through the arguments. */
449 for(arg_i=0; arg_i<argc;arg_i++) {
450 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
451 char *step_start = stepper;
452 char *p;
453 char *parsetime_error = NULL;
454 enum {atstyle, normal} timesyntax;
455 struct rrd_time_value ds_tv;
456 if (stepper == NULL){
457 rrd_set_error("failed duplication argv entry");
458 free(updvals);
459 free(pdp_temp);
460 free(tmpl_idx);
461 rrd_free(&rrd);
462 #ifdef HAVE_MMAP
463 munmap(rrd_mmaped_file, rrd_filesize);
464 #endif
465 fclose(rrd_file);
466 return(-1);
467 }
468 /* initialize all ds input to unknown except the first one
469 which has always got to be set */
470 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
471 strcpy(stepper,argv[arg_i]);
472 updvals[0]=stepper;
473 /* separate all ds elements; first must be examined separately
474 due to alternate time syntax */
475 if ((p=strchr(stepper,'@'))!=NULL) {
476 timesyntax = atstyle;
477 *p = '\0';
478 stepper = p+1;
479 } else if ((p=strchr(stepper,':'))!=NULL) {
480 timesyntax = normal;
481 *p = '\0';
482 stepper = p+1;
483 } else {
484 rrd_set_error("expected timestamp not found in data source from %s:...",
485 argv[arg_i]);
486 free(step_start);
487 break;
488 }
489 ii=1;
490 updvals[tmpl_idx[ii]] = stepper;
491 while (*stepper) {
492 if (*stepper == ':') {
493 *stepper = '\0';
494 ii++;
495 if (ii<tmpl_cnt){
496 updvals[tmpl_idx[ii]] = stepper+1;
497 }
498 }
499 stepper++;
500 }
502 if (ii != tmpl_cnt-1) {
503 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
504 tmpl_cnt-1, ii, argv[arg_i]);
505 free(step_start);
506 break;
507 }
509 /* get the time from the reading ... handle N */
510 if (timesyntax == atstyle) {
511 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
512 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
513 free(step_start);
514 break;
515 }
516 if (ds_tv.type == RELATIVE_TO_END_TIME ||
517 ds_tv.type == RELATIVE_TO_START_TIME) {
518 rrd_set_error("specifying time relative to the 'start' "
519 "or 'end' makes no sense here: %s",
520 updvals[0]);
521 free(step_start);
522 break;
523 }
525 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
526 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
528 } else if (strcmp(updvals[0],"N")==0){
529 gettimeofday(&tmp_time, 0);
530 normalize_time(&tmp_time);
531 current_time = tmp_time.tv_sec;
532 current_time_usec = tmp_time.tv_usec;
533 } else {
534 double tmp;
535 tmp = strtod(updvals[0], 0);
536 current_time = floor(tmp);
537 current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
538 }
539 /* dont do any correction for old version RRDs */
540 if(version < 3)
541 current_time_usec = 0;
543 if(current_time < rrd.live_head->last_up ||
544 (current_time == rrd.live_head->last_up &&
545 (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
546 rrd_set_error("illegal attempt to update using time %ld when "
547 "last update time is %ld (minimum one second step)",
548 current_time, rrd.live_head->last_up);
549 free(step_start);
550 break;
551 }
554 /* seek to the beginning of the rra's */
555 if (rra_current != rra_begin) {
556 #ifndef HAVE_MMAP
557 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
558 rrd_set_error("seek error in rrd");
559 free(step_start);
560 break;
561 }
562 #endif
563 rra_current = rra_begin;
564 }
565 rra_start = rra_begin;
567 /* when was the current pdp started */
568 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
569 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
571 /* when did the last pdp_st occur */
572 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
573 occu_pdp_st = current_time - occu_pdp_age;
575 /* interval = current_time - rrd.live_head->last_up; */
576 interval = (double)(current_time - rrd.live_head->last_up)
577 + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
579 if (occu_pdp_st > proc_pdp_st){
580 /* OK we passed the pdp_st moment*/
581 pre_int = (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
582 * occurred before the latest
583 * pdp_st moment*/
584 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
585 post_int = occu_pdp_age; /* how much after it */
586 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
587 } else {
588 pre_int = interval;
589 post_int = 0;
590 }
592 #ifdef DEBUG
593 printf(
594 "proc_pdp_age %lu\t"
595 "proc_pdp_st %lu\t"
596 "occu_pfp_age %lu\t"
597 "occu_pdp_st %lu\t"
598 "int %lf\t"
599 "pre_int %lf\t"
600 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
601 occu_pdp_age, occu_pdp_st,
602 interval, pre_int, post_int);
603 #endif
605 /* process the data sources and update the pdp_prep
606 * area accordingly */
607 for(i=0;i<rrd.stat_head->ds_cnt;i++){
608 enum dst_en dst_idx;
609 dst_idx= dst_conv(rrd.ds_def[i].dst);
611 /* make sure we do not build diffs with old last_ds values */
612 if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval
613 && ( dst_idx == DST_COUNTER || dst_idx == DST_DERIVE)){
614 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
615 }
617 /* NOTE: DST_CDEF should never enter this if block, because
618 * updvals[i+1][0] is initialized to 'U'; unless the caller
619 * accidently specified a value for the DST_CDEF. To handle
620 * this case, an extra check is required. */
622 if((updvals[i+1][0] != 'U') &&
623 (dst_idx != DST_CDEF) &&
624 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
625 double rate = DNAN;
626 /* the data source type defines how to process the data */
627 /* pdp_new contains rate * time ... eg the bytes
628 * transferred during the interval. Doing it this way saves
629 * a lot of math operations */
632 switch(dst_idx){
633 case DST_COUNTER:
634 case DST_DERIVE:
635 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
636 for(ii=0;updvals[i+1][ii] != '\0';ii++){
637 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
638 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
639 break;
640 }
641 }
642 if (rrd_test_error()){
643 break;
644 }
645 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
646 if(dst_idx == DST_COUNTER) {
647 /* simple overflow catcher suggested by Andres Kroonmaa */
648 /* this will fail terribly for non 32 or 64 bit counters ... */
649 /* are there any others in SNMP land ? */
650 if (pdp_new[i] < (double)0.0 )
651 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
652 if (pdp_new[i] < (double)0.0 )
653 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
654 }
655 rate = pdp_new[i] / interval;
656 }
657 else {
658 pdp_new[i]= DNAN;
659 }
660 break;
661 case DST_ABSOLUTE:
662 errno = 0;
663 pdp_new[i] = strtod(updvals[i+1],&endptr);
664 if (errno > 0){
665 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
666 break;
667 };
668 if (endptr[0] != '\0'){
669 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
670 break;
671 }
672 rate = pdp_new[i] / interval;
673 break;
674 case DST_GAUGE:
675 errno = 0;
676 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
677 if (errno > 0){
678 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
679 break;
680 };
681 if (endptr[0] != '\0'){
682 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
683 break;
684 }
685 rate = pdp_new[i] / interval;
686 break;
687 default:
688 rrd_set_error("rrd contains unknown DS type : '%s'",
689 rrd.ds_def[i].dst);
690 break;
691 }
692 /* break out of this for loop if the error string is set */
693 if (rrd_test_error()){
694 break;
695 }
696 /* make sure pdp_temp is neither too large or too small
697 * if any of these occur it becomes unknown ...
698 * sorry folks ... */
699 if ( ! isnan(rate) &&
700 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
701 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
702 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
703 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
704 pdp_new[i] = DNAN;
705 }
706 } else {
707 /* no news is news all the same */
708 pdp_new[i] = DNAN;
709 }
712 /* make a copy of the command line argument for the next run */
713 #ifdef DEBUG
714 fprintf(stderr,
715 "prep ds[%lu]\t"
716 "last_arg '%s'\t"
717 "this_arg '%s'\t"
718 "pdp_new %10.2f\n",
719 i,
720 rrd.pdp_prep[i].last_ds,
721 updvals[i+1], pdp_new[i]);
722 #endif
723 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
724 strncpy(rrd.pdp_prep[i].last_ds,
725 updvals[i+1],LAST_DS_LEN-1);
726 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
727 }
728 }
729 /* break out of the argument parsing loop if the error_string is set */
730 if (rrd_test_error()){
731 free(step_start);
732 break;
733 }
734 /* has a pdp_st moment occurred since the last run ? */
736 if (proc_pdp_st == occu_pdp_st){
737 /* no we have not passed a pdp_st moment. therefore update is simple */
739 for(i=0;i<rrd.stat_head->ds_cnt;i++){
740 if(isnan(pdp_new[i])) {
/* this is not really accurate if we use subsecond data arrival times;
should have thought of it when going to subsecond resolution ...
sorry, with the next format change we will have it! */
744 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);
745 } else {
746 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
747 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
748 } else {
749 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
750 }
751 }
752 #ifdef DEBUG
753 fprintf(stderr,
754 "NO PDP ds[%lu]\t"
755 "value %10.2f\t"
756 "unkn_sec %5lu\n",
757 i,
758 rrd.pdp_prep[i].scratch[PDP_val].u_val,
759 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
760 #endif
761 }
762 } else {
763 /* an pdp_st has occurred. */
765 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
766 * occurred up to the last run.
767 pdp_new[] contains rate*seconds from the latest run.
768 pdp_temp[] will contain the rate for cdp */
770 for(i=0;i<rrd.stat_head->ds_cnt;i++){
771 /* update pdp_prep to the current pdp_st. */
772 double pre_unknown = 0.0;
773 if(isnan(pdp_new[i]))
/* a final bit of unknown to be added before calculation;
 * we use a temporary variable for this so that we
 * don't have to turn integer lines before using the value */
777 pre_unknown = pre_int;
778 else {
779 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
780 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i]/interval*pre_int;
781 } else {
782 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
783 }
784 }
787 /* if too much of the pdp_prep is unknown we dump it */
788 if (
789 /* removed because this does not agree with the definition
790 a heart beat can be unknown */
791 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
792 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
/* if the interval is larger than mrhb we get NAN */
794 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
795 (occu_pdp_st-proc_pdp_st <=
796 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
797 pdp_temp[i] = DNAN;
798 } else {
799 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
800 / ((double)(occu_pdp_st - proc_pdp_st
801 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)
802 -pre_unknown);
803 }
805 /* process CDEF data sources; remember each CDEF DS can
806 * only reference other DS with a lower index number */
807 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
808 rpnp_t *rpnp;
809 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
/* substitute data values for OP_VARIABLE nodes */
811 for (ii = 0; rpnp[ii].op != OP_END; ii++)
812 {
813 if (rpnp[ii].op == OP_VARIABLE) {
814 rpnp[ii].op = OP_NUMBER;
815 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
816 }
817 }
818 /* run the rpn calculator */
819 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
820 free(rpnp);
821 break; /* exits the data sources pdp_temp loop */
822 }
823 }
825 /* make pdp_prep ready for the next run */
826 if(isnan(pdp_new[i])){
/* this is not really accurate if we use subsecond data arrival times;
should have thought of it when going to subsecond resolution ...
sorry, with the next format change we will have it! */
830 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
831 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
832 } else {
833 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
834 rrd.pdp_prep[i].scratch[PDP_val].u_val =
835 pdp_new[i]/interval*post_int;
836 }
838 #ifdef DEBUG
839 fprintf(stderr,
840 "PDP UPD ds[%lu]\t"
841 "pdp_temp %10.2f\t"
842 "new_prep %10.2f\t"
843 "new_unkn_sec %5lu\n",
844 i, pdp_temp[i],
845 rrd.pdp_prep[i].scratch[PDP_val].u_val,
846 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
847 #endif
848 }
850 /* if there were errors during the last loop, bail out here */
851 if (rrd_test_error()){
852 free(step_start);
853 break;
854 }
856 /* compute the number of elapsed pdp_st moments */
857 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
858 #ifdef DEBUG
859 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
860 #endif
861 if (rra_step_cnt == NULL)
862 {
863 rra_step_cnt = (unsigned long *)
864 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
865 }
867 for(i = 0, rra_start = rra_begin;
868 i < rrd.stat_head->rra_cnt;
869 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
870 i++)
871 {
872 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
873 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
874 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
875 if (start_pdp_offset <= elapsed_pdp_st) {
876 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
877 rrd.rra_def[i].pdp_cnt + 1;
878 } else {
879 rra_step_cnt[i] = 0;
880 }
882 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
883 {
/* If this is a bulk update, we need to skip ahead in the seasonal
 * arrays so that they will be correct for the next observed value;
 * note that for the bulk update itself, no update will occur to
 * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
 * be set to DNAN. */
889 if (rra_step_cnt[i] > 2)
890 {
891 /* skip update by resetting rra_step_cnt[i],
892 * note that this is not data source specific; this is due
893 * to the bulk update, not a DNAN value for the specific data
894 * source. */
895 rra_step_cnt[i] = 0;
896 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
897 &last_seasonal_coef);
898 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
899 &seasonal_coef);
900 }
902 /* periodically run a smoother for seasonal effects */
903 /* Need to use first cdp parameter buffer to track
904 * burnin (burnin requires a specific smoothing schedule).
905 * The CDP_init_seasonal parameter is really an RRA level,
906 * not a data source within RRA level parameter, but the rra_def
907 * is read only for rrd_update (not flushed to disk). */
908 iii = i*(rrd.stat_head -> ds_cnt);
909 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
910 <= BURNIN_CYCLES)
911 {
912 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
913 > rrd.rra_def[i].row_cnt - 1) {
914 /* mark off one of the burnin cycles */
915 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
916 schedule_smooth = 1;
917 }
918 } else {
919 /* someone has no doubt invented a trick to deal with this
920 * wrap around, but at least this code is clear. */
921 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
922 rrd.rra_ptr[i].cur_row)
923 {
924 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
925 * mapping between PDP and CDP */
926 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
927 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
928 {
929 #ifdef DEBUG
930 fprintf(stderr,
931 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
932 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
933 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
934 #endif
935 schedule_smooth = 1;
936 }
937 } else {
938 /* can't rely on negative numbers because we are working with
939 * unsigned values */
940 /* Don't need modulus here. If we've wrapped more than once, only
941 * one smooth is executed at the end. */
942 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
943 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
944 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
945 {
946 #ifdef DEBUG
947 fprintf(stderr,
948 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
949 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
950 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
951 #endif
952 schedule_smooth = 1;
953 }
954 }
955 }
957 rra_current = ftell(rrd_file);
958 } /* if cf is DEVSEASONAL or SEASONAL */
960 if (rrd_test_error()) break;
962 /* update CDP_PREP areas */
/* loop over data sources within each RRA */
964 for(ii = 0;
965 ii < rrd.stat_head->ds_cnt;
966 ii++)
967 {
969 /* iii indexes the CDP prep area for this data source within the RRA */
970 iii=i*rrd.stat_head->ds_cnt+ii;
972 if (rrd.rra_def[i].pdp_cnt > 1) {
974 if (rra_step_cnt[i] > 0) {
/* If we are in this block, at least 1 CDP value will be written to
 * disk; this is the CDP_primary_val entry. If more than 1 value needs
 * to be written, then the "fill in" value is the CDP_secondary_val
 * entry. */
979 if (isnan(pdp_temp[ii]))
980 {
981 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
982 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
983 } else {
984 /* CDP_secondary value is the RRA "fill in" value for intermediary
985 * CDP data entries. No matter the CF, the value is the same because
986 * the average, max, min, and last of a list of identical values is
987 * the same, namely, the value itself. */
988 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
989 }
991 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
992 > rrd.rra_def[i].pdp_cnt*
993 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
994 {
995 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
996 /* initialize carry over */
997 if (current_cf == CF_AVERAGE) {
998 if (isnan(pdp_temp[ii])) {
999 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1000 } else {
1001 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1002 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1003 }
1004 } else {
1005 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1006 }
1007 } else {
1008 rrd_value_t cum_val, cur_val;
1009 switch (current_cf) {
1010 case CF_AVERAGE:
1011 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
1012 cur_val = IFDNAN(pdp_temp[ii],0.0);
1013 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
1014 (cum_val + cur_val * start_pdp_offset) /
1015 (rrd.rra_def[i].pdp_cnt
1016 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
1017 /* initialize carry over value */
1018 if (isnan(pdp_temp[ii])) {
1019 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1020 } else {
1021 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1022 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1023 }
1024 break;
1025 case CF_MAXIMUM:
1026 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1027 cur_val = IFDNAN(pdp_temp[ii],-DINF);
1028 #ifdef DEBUG
1029 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1030 isnan(pdp_temp[ii])) {
1031 fprintf(stderr,
1032 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1033 i,ii);
1034 exit(-1);
1035 }
1036 #endif
1037 if (cur_val > cum_val)
1038 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1039 else
1040 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1041 /* initialize carry over value */
1042 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1043 break;
1044 case CF_MINIMUM:
1045 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1046 cur_val = IFDNAN(pdp_temp[ii],DINF);
1047 #ifdef DEBUG
1048 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1049 isnan(pdp_temp[ii])) {
1050 fprintf(stderr,
1051 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1052 i,ii);
1053 exit(-1);
1054 }
1055 #endif
1056 if (cur_val < cum_val)
1057 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1058 else
1059 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1060 /* initialize carry over value */
1061 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1062 break;
1063 case CF_LAST:
1064 default:
1065 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1066 /* initialize carry over value */
1067 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1068 break;
1069 }
1070 } /* endif meets xff value requirement for a valid value */
1071 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1072 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1073 if (isnan(pdp_temp[ii]))
1074 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1075 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1076 else
1077 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1078 } else /* rra_step_cnt[i] == 0 */
1079 {
1080 #ifdef DEBUG
1081 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1082 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1083 i,ii);
1084 } else {
1085 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1086 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1087 }
1088 #endif
1089 if (isnan(pdp_temp[ii])) {
1090 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1091 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1092 {
1093 if (current_cf == CF_AVERAGE) {
1094 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1095 elapsed_pdp_st;
1096 } else {
1097 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1098 }
1099 #ifdef DEBUG
1100 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1101 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1102 #endif
1103 } else {
1104 switch (current_cf) {
1105 case CF_AVERAGE:
1106 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1107 elapsed_pdp_st;
1108 break;
1109 case CF_MINIMUM:
1110 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1111 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1112 break;
1113 case CF_MAXIMUM:
1114 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1115 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1116 break;
1117 case CF_LAST:
1118 default:
1119 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1120 break;
1121 }
1122 }
1123 }
1124 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1125 if (elapsed_pdp_st > 2)
1126 {
1127 switch (current_cf) {
1128 case CF_AVERAGE:
1129 default:
1130 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1131 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1132 break;
1133 case CF_SEASONAL:
1134 case CF_DEVSEASONAL:
1135 /* need to update cached seasonal values, so they are consistent
1136 * with the bulk update */
1137 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1138 * CDP_last_deviation are the same. */
1139 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1140 last_seasonal_coef[ii];
1141 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1142 seasonal_coef[ii];
1143 break;
1144 case CF_HWPREDICT:
1145 /* need to update the null_count and last_null_count.
1146 * even do this for non-DNAN pdp_temp because the
1147 * algorithm is not learning from batch updates. */
1148 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1149 elapsed_pdp_st;
1150 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1151 elapsed_pdp_st - 1;
1152 /* fall through */
1153 case CF_DEVPREDICT:
1154 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1155 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1156 break;
1157 case CF_FAILURES:
1158 /* do not count missed bulk values as failures */
1159 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1160 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1161 /* need to reset violations buffer.
1162 * could do this more carefully, but for now, just
1163 * assume a bulk update wipes away all violations. */
1164 erase_violations(&rrd, iii, i);
1165 break;
1166 }
1167 }
1168 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1170 if (rrd_test_error()) break;
1172 } /* endif data sources loop */
1173 } /* end RRA Loop */
1175 /* this loop is only entered if elapsed_pdp_st < 3 */
1176 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1177 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1178 {
1179 for(i = 0, rra_start = rra_begin;
1180 i < rrd.stat_head->rra_cnt;
1181 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1182 i++)
1183 {
1184 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1186 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1187 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1188 {
1189 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1190 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1191 &seasonal_coef);
1192 rra_current = ftell(rrd_file);
1193 }
1194 if (rrd_test_error()) break;
1195 /* loop over data soures within each RRA */
1196 for(ii = 0;
1197 ii < rrd.stat_head->ds_cnt;
1198 ii++)
1199 {
1200 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1201 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1202 scratch_idx, seasonal_coef);
1203 }
1204 } /* end RRA Loop */
1205 if (rrd_test_error()) break;
1206 } /* end elapsed_pdp_st loop */
1208 if (rrd_test_error()) break;
1210 /* Ready to write to disk */
1211 /* Move sequentially through the file, writing one RRA at a time.
1212 * Note this architecture divorces the computation of CDP with
1213 * flushing updated RRA entries to disk. */
1214 for(i = 0, rra_start = rra_begin;
1215 i < rrd.stat_head->rra_cnt;
1216 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1217 i++) {
1218 /* is there anything to write for this RRA? If not, continue. */
1219 if (rra_step_cnt[i] == 0) continue;
1221 /* write the first row */
1222 #ifdef DEBUG
1223 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1224 #endif
1225 rrd.rra_ptr[i].cur_row++;
1226 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1227 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1228 /* positition on the first row */
1229 rra_pos_tmp = rra_start +
1230 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1231 if(rra_pos_tmp != rra_current) {
1232 #ifndef HAVE_MMAP
1233 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1234 rrd_set_error("seek error in rrd");
1235 break;
1236 }
1237 #endif
1238 rra_current = rra_pos_tmp;
1239 }
1241 #ifdef DEBUG
1242 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1243 #endif
1244 scratch_idx = CDP_primary_val;
1245 if (pcdp_summary != NULL)
1246 {
1247 rra_time = (current_time - current_time
1248 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1249 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1250 }
1251 #ifdef HAVE_MMAP
1252 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1253 pcdp_summary, &rra_time, rrd_mmaped_file);
1254 #else
1255 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1256 pcdp_summary, &rra_time);
1257 #endif
1258 if (rrd_test_error()) break;
1260 /* write other rows of the bulk update, if any */
1261 scratch_idx = CDP_secondary_val;
1262 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1263 {
1264 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1265 {
1266 #ifdef DEBUG
1267 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1268 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1269 #endif
1270 /* wrap */
1271 rrd.rra_ptr[i].cur_row = 0;
1272 /* seek back to beginning of current rra */
1273 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1274 {
1275 rrd_set_error("seek error in rrd");
1276 break;
1277 }
1278 #ifdef DEBUG
1279 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1280 #endif
1281 rra_current = rra_start;
1282 }
1283 if (pcdp_summary != NULL)
1284 {
1285 rra_time = (current_time - current_time
1286 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1287 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1288 }
1289 #ifdef HAVE_MMAP
1290 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1291 pcdp_summary, &rra_time, rrd_mmaped_file);
1292 #else
1293 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1294 pcdp_summary, &rra_time);
1295 #endif
1296 }
1298 if (rrd_test_error())
1299 break;
1300 } /* RRA LOOP */
1302 /* break out of the argument parsing loop if error_string is set */
1303 if (rrd_test_error()){
1304 free(step_start);
1305 break;
1306 }
1308 } /* endif a pdp_st has occurred */
1309 rrd.live_head->last_up = current_time;
1310 rrd.live_head->last_up_usec = current_time_usec;
1311 free(step_start);
1312 } /* function argument loop */
1314 if (seasonal_coef != NULL) free(seasonal_coef);
1315 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1316 if (rra_step_cnt != NULL) free(rra_step_cnt);
1317 rpnstack_free(&rpnstack);
1319 #ifdef HAVE_MMAP
1320 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1321 rrd_set_error("error writing(unmapping) file: %s", filename);
1322 }
1323 #endif
1324 /* if we got here and if there is an error and if the file has not been
1325 * written to, then close things up and return. */
1326 if (rrd_test_error()) {
1327 free(updvals);
1328 free(tmpl_idx);
1329 rrd_free(&rrd);
1330 free(pdp_temp);
1331 free(pdp_new);
1332 fclose(rrd_file);
1333 return(-1);
1334 }
1336 /* aargh ... that was tough ... so many loops ... anyway, its done.
1337 * we just need to write back the live header portion now*/
1339 if (fseek(rrd_file, (sizeof(stat_head_t)
1340 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1341 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1342 SEEK_SET) != 0) {
1343 rrd_set_error("seek rrd for live header writeback");
1344 free(updvals);
1345 free(tmpl_idx);
1346 rrd_free(&rrd);
1347 free(pdp_temp);
1348 free(pdp_new);
1349 fclose(rrd_file);
1350 return(-1);
1351 }
1353 if(version >= 3) {
1354 if(fwrite( rrd.live_head,
1355 sizeof(live_head_t), 1, rrd_file) != 1){
1356 rrd_set_error("fwrite live_head to rrd");
1357 free(updvals);
1358 rrd_free(&rrd);
1359 free(tmpl_idx);
1360 free(pdp_temp);
1361 free(pdp_new);
1362 fclose(rrd_file);
1363 return(-1);
1364 }
1365 }
1366 else {
1367 if(fwrite( &rrd.live_head->last_up,
1368 sizeof(time_t), 1, rrd_file) != 1){
1369 rrd_set_error("fwrite live_head to rrd");
1370 free(updvals);
1371 rrd_free(&rrd);
1372 free(tmpl_idx);
1373 free(pdp_temp);
1374 free(pdp_new);
1375 fclose(rrd_file);
1376 return(-1);
1377 }
1378 }
1381 if(fwrite( rrd.pdp_prep,
1382 sizeof(pdp_prep_t),
1383 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1384 rrd_set_error("ftwrite pdp_prep to rrd");
1385 free(updvals);
1386 rrd_free(&rrd);
1387 free(tmpl_idx);
1388 free(pdp_temp);
1389 free(pdp_new);
1390 fclose(rrd_file);
1391 return(-1);
1392 }
1394 if(fwrite( rrd.cdp_prep,
1395 sizeof(cdp_prep_t),
1396 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1397 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1399 rrd_set_error("ftwrite cdp_prep to rrd");
1400 free(updvals);
1401 free(tmpl_idx);
1402 rrd_free(&rrd);
1403 free(pdp_temp);
1404 free(pdp_new);
1405 fclose(rrd_file);
1406 return(-1);
1407 }
1409 if(fwrite( rrd.rra_ptr,
1410 sizeof(rra_ptr_t),
1411 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1412 rrd_set_error("fwrite rra_ptr to rrd");
1413 free(updvals);
1414 free(tmpl_idx);
1415 rrd_free(&rrd);
1416 free(pdp_temp);
1417 free(pdp_new);
1418 fclose(rrd_file);
1419 return(-1);
1420 }
1422 /* OK now close the files and free the memory */
1423 if(fclose(rrd_file) != 0){
1424 rrd_set_error("closing rrd");
1425 free(updvals);
1426 free(tmpl_idx);
1427 rrd_free(&rrd);
1428 free(pdp_temp);
1429 free(pdp_new);
1430 return(-1);
1431 }
1433 /* calling the smoothing code here guarantees at most
1434 * one smoothing operation per rrd_update call. Unfortunately,
1435 * it is possible with bulk updates, or a long-delayed update
1436 * for smoothing to occur off-schedule. This really isn't
1437 * critical except during the burning cycles. */
1438 if (schedule_smooth)
1439 {
1440 rrd_file = fopen(filename,"rb+");
1441 rra_start = rra_begin;
1442 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1443 {
1444 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1445 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1446 {
1447 #ifdef DEBUG
1448 fprintf(stderr,"Running smoother for rra %ld\n",i);
1449 #endif
1450 apply_smoother(&rrd,i,rra_start,rrd_file);
1451 if (rrd_test_error())
1452 break;
1453 }
1454 rra_start += rrd.rra_def[i].row_cnt
1455 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1456 }
1457 fclose(rrd_file);
1458 }
1459 rrd_free(&rrd);
1460 free(updvals);
1461 free(tmpl_idx);
1462 free(pdp_new);
1463 free(pdp_temp);
1464 return(0);
1465 }
/*
 * LockRRD - request an exclusive, non-blocking lock on an open RRD file.
 *
 * rrdfile: stdio stream of the RRD; its descriptor is obtained with fileno().
 *
 * On native Windows builds the lock is requested with _locking(_LK_NBLCK)
 * for st_size bytes, where st_size comes from _fstat(); if _fstat() fails
 * the function returns -1 without attempting the lock.
 * Everywhere else a write lock (F_WRLCK) is requested through
 * fcntl(F_SETLK) for the range starting at offset 0 with length 0,
 * i.e. the whole file.
 *
 * The lock is not released here; it goes away when the file is closed.
 *
 * Returns 0 on success, otherwise the failure status of the locking call.
 */
int
LockRRD(FILE *rrdfile)
{
    const int fd = fileno(rrdfile);

#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat file_info;

    /* the size is needed to tell _locking how many bytes to cover */
    if (_fstat(fd, &file_info) != 0)
        return -1;

    return _locking(fd, _LK_NBLCK, file_info.st_size);
#else
    struct flock request;

    request.l_whence = SEEK_SET;    /* l_start is relative to the start of the file */
    request.l_start  = 0;           /* begin at the first byte */
    request.l_len    = 0;           /* zero length: through to the end of the file */
    request.l_type   = F_WRLCK;     /* exclusive write lock */

    /* F_SETLK returns immediately instead of waiting for a conflicting lock */
    return fcntl(fd, F_SETLK, &request);
#endif
}
1505 #ifdef HAVE_MMAP
1506 info_t
1507 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1508 unsigned short CDP_scratch_idx,
1509 #ifndef DEBUG
1510 FILE UNUSED(*rrd_file),
1511 #else
1512 FILE *rrd_file,
1513 #endif
1514 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1515 #else
1516 info_t
1517 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1518 unsigned short CDP_scratch_idx, FILE *rrd_file,
1519 info_t *pcdp_summary, time_t *rra_time)
1520 #endif
1521 {
1522 unsigned long ds_idx, cdp_idx;
1523 infoval iv;
1525 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1526 {
1527 /* compute the cdp index */
1528 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1529 #ifdef DEBUG
1530 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1531 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1532 rrd -> rra_def[rra_idx].cf_nam);
1533 #endif
1534 if (pcdp_summary != NULL)
1535 {
1536 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1537 /* append info to the return hash */
1538 pcdp_summary = info_push(pcdp_summary,
1539 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1540 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1541 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1542 RD_I_VAL, iv);
1543 }
1544 #ifdef HAVE_MMAP
1545 memcpy((char *)rrd_mmaped_file + *rra_current,
1546 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1547 sizeof(rrd_value_t));
1548 #else
1549 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1550 sizeof(rrd_value_t),1,rrd_file) != 1)
1551 {
1552 rrd_set_error("writing rrd");
1553 return 0;
1554 }
1555 #endif
1556 *rra_current += sizeof(rrd_value_t);
1557 }
1558 return (pcdp_summary);
1559 }