1 /*****************************************************************************
2 * RRDtool 1.2.0 Copyright by Tobi Oetiker, 1997-2005
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 *****************************************************************************/
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13 #include <sys/mman.h>
14 #endif
16 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
18 #include <sys/stat.h>
19 #include <io.h>
20 #endif
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
27 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
28 /*
29 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
30 * replacement.
31 */
32 #include <sys/timeb.h>
34 struct timeval {
35 time_t tv_sec; /* seconds */
36 long tv_usec; /* microseconds */
37 };
39 struct __timezone {
40 int tz_minuteswest; /* minutes W of Greenwich */
41 int tz_dsttime; /* type of dst correction */
42 };
44 static gettimeofday(struct timeval *t, struct __timezone *tz) {
46 struct timeb current_time;
48 _ftime(¤t_time);
50 t->tv_sec = current_time.time;
51 t->tv_usec = current_time.millitm * 1000;
52 }
54 #endif
55 /*
56 * normilize time as returned by gettimeofday. usec part must
57 * be always >= 0
58 */
59 static void normalize_time(struct timeval *t)
60 {
61 if(t->tv_usec < 0) {
62 t->tv_sec--;
63 t->tv_usec += 1000000L;
64 }
65 }
67 /* Local prototypes */
68 int LockRRD(FILE *rrd_file);
69 #ifdef HAVE_MMAP
70 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
71 unsigned long *rra_current,
72 unsigned short CDP_scratch_idx, FILE *rrd_file,
73 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
74 #else
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
76 unsigned long *rra_current,
77 unsigned short CDP_scratch_idx, FILE *rrd_file,
78 info_t *pcdp_summary, time_t *rra_time);
79 #endif
80 int rrd_update_r(char *filename, char *template, int argc, char **argv);
81 int _rrd_update(char *filename, char *template, int argc, char **argv,
82 info_t*);
84 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
87 #ifdef STANDALONE
88 int
89 main(int argc, char **argv){
90 rrd_update(argc,argv);
91 if (rrd_test_error()) {
92 printf("RRDtool 1.2.0 Copyright by Tobi Oetiker, 1997-2005\n\n"
93 "Usage: rrdupdate filename\n"
94 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
95 "\t\t\ttime|N:value[:value...]\n\n"
96 "\t\t\tat-time@value[:value...]\n\n"
97 "\t\t\t[ time:value[:value...] ..]\n\n");
99 printf("ERROR: %s\n",rrd_get_error());
100 rrd_clear_error();
101 return 1;
102 }
103 return 0;
104 }
105 #endif
107 info_t *rrd_update_v(int argc, char **argv)
108 {
109 char *template = NULL;
110 info_t *result = NULL;
111 infoval rc;
112 optind = 0; opterr = 0; /* initialize getopt */
114 while (1) {
115 static struct option long_options[] =
116 {
117 {"template", required_argument, 0, 't'},
118 {0,0,0,0}
119 };
120 int option_index = 0;
121 int opt;
122 opt = getopt_long(argc, argv, "t:",
123 long_options, &option_index);
125 if (opt == EOF)
126 break;
128 switch(opt) {
129 case 't':
130 template = optarg;
131 break;
133 case '?':
134 rrd_set_error("unknown option '%s'",argv[optind-1]);
135 rc.u_int = -1;
136 goto end_tag;
137 }
138 }
140 /* need at least 2 arguments: filename, data. */
141 if (argc-optind < 2) {
142 rrd_set_error("Not enough arguments");
143 rc.u_int = -1;
144 goto end_tag;
145 }
146 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
147 rc.u_int = _rrd_update(argv[optind], template,
148 argc - optind - 1, argv + optind + 1, result);
149 result->value.u_int = rc.u_int;
150 end_tag:
151 return result;
152 }
154 int
155 rrd_update(int argc, char **argv)
156 {
157 char *template = NULL;
158 int rc;
159 optind = 0; opterr = 0; /* initialize getopt */
161 while (1) {
162 static struct option long_options[] =
163 {
164 {"template", required_argument, 0, 't'},
165 {0,0,0,0}
166 };
167 int option_index = 0;
168 int opt;
169 opt = getopt_long(argc, argv, "t:",
170 long_options, &option_index);
172 if (opt == EOF)
173 break;
175 switch(opt) {
176 case 't':
177 template = optarg;
178 break;
180 case '?':
181 rrd_set_error("unknown option '%s'",argv[optind-1]);
182 return(-1);
183 }
184 }
186 /* need at least 2 arguments: filename, data. */
187 if (argc-optind < 2) {
188 rrd_set_error("Not enough arguments");
190 return -1;
191 }
193 rc = rrd_update_r(argv[optind], template,
194 argc - optind - 1, argv + optind + 1);
195 return rc;
196 }
198 int
199 rrd_update_r(char *filename, char *template, int argc, char **argv)
200 {
201 return _rrd_update(filename, template, argc, argv, NULL);
202 }
204 int
205 _rrd_update(char *filename, char *template, int argc, char **argv,
206 info_t *pcdp_summary)
207 {
209 int arg_i = 2;
210 short j;
211 unsigned long i,ii,iii=1;
213 unsigned long rra_begin; /* byte pointer to the rra
214 * area in the rrd file. this
215 * pointer never changes value */
216 unsigned long rra_start; /* byte pointer to the rra
217 * area in the rrd file. this
218 * pointer changes as each rrd is
219 * processed. */
220 unsigned long rra_current; /* byte pointer to the current write
221 * spot in the rrd file. */
222 unsigned long rra_pos_tmp; /* temporary byte pointer. */
223 double interval,
224 pre_int,post_int; /* interval between this and
225 * the last run */
226 unsigned long proc_pdp_st; /* which pdp_st was the last
227 * to be processed */
228 unsigned long occu_pdp_st; /* when was the pdp_st
229 * before the last update
230 * time */
231 unsigned long proc_pdp_age; /* how old was the data in
232 * the pdp prep area when it
233 * was last updated */
234 unsigned long occu_pdp_age; /* how long ago was the last
235 * pdp_step time */
236 rrd_value_t *pdp_new; /* prepare the incoming data
237 * to be added the the
238 * existing entry */
239 rrd_value_t *pdp_temp; /* prepare the pdp values
240 * to be added the the
241 * cdp values */
243 long *tmpl_idx; /* index representing the settings
244 transported by the template index */
245 unsigned long tmpl_cnt = 2; /* time and data */
247 FILE *rrd_file;
248 rrd_t rrd;
249 time_t current_time;
250 time_t rra_time; /* time of update for a RRA */
251 unsigned long current_time_usec; /* microseconds part of current time */
252 struct timeval tmp_time; /* used for time conversion */
254 char **updvals;
255 int schedule_smooth = 0;
256 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
257 /* a vector of future Holt-Winters seasonal coefs */
258 unsigned long elapsed_pdp_st;
259 /* number of elapsed PDP steps since last update */
260 unsigned long *rra_step_cnt = NULL;
261 /* number of rows to be updated in an RRA for a data
262 * value. */
263 unsigned long start_pdp_offset;
264 /* number of PDP steps since the last update that
265 * are assigned to the first CDP to be generated
266 * since the last update. */
267 unsigned short scratch_idx;
268 /* index into the CDP scratch array */
269 enum cf_en current_cf;
270 /* numeric id of the current consolidation function */
271 rpnstack_t rpnstack; /* used for COMPUTE DS */
272 int version; /* rrd version */
273 char *endptr; /* used in the conversion */
274 #ifdef HAVE_MMAP
275 void *rrd_mmaped_file;
276 unsigned long rrd_filesize;
277 #endif
279 rpnstack_init(&rpnstack);
281 /* need at least 1 arguments: data. */
282 if (argc < 1) {
283 rrd_set_error("Not enough arguments");
284 return -1;
285 }
289 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
290 return -1;
291 }
292 /* initialize time */
293 version = atoi(rrd.stat_head->version);
294 gettimeofday(&tmp_time, 0);
295 normalize_time(&tmp_time);
296 current_time = tmp_time.tv_sec;
297 if(version >= 3) {
298 current_time_usec = tmp_time.tv_usec;
299 }
300 else {
301 current_time_usec = 0;
302 }
304 rra_current = rra_start = rra_begin = ftell(rrd_file);
305 /* This is defined in the ANSI C standard, section 7.9.5.3:
307 When a file is opened with udpate mode ('+' as the second
308 or third character in the ... list of mode argument
309 variables), both input and ouptut may be performed on the
310 associated stream. However, ... input may not be directly
311 followed by output without an intervening call to a file
312 positioning function, unless the input oepration encounters
313 end-of-file. */
314 #ifdef HAVE_MMAP
315 fseek(rrd_file, 0, SEEK_END);
316 rrd_filesize = ftell(rrd_file);
317 fseek(rrd_file, rra_current, SEEK_SET);
318 #else
319 fseek(rrd_file, 0, SEEK_CUR);
320 #endif
323 /* get exclusive lock to whole file.
324 * lock gets removed when we close the file.
325 */
326 if (LockRRD(rrd_file) != 0) {
327 rrd_set_error("could not lock RRD");
328 rrd_free(&rrd);
329 fclose(rrd_file);
330 return(-1);
331 }
333 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
334 rrd_set_error("allocating updvals pointer array");
335 rrd_free(&rrd);
336 fclose(rrd_file);
337 return(-1);
338 }
340 if ((pdp_temp = malloc(sizeof(rrd_value_t)
341 *rrd.stat_head->ds_cnt))==NULL){
342 rrd_set_error("allocating pdp_temp ...");
343 free(updvals);
344 rrd_free(&rrd);
345 fclose(rrd_file);
346 return(-1);
347 }
349 if ((tmpl_idx = malloc(sizeof(unsigned long)
350 *(rrd.stat_head->ds_cnt+1)))==NULL){
351 rrd_set_error("allocating tmpl_idx ...");
352 free(pdp_temp);
353 free(updvals);
354 rrd_free(&rrd);
355 fclose(rrd_file);
356 return(-1);
357 }
358 /* initialize template redirector */
359 /* default config example (assume DS 1 is a CDEF DS)
360 tmpl_idx[0] -> 0; (time)
361 tmpl_idx[1] -> 1; (DS 0)
362 tmpl_idx[2] -> 3; (DS 2)
363 tmpl_idx[3] -> 4; (DS 3) */
364 tmpl_idx[0] = 0; /* time */
365 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
366 {
367 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
368 tmpl_idx[ii++]=i;
369 }
370 tmpl_cnt= ii;
372 if (template) {
373 char *dsname;
374 unsigned int tmpl_len;
375 dsname = template;
376 tmpl_cnt = 1; /* the first entry is the time */
377 tmpl_len = strlen(template);
378 for(i=0;i<=tmpl_len ;i++) {
379 if (template[i] == ':' || template[i] == '\0') {
380 template[i] = '\0';
381 if (tmpl_cnt>rrd.stat_head->ds_cnt){
382 rrd_set_error("Template contains more DS definitions than RRD");
383 free(updvals); free(pdp_temp);
384 free(tmpl_idx); rrd_free(&rrd);
385 fclose(rrd_file); return(-1);
386 }
387 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
388 rrd_set_error("unknown DS name '%s'",dsname);
389 free(updvals); free(pdp_temp);
390 free(tmpl_idx); rrd_free(&rrd);
391 fclose(rrd_file); return(-1);
392 } else {
393 /* the first element is always the time */
394 tmpl_idx[tmpl_cnt-1]++;
395 /* go to the next entry on the template */
396 dsname = &template[i+1];
397 /* fix the damage we did before */
398 if (i<tmpl_len) {
399 template[i]=':';
400 }
402 }
403 }
404 }
405 }
406 if ((pdp_new = malloc(sizeof(rrd_value_t)
407 *rrd.stat_head->ds_cnt))==NULL){
408 rrd_set_error("allocating pdp_new ...");
409 free(updvals);
410 free(pdp_temp);
411 free(tmpl_idx);
412 rrd_free(&rrd);
413 fclose(rrd_file);
414 return(-1);
415 }
417 #ifdef HAVE_MMAP
418 rrd_mmaped_file = mmap(0,
419 rrd_filesize,
420 PROT_READ | PROT_WRITE,
421 MAP_SHARED,
422 fileno(rrd_file),
423 0);
424 if (rrd_mmaped_file == MAP_FAILED) {
425 rrd_set_error("error mmapping file %s", filename);
426 free(updvals);
427 free(pdp_temp);
428 free(tmpl_idx);
429 rrd_free(&rrd);
430 fclose(rrd_file);
431 return(-1);
432 }
433 #endif
434 /* loop through the arguments. */
435 for(arg_i=0; arg_i<argc;arg_i++) {
436 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
437 char *step_start = stepper;
438 char *p;
439 char *parsetime_error = NULL;
440 enum {atstyle, normal} timesyntax;
441 struct rrd_time_value ds_tv;
442 if (stepper == NULL){
443 rrd_set_error("failed duplication argv entry");
444 free(updvals);
445 free(pdp_temp);
446 free(tmpl_idx);
447 rrd_free(&rrd);
448 #ifdef HAVE_MMAP
449 munmap(rrd_mmaped_file, rrd_filesize);
450 #endif
451 fclose(rrd_file);
452 return(-1);
453 }
454 /* initialize all ds input to unknown except the first one
455 which has always got to be set */
456 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
457 strcpy(stepper,argv[arg_i]);
458 updvals[0]=stepper;
459 /* separate all ds elements; first must be examined separately
460 due to alternate time syntax */
461 if ((p=strchr(stepper,'@'))!=NULL) {
462 timesyntax = atstyle;
463 *p = '\0';
464 stepper = p+1;
465 } else if ((p=strchr(stepper,':'))!=NULL) {
466 timesyntax = normal;
467 *p = '\0';
468 stepper = p+1;
469 } else {
470 rrd_set_error("expected timestamp not found in data source from %s:...",
471 argv[arg_i]);
472 free(step_start);
473 break;
474 }
475 ii=1;
476 updvals[tmpl_idx[ii]] = stepper;
477 while (*stepper) {
478 if (*stepper == ':') {
479 *stepper = '\0';
480 ii++;
481 if (ii<tmpl_cnt){
482 updvals[tmpl_idx[ii]] = stepper+1;
483 }
484 }
485 stepper++;
486 }
488 if (ii != tmpl_cnt-1) {
489 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
490 tmpl_cnt-1, ii, argv[arg_i]);
491 free(step_start);
492 break;
493 }
495 /* get the time from the reading ... handle N */
496 if (timesyntax == atstyle) {
497 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
498 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
499 free(step_start);
500 break;
501 }
502 if (ds_tv.type == RELATIVE_TO_END_TIME ||
503 ds_tv.type == RELATIVE_TO_START_TIME) {
504 rrd_set_error("specifying time relative to the 'start' "
505 "or 'end' makes no sense here: %s",
506 updvals[0]);
507 free(step_start);
508 break;
509 }
511 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
512 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
514 } else if (strcmp(updvals[0],"N")==0){
515 gettimeofday(&tmp_time, 0);
516 normalize_time(&tmp_time);
517 current_time = tmp_time.tv_sec;
518 current_time_usec = tmp_time.tv_usec;
519 } else {
520 double tmp;
521 tmp = strtod(updvals[0], 0);
522 current_time = floor(tmp);
523 current_time_usec = (long)((tmp - current_time) * 1000000L);
524 }
525 /* dont do any correction for old version RRDs */
526 if(version < 3)
527 current_time_usec = 0;
529 if(current_time <= rrd.live_head->last_up){
530 rrd_set_error("illegal attempt to update using time %ld when "
531 "last update time is %ld (minimum one second step)",
532 current_time, rrd.live_head->last_up);
533 free(step_start);
534 break;
535 }
538 /* seek to the beginning of the rra's */
539 if (rra_current != rra_begin) {
540 #ifndef HAVE_MMAP
541 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
542 rrd_set_error("seek error in rrd");
543 free(step_start);
544 break;
545 }
546 #endif
547 rra_current = rra_begin;
548 }
549 rra_start = rra_begin;
551 /* when was the current pdp started */
552 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
553 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
555 /* when did the last pdp_st occur */
556 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
557 occu_pdp_st = current_time - occu_pdp_age;
558 /* interval = current_time - rrd.live_head->last_up; */
559 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
561 if (occu_pdp_st > proc_pdp_st){
562 /* OK we passed the pdp_st moment*/
563 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
564 * occurred before the latest
565 * pdp_st moment*/
566 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
567 post_int = occu_pdp_age; /* how much after it */
568 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
569 } else {
570 pre_int = interval;
571 post_int = 0;
572 }
574 #ifdef DEBUG
575 printf(
576 "proc_pdp_age %lu\t"
577 "proc_pdp_st %lu\t"
578 "occu_pfp_age %lu\t"
579 "occu_pdp_st %lu\t"
580 "int %lf\t"
581 "pre_int %lf\t"
582 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
583 occu_pdp_age, occu_pdp_st,
584 interval, pre_int, post_int);
585 #endif
587 /* process the data sources and update the pdp_prep
588 * area accordingly */
589 for(i=0;i<rrd.stat_head->ds_cnt;i++){
590 enum dst_en dst_idx;
591 dst_idx= dst_conv(rrd.ds_def[i].dst);
592 /* NOTE: DST_CDEF should never enter this if block, because
593 * updvals[i+1][0] is initialized to 'U'; unless the caller
594 * accidently specified a value for the DST_CDEF. To handle
595 * this case, an extra check is required. */
596 if((updvals[i+1][0] != 'U') &&
597 (dst_idx != DST_CDEF) &&
598 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
599 double rate = DNAN;
600 /* the data source type defines how to process the data */
601 /* pdp_new contains rate * time ... eg the bytes
602 * transferred during the interval. Doing it this way saves
603 * a lot of math operations */
606 switch(dst_idx){
607 case DST_COUNTER:
608 case DST_DERIVE:
609 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
610 for(ii=0;updvals[i+1][ii] != '\0';ii++){
611 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
612 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
613 break;
614 }
615 }
616 if (rrd_test_error()){
617 break;
618 }
619 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
620 if(dst_idx == DST_COUNTER) {
621 /* simple overflow catcher suggested by Andres Kroonmaa */
622 /* this will fail terribly for non 32 or 64 bit counters ... */
623 /* are there any others in SNMP land ? */
624 if (pdp_new[i] < (double)0.0 )
625 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
626 if (pdp_new[i] < (double)0.0 )
627 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
628 }
629 rate = pdp_new[i] / interval;
630 }
631 else {
632 pdp_new[i]= DNAN;
633 }
634 break;
635 case DST_ABSOLUTE:
636 errno = 0;
637 pdp_new[i] = strtod(updvals[i+1],&endptr);
638 if (errno > 0){
639 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
640 break;
641 };
642 if (endptr[0] != '\0'){
643 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
644 break;
645 }
646 rate = pdp_new[i] / interval;
647 break;
648 case DST_GAUGE:
649 errno = 0;
650 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
651 if (errno > 0){
652 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
653 break;
654 };
655 if (endptr[0] != '\0'){
656 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
657 break;
658 }
659 rate = pdp_new[i] / interval;
660 break;
661 default:
662 rrd_set_error("rrd contains unknown DS type : '%s'",
663 rrd.ds_def[i].dst);
664 break;
665 }
666 /* break out of this for loop if the error string is set */
667 if (rrd_test_error()){
668 break;
669 }
670 /* make sure pdp_temp is neither too large or too small
671 * if any of these occur it becomes unknown ...
672 * sorry folks ... */
673 if ( ! isnan(rate) &&
674 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
675 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
676 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
677 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
678 pdp_new[i] = DNAN;
679 }
680 } else {
681 /* no news is news all the same */
682 pdp_new[i] = DNAN;
683 }
685 /* make a copy of the command line argument for the next run */
686 #ifdef DEBUG
687 fprintf(stderr,
688 "prep ds[%lu]\t"
689 "last_arg '%s'\t"
690 "this_arg '%s'\t"
691 "pdp_new %10.2f\n",
692 i,
693 rrd.pdp_prep[i].last_ds,
694 updvals[i+1], pdp_new[i]);
695 #endif
696 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
697 strncpy(rrd.pdp_prep[i].last_ds,
698 updvals[i+1],LAST_DS_LEN-1);
699 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
700 }
701 }
702 /* break out of the argument parsing loop if the error_string is set */
703 if (rrd_test_error()){
704 free(step_start);
705 break;
706 }
707 /* has a pdp_st moment occurred since the last run ? */
709 if (proc_pdp_st == occu_pdp_st){
710 /* no we have not passed a pdp_st moment. therefore update is simple */
712 for(i=0;i<rrd.stat_head->ds_cnt;i++){
713 if(isnan(pdp_new[i]))
714 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
715 else
716 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
717 #ifdef DEBUG
718 fprintf(stderr,
719 "NO PDP ds[%lu]\t"
720 "value %10.2f\t"
721 "unkn_sec %5lu\n",
722 i,
723 rrd.pdp_prep[i].scratch[PDP_val].u_val,
724 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
725 #endif
726 }
727 } else {
728 /* an pdp_st has occurred. */
730 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
731 * occurred up to the last run.
732 pdp_new[] contains rate*seconds from the latest run.
733 pdp_temp[] will contain the rate for cdp */
735 for(i=0;i<rrd.stat_head->ds_cnt;i++){
736 /* update pdp_prep to the current pdp_st */
737 if(isnan(pdp_new[i]))
738 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
739 else
740 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
741 pdp_new[i]/(double)interval*(double)pre_int;
743 /* if too much of the pdp_prep is unknown we dump it */
744 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
745 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
746 (occu_pdp_st-proc_pdp_st <=
747 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
748 pdp_temp[i] = DNAN;
749 } else {
750 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
751 / (double)( occu_pdp_st
752 - proc_pdp_st
753 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
754 }
756 /* process CDEF data sources; remember each CDEF DS can
757 * only reference other DS with a lower index number */
758 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
759 rpnp_t *rpnp;
760 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
761 /* substitue data values for OP_VARIABLE nodes */
762 for (ii = 0; rpnp[ii].op != OP_END; ii++)
763 {
764 if (rpnp[ii].op == OP_VARIABLE) {
765 rpnp[ii].op = OP_NUMBER;
766 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
767 }
768 }
769 /* run the rpn calculator */
770 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
771 free(rpnp);
772 break; /* exits the data sources pdp_temp loop */
773 }
774 }
776 /* make pdp_prep ready for the next run */
777 if(isnan(pdp_new[i])){
778 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
779 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
780 } else {
781 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
782 rrd.pdp_prep[i].scratch[PDP_val].u_val =
783 pdp_new[i]/(double)interval*(double)post_int;
784 }
786 #ifdef DEBUG
787 fprintf(stderr,
788 "PDP UPD ds[%lu]\t"
789 "pdp_temp %10.2f\t"
790 "new_prep %10.2f\t"
791 "new_unkn_sec %5lu\n",
792 i, pdp_temp[i],
793 rrd.pdp_prep[i].scratch[PDP_val].u_val,
794 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
795 #endif
796 }
798 /* if there were errors during the last loop, bail out here */
799 if (rrd_test_error()){
800 free(step_start);
801 break;
802 }
804 /* compute the number of elapsed pdp_st moments */
805 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
806 #ifdef DEBUG
807 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
808 #endif
809 if (rra_step_cnt == NULL)
810 {
811 rra_step_cnt = (unsigned long *)
812 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
813 }
815 for(i = 0, rra_start = rra_begin;
816 i < rrd.stat_head->rra_cnt;
817 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
818 i++)
819 {
820 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
821 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
822 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
823 if (start_pdp_offset <= elapsed_pdp_st) {
824 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
825 rrd.rra_def[i].pdp_cnt + 1;
826 } else {
827 rra_step_cnt[i] = 0;
828 }
830 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
831 {
832 /* If this is a bulk update, we need to skip ahead in the seasonal
833 * arrays so that they will be correct for the next observed value;
834 * note that for the bulk update itself, no update will occur to
835 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
836 * be set to DNAN. */
837 if (rra_step_cnt[i] > 2)
838 {
839 /* skip update by resetting rra_step_cnt[i],
840 * note that this is not data source specific; this is due
841 * to the bulk update, not a DNAN value for the specific data
842 * source. */
843 rra_step_cnt[i] = 0;
844 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
845 &last_seasonal_coef);
846 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
847 &seasonal_coef);
848 }
850 /* periodically run a smoother for seasonal effects */
851 /* Need to use first cdp parameter buffer to track
852 * burnin (burnin requires a specific smoothing schedule).
853 * The CDP_init_seasonal parameter is really an RRA level,
854 * not a data source within RRA level parameter, but the rra_def
855 * is read only for rrd_update (not flushed to disk). */
856 iii = i*(rrd.stat_head -> ds_cnt);
857 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
858 <= BURNIN_CYCLES)
859 {
860 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
861 > rrd.rra_def[i].row_cnt - 1) {
862 /* mark off one of the burnin cycles */
863 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
864 schedule_smooth = 1;
865 }
866 } else {
867 /* someone has no doubt invented a trick to deal with this
868 * wrap around, but at least this code is clear. */
869 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
870 rrd.rra_ptr[i].cur_row)
871 {
872 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
873 * mapping between PDP and CDP */
874 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
875 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
876 {
877 #ifdef DEBUG
878 fprintf(stderr,
879 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
880 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
881 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
882 #endif
883 schedule_smooth = 1;
884 }
885 } else {
886 /* can't rely on negative numbers because we are working with
887 * unsigned values */
888 /* Don't need modulus here. If we've wrapped more than once, only
889 * one smooth is executed at the end. */
890 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
891 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
892 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
893 {
894 #ifdef DEBUG
895 fprintf(stderr,
896 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
897 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
898 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
899 #endif
900 schedule_smooth = 1;
901 }
902 }
903 }
905 rra_current = ftell(rrd_file);
906 } /* if cf is DEVSEASONAL or SEASONAL */
908 if (rrd_test_error()) break;
910 /* update CDP_PREP areas */
911 /* loop over data soures within each RRA */
912 for(ii = 0;
913 ii < rrd.stat_head->ds_cnt;
914 ii++)
915 {
917 /* iii indexes the CDP prep area for this data source within the RRA */
918 iii=i*rrd.stat_head->ds_cnt+ii;
920 if (rrd.rra_def[i].pdp_cnt > 1) {
922 if (rra_step_cnt[i] > 0) {
923 /* If we are in this block, as least 1 CDP value will be written to
924 * disk, this is the CDP_primary_val entry. If more than 1 value needs
925 * to be written, then the "fill in" value is the CDP_secondary_val
926 * entry. */
927 if (isnan(pdp_temp[ii]))
928 {
929 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
930 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
931 } else {
932 /* CDP_secondary value is the RRA "fill in" value for intermediary
933 * CDP data entries. No matter the CF, the value is the same because
934 * the average, max, min, and last of a list of identical values is
935 * the same, namely, the value itself. */
936 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
937 }
939 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
940 > rrd.rra_def[i].pdp_cnt*
941 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
942 {
943 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
944 /* initialize carry over */
945 if (current_cf == CF_AVERAGE) {
946 if (isnan(pdp_temp[ii])) {
947 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
948 } else {
949 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
950 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
951 }
952 } else {
953 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
954 }
955 } else {
956 rrd_value_t cum_val, cur_val;
957 switch (current_cf) {
958 case CF_AVERAGE:
959 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
960 cur_val = IFDNAN(pdp_temp[ii],0.0);
961 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
962 (cum_val + cur_val * start_pdp_offset) /
963 (rrd.rra_def[i].pdp_cnt
964 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
965 /* initialize carry over value */
966 if (isnan(pdp_temp[ii])) {
967 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
968 } else {
969 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
970 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
971 }
972 break;
973 case CF_MAXIMUM:
974 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
975 cur_val = IFDNAN(pdp_temp[ii],-DINF);
976 #ifdef DEBUG
977 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
978 isnan(pdp_temp[ii])) {
979 fprintf(stderr,
980 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
981 i,ii);
982 exit(-1);
983 }
984 #endif
985 if (cur_val > cum_val)
986 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
987 else
988 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
989 /* initialize carry over value */
990 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
991 break;
992 case CF_MINIMUM:
993 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
994 cur_val = IFDNAN(pdp_temp[ii],DINF);
995 #ifdef DEBUG
996 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
997 isnan(pdp_temp[ii])) {
998 fprintf(stderr,
999 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1000 i,ii);
1001 exit(-1);
1002 }
1003 #endif
1004 if (cur_val < cum_val)
1005 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1006 else
1007 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1008 /* initialize carry over value */
1009 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1010 break;
1011 case CF_LAST:
1012 default:
1013 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1014 /* initialize carry over value */
1015 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1016 break;
1017 }
1018 } /* endif meets xff value requirement for a valid value */
1019 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1020 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1021 if (isnan(pdp_temp[ii]))
1022 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1023 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1024 else
1025 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1026 } else /* rra_step_cnt[i] == 0 */
1027 {
1028 #ifdef DEBUG
1029 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1030 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1031 i,ii);
1032 } else {
1033 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1034 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1035 }
1036 #endif
1037 if (isnan(pdp_temp[ii])) {
1038 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1039 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1040 {
1041 if (current_cf == CF_AVERAGE) {
1042 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1043 elapsed_pdp_st;
1044 } else {
1045 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1046 }
1047 #ifdef DEBUG
1048 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1049 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1050 #endif
1051 } else {
1052 switch (current_cf) {
1053 case CF_AVERAGE:
1054 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1055 elapsed_pdp_st;
1056 break;
1057 case CF_MINIMUM:
1058 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1059 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1060 break;
1061 case CF_MAXIMUM:
1062 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1063 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1064 break;
1065 case CF_LAST:
1066 default:
1067 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1068 break;
1069 }
1070 }
1071 }
1072 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1073 if (elapsed_pdp_st > 2)
1074 {
1075 switch (current_cf) {
1076 case CF_AVERAGE:
1077 default:
1078 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1079 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1080 break;
1081 case CF_SEASONAL:
1082 case CF_DEVSEASONAL:
1083 /* need to update cached seasonal values, so they are consistent
1084 * with the bulk update */
1085 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1086 * CDP_last_deviation are the same. */
1087 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1088 last_seasonal_coef[ii];
1089 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1090 seasonal_coef[ii];
1091 break;
1092 case CF_HWPREDICT:
1093 /* need to update the null_count and last_null_count.
1094 * even do this for non-DNAN pdp_temp because the
1095 * algorithm is not learning from batch updates. */
1096 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1097 elapsed_pdp_st;
1098 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1099 elapsed_pdp_st - 1;
1100 /* fall through */
1101 case CF_DEVPREDICT:
1102 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1103 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1104 break;
1105 case CF_FAILURES:
1106 /* do not count missed bulk values as failures */
1107 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1108 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1109 /* need to reset violations buffer.
1110 * could do this more carefully, but for now, just
1111 * assume a bulk update wipes away all violations. */
1112 erase_violations(&rrd, iii, i);
1113 break;
1114 }
1115 }
1116 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1118 if (rrd_test_error()) break;
1120 } /* endif data sources loop */
1121 } /* end RRA Loop */
1123 /* this loop is only entered if elapsed_pdp_st < 3 */
1124 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1125 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1126 {
1127 for(i = 0, rra_start = rra_begin;
1128 i < rrd.stat_head->rra_cnt;
1129 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1130 i++)
1131 {
1132 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1134 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1135 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1136 {
1137 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1138 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1139 &seasonal_coef);
1140 rra_current = ftell(rrd_file);
1141 }
1142 if (rrd_test_error()) break;
1143 /* loop over data soures within each RRA */
1144 for(ii = 0;
1145 ii < rrd.stat_head->ds_cnt;
1146 ii++)
1147 {
1148 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1149 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1150 scratch_idx, seasonal_coef);
1151 }
1152 } /* end RRA Loop */
1153 if (rrd_test_error()) break;
1154 } /* end elapsed_pdp_st loop */
1156 if (rrd_test_error()) break;
1158 /* Ready to write to disk */
1159 /* Move sequentially through the file, writing one RRA at a time.
1160 * Note this architecture divorces the computation of CDP with
1161 * flushing updated RRA entries to disk. */
1162 for(i = 0, rra_start = rra_begin;
1163 i < rrd.stat_head->rra_cnt;
1164 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1165 i++) {
1166 /* is there anything to write for this RRA? If not, continue. */
1167 if (rra_step_cnt[i] == 0) continue;
1169 /* write the first row */
1170 #ifdef DEBUG
1171 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1172 #endif
1173 rrd.rra_ptr[i].cur_row++;
1174 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1175 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1176 /* positition on the first row */
1177 rra_pos_tmp = rra_start +
1178 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1179 if(rra_pos_tmp != rra_current) {
1180 #ifndef HAVE_MMAP
1181 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1182 rrd_set_error("seek error in rrd");
1183 break;
1184 }
1185 #endif
1186 rra_current = rra_pos_tmp;
1187 }
1189 #ifdef DEBUG
1190 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1191 #endif
1192 scratch_idx = CDP_primary_val;
1193 if (pcdp_summary != NULL)
1194 {
1195 rra_time = (current_time - current_time
1196 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1197 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1198 }
1199 #ifdef HAVE_MMAP
1200 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1201 pcdp_summary, &rra_time, rrd_mmaped_file);
1202 #else
1203 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1204 pcdp_summary, &rra_time);
1205 #endif
1206 if (rrd_test_error()) break;
1208 /* write other rows of the bulk update, if any */
1209 scratch_idx = CDP_secondary_val;
1210 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1211 {
1212 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1213 {
1214 #ifdef DEBUG
1215 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1216 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1217 #endif
1218 /* wrap */
1219 rrd.rra_ptr[i].cur_row = 0;
1220 /* seek back to beginning of current rra */
1221 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1222 {
1223 rrd_set_error("seek error in rrd");
1224 break;
1225 }
1226 #ifdef DEBUG
1227 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1228 #endif
1229 rra_current = rra_start;
1230 }
1231 if (pcdp_summary != NULL)
1232 {
1233 rra_time = (current_time - current_time
1234 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1235 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1236 }
1237 #ifdef HAVE_MMAP
1238 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1239 pcdp_summary, &rra_time, rrd_mmaped_file);
1240 #else
1241 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1242 pcdp_summary, &rra_time);
1243 #endif
1244 }
1246 if (rrd_test_error())
1247 break;
1248 } /* RRA LOOP */
1250 /* break out of the argument parsing loop if error_string is set */
1251 if (rrd_test_error()){
1252 free(step_start);
1253 break;
1254 }
1256 } /* endif a pdp_st has occurred */
1257 rrd.live_head->last_up = current_time;
1258 rrd.live_head->last_up_usec = current_time_usec;
1259 free(step_start);
1260 } /* function argument loop */
1262 if (seasonal_coef != NULL) free(seasonal_coef);
1263 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1264 if (rra_step_cnt != NULL) free(rra_step_cnt);
1265 rpnstack_free(&rpnstack);
1267 #ifdef HAVE_MMAP
1268 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1269 rrd_set_error("error writing(unmapping) file: %s", filename);
1270 }
1271 #endif
1272 /* if we got here and if there is an error and if the file has not been
1273 * written to, then close things up and return. */
1274 if (rrd_test_error()) {
1275 free(updvals);
1276 free(tmpl_idx);
1277 rrd_free(&rrd);
1278 free(pdp_temp);
1279 free(pdp_new);
1280 fclose(rrd_file);
1281 return(-1);
1282 }
1284 /* aargh ... that was tough ... so many loops ... anyway, its done.
1285 * we just need to write back the live header portion now*/
1287 if (fseek(rrd_file, (sizeof(stat_head_t)
1288 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1289 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1290 SEEK_SET) != 0) {
1291 rrd_set_error("seek rrd for live header writeback");
1292 free(updvals);
1293 free(tmpl_idx);
1294 rrd_free(&rrd);
1295 free(pdp_temp);
1296 free(pdp_new);
1297 fclose(rrd_file);
1298 return(-1);
1299 }
1301 if(version >= 3) {
1302 if(fwrite( rrd.live_head,
1303 sizeof(live_head_t), 1, rrd_file) != 1){
1304 rrd_set_error("fwrite live_head to rrd");
1305 free(updvals);
1306 rrd_free(&rrd);
1307 free(tmpl_idx);
1308 free(pdp_temp);
1309 free(pdp_new);
1310 fclose(rrd_file);
1311 return(-1);
1312 }
1313 }
1314 else {
1315 if(fwrite( &rrd.live_head->last_up,
1316 sizeof(time_t), 1, rrd_file) != 1){
1317 rrd_set_error("fwrite live_head to rrd");
1318 free(updvals);
1319 rrd_free(&rrd);
1320 free(tmpl_idx);
1321 free(pdp_temp);
1322 free(pdp_new);
1323 fclose(rrd_file);
1324 return(-1);
1325 }
1326 }
1329 if(fwrite( rrd.pdp_prep,
1330 sizeof(pdp_prep_t),
1331 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1332 rrd_set_error("ftwrite pdp_prep to rrd");
1333 free(updvals);
1334 rrd_free(&rrd);
1335 free(tmpl_idx);
1336 free(pdp_temp);
1337 free(pdp_new);
1338 fclose(rrd_file);
1339 return(-1);
1340 }
1342 if(fwrite( rrd.cdp_prep,
1343 sizeof(cdp_prep_t),
1344 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1345 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1347 rrd_set_error("ftwrite cdp_prep to rrd");
1348 free(updvals);
1349 free(tmpl_idx);
1350 rrd_free(&rrd);
1351 free(pdp_temp);
1352 free(pdp_new);
1353 fclose(rrd_file);
1354 return(-1);
1355 }
1357 if(fwrite( rrd.rra_ptr,
1358 sizeof(rra_ptr_t),
1359 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1360 rrd_set_error("fwrite rra_ptr to rrd");
1361 free(updvals);
1362 free(tmpl_idx);
1363 rrd_free(&rrd);
1364 free(pdp_temp);
1365 free(pdp_new);
1366 fclose(rrd_file);
1367 return(-1);
1368 }
1370 /* OK now close the files and free the memory */
1371 if(fclose(rrd_file) != 0){
1372 rrd_set_error("closing rrd");
1373 free(updvals);
1374 free(tmpl_idx);
1375 rrd_free(&rrd);
1376 free(pdp_temp);
1377 free(pdp_new);
1378 return(-1);
1379 }
1381 /* calling the smoothing code here guarantees at most
1382 * one smoothing operation per rrd_update call. Unfortunately,
1383 * it is possible with bulk updates, or a long-delayed update
1384 * for smoothing to occur off-schedule. This really isn't
1385 * critical except during the burning cycles. */
1386 if (schedule_smooth)
1387 {
1388 rrd_file = fopen(filename,"rb+");
1389 rra_start = rra_begin;
1390 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1391 {
1392 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1393 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1394 {
1395 #ifdef DEBUG
1396 fprintf(stderr,"Running smoother for rra %ld\n",i);
1397 #endif
1398 apply_smoother(&rrd,i,rra_start,rrd_file);
1399 if (rrd_test_error())
1400 break;
1401 }
1402 rra_start += rrd.rra_def[i].row_cnt
1403 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1404 }
1405 fclose(rrd_file);
1406 }
1407 rrd_free(&rrd);
1408 free(updvals);
1409 free(tmpl_idx);
1410 free(pdp_new);
1411 free(pdp_temp);
1412 return(0);
1413 }
1415 /*
1416 * get exclusive lock to whole file.
1417 * lock gets removed when we close the file
1418 *
1419 * returns 0 on success
1420 */
1421 int
1422 LockRRD(FILE *rrdfile)
1423 {
1424 int rrd_fd; /* File descriptor for RRD */
1425 int rcstat;
1427 rrd_fd = fileno(rrdfile);
1429 {
1430 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1431 struct _stat st;
1433 if ( _fstat( rrd_fd, &st ) == 0 ) {
1434 rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1435 } else {
1436 rcstat = -1;
1437 }
1438 #else
1439 struct flock lock;
1440 lock.l_type = F_WRLCK; /* exclusive write lock */
1441 lock.l_len = 0; /* whole file */
1442 lock.l_start = 0; /* start of file */
1443 lock.l_whence = SEEK_SET; /* end of file */
1445 rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1446 #endif
1447 }
1449 return(rcstat);
1450 }
1453 #ifdef HAVE_MMAP
1454 info_t
1455 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1456 unsigned short CDP_scratch_idx, FILE *rrd_file,
1457 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1458 #else
1459 info_t
1460 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1461 unsigned short CDP_scratch_idx, FILE *rrd_file,
1462 info_t *pcdp_summary, time_t *rra_time)
1463 #endif
1464 {
1465 unsigned long ds_idx, cdp_idx;
1466 infoval iv;
1468 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1469 {
1470 /* compute the cdp index */
1471 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1472 #ifdef DEBUG
1473 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1474 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1475 rrd -> rra_def[rra_idx].cf_nam);
1476 #endif
1477 if (pcdp_summary != NULL)
1478 {
1479 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1480 /* append info to the return hash */
1481 pcdp_summary = info_push(pcdp_summary,
1482 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1483 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1484 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1485 RD_I_VAL, iv);
1486 }
1487 #ifdef HAVE_MMAP
1488 memcpy((char *)rrd_mmaped_file + *rra_current,
1489 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1490 sizeof(rrd_value_t));
1491 #else
1492 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1493 sizeof(rrd_value_t),1,rrd_file) != 1)
1494 {
1495 rrd_set_error("writing rrd");
1496 return 0;
1497 }
1498 #endif
1499 *rra_current += sizeof(rrd_value_t);
1500 }
1501 return (pcdp_summary);
1502 }