1 /*****************************************************************************
2 * RRDtool 1.2.10 Copyright by Tobi Oetiker, 1997-2005
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 *****************************************************************************/
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13 #include <sys/mman.h>
14 #endif
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
18 #include <sys/stat.h>
19 #include <io.h>
20 #endif
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
31 * replacement.
32 */
33 #include <sys/timeb.h>
35 struct timeval {
36 time_t tv_sec; /* seconds */
37 long tv_usec; /* microseconds */
38 };
40 struct __timezone {
41 int tz_minuteswest; /* minutes W of Greenwich */
42 int tz_dsttime; /* type of dst correction */
43 };
45 static gettimeofday(struct timeval *t, struct __timezone *tz) {
47 struct timeb current_time;
49 _ftime(¤t_time);
51 t->tv_sec = current_time.time;
52 t->tv_usec = current_time.millitm * 1000;
53 }
55 #endif
56 /*
57 * normilize time as returned by gettimeofday. usec part must
58 * be always >= 0
59 */
60 static void normalize_time(struct timeval *t)
61 {
62 if(t->tv_usec < 0) {
63 t->tv_sec--;
64 t->tv_usec += 1000000L;
65 }
66 }
68 /* Local prototypes */
69 int LockRRD(FILE *rrd_file);
70 #ifdef HAVE_MMAP
71 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
72 unsigned long *rra_current,
73 unsigned short CDP_scratch_idx,
74 #ifndef DEBUG
75 FILE UNUSED(*rrd_file),
76 #else
77 FILE *rrd_file,
78 #endif
79 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
80 #else
81 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
82 unsigned long *rra_current,
83 unsigned short CDP_scratch_idx, FILE *rrd_file,
84 info_t *pcdp_summary, time_t *rra_time);
85 #endif
86 int rrd_update_r(char *filename, char *template, int argc, char **argv);
87 int _rrd_update(char *filename, char *template, int argc, char **argv,
88 info_t*);
90 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
93 #ifdef STANDALONE
94 int
95 main(int argc, char **argv){
96 rrd_update(argc,argv);
97 if (rrd_test_error()) {
98 printf("RRDtool " PACKAGE_VERSION " Copyright by Tobi Oetiker, 1997-2005\n\n"
99 "Usage: rrdupdate filename\n"
100 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
101 "\t\t\ttime|N:value[:value...]\n\n"
102 "\t\t\tat-time@value[:value...]\n\n"
103 "\t\t\t[ time:value[:value...] ..]\n\n");
105 printf("ERROR: %s\n",rrd_get_error());
106 rrd_clear_error();
107 return 1;
108 }
109 return 0;
110 }
111 #endif
113 info_t *rrd_update_v(int argc, char **argv)
114 {
115 char *template = NULL;
116 info_t *result = NULL;
117 infoval rc;
118 optind = 0; opterr = 0; /* initialize getopt */
120 while (1) {
121 static struct option long_options[] =
122 {
123 {"template", required_argument, 0, 't'},
124 {0,0,0,0}
125 };
126 int option_index = 0;
127 int opt;
128 opt = getopt_long(argc, argv, "t:",
129 long_options, &option_index);
131 if (opt == EOF)
132 break;
134 switch(opt) {
135 case 't':
136 template = optarg;
137 break;
139 case '?':
140 rrd_set_error("unknown option '%s'",argv[optind-1]);
141 rc.u_int = -1;
142 goto end_tag;
143 }
144 }
146 /* need at least 2 arguments: filename, data. */
147 if (argc-optind < 2) {
148 rrd_set_error("Not enough arguments");
149 rc.u_int = -1;
150 goto end_tag;
151 }
152 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
153 rc.u_int = _rrd_update(argv[optind], template,
154 argc - optind - 1, argv + optind + 1, result);
155 result->value.u_int = rc.u_int;
156 end_tag:
157 return result;
158 }
160 int
161 rrd_update(int argc, char **argv)
162 {
163 char *template = NULL;
164 int rc;
165 optind = 0; opterr = 0; /* initialize getopt */
167 while (1) {
168 static struct option long_options[] =
169 {
170 {"template", required_argument, 0, 't'},
171 {0,0,0,0}
172 };
173 int option_index = 0;
174 int opt;
175 opt = getopt_long(argc, argv, "t:",
176 long_options, &option_index);
178 if (opt == EOF)
179 break;
181 switch(opt) {
182 case 't':
183 template = optarg;
184 break;
186 case '?':
187 rrd_set_error("unknown option '%s'",argv[optind-1]);
188 return(-1);
189 }
190 }
192 /* need at least 2 arguments: filename, data. */
193 if (argc-optind < 2) {
194 rrd_set_error("Not enough arguments");
196 return -1;
197 }
199 rc = rrd_update_r(argv[optind], template,
200 argc - optind - 1, argv + optind + 1);
201 return rc;
202 }
204 int
205 rrd_update_r(char *filename, char *template, int argc, char **argv)
206 {
207 return _rrd_update(filename, template, argc, argv, NULL);
208 }
210 int
211 _rrd_update(char *filename, char *template, int argc, char **argv,
212 info_t *pcdp_summary)
213 {
215 int arg_i = 2;
216 short j;
217 unsigned long i,ii,iii=1;
219 unsigned long rra_begin; /* byte pointer to the rra
220 * area in the rrd file. this
221 * pointer never changes value */
222 unsigned long rra_start; /* byte pointer to the rra
223 * area in the rrd file. this
224 * pointer changes as each rrd is
225 * processed. */
226 unsigned long rra_current; /* byte pointer to the current write
227 * spot in the rrd file. */
228 unsigned long rra_pos_tmp; /* temporary byte pointer. */
229 double interval,
230 pre_int,post_int; /* interval between this and
231 * the last run */
232 unsigned long proc_pdp_st; /* which pdp_st was the last
233 * to be processed */
234 unsigned long occu_pdp_st; /* when was the pdp_st
235 * before the last update
236 * time */
237 unsigned long proc_pdp_age; /* how old was the data in
238 * the pdp prep area when it
239 * was last updated */
240 unsigned long occu_pdp_age; /* how long ago was the last
241 * pdp_step time */
242 rrd_value_t *pdp_new; /* prepare the incoming data
243 * to be added the the
244 * existing entry */
245 rrd_value_t *pdp_temp; /* prepare the pdp values
246 * to be added the the
247 * cdp values */
249 long *tmpl_idx; /* index representing the settings
250 transported by the template index */
251 unsigned long tmpl_cnt = 2; /* time and data */
253 FILE *rrd_file;
254 rrd_t rrd;
255 time_t current_time = 0;
256 time_t rra_time = 0; /* time of update for a RRA */
257 unsigned long current_time_usec=0;/* microseconds part of current time */
258 struct timeval tmp_time; /* used for time conversion */
260 char **updvals;
261 int schedule_smooth = 0;
262 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
263 /* a vector of future Holt-Winters seasonal coefs */
264 unsigned long elapsed_pdp_st;
265 /* number of elapsed PDP steps since last update */
266 unsigned long *rra_step_cnt = NULL;
267 /* number of rows to be updated in an RRA for a data
268 * value. */
269 unsigned long start_pdp_offset;
270 /* number of PDP steps since the last update that
271 * are assigned to the first CDP to be generated
272 * since the last update. */
273 unsigned short scratch_idx;
274 /* index into the CDP scratch array */
275 enum cf_en current_cf;
276 /* numeric id of the current consolidation function */
277 rpnstack_t rpnstack; /* used for COMPUTE DS */
278 int version; /* rrd version */
279 char *endptr; /* used in the conversion */
280 #ifdef HAVE_MMAP
281 void *rrd_mmaped_file;
282 unsigned long rrd_filesize;
283 #endif
285 rpnstack_init(&rpnstack);
287 /* need at least 1 arguments: data. */
288 if (argc < 1) {
289 rrd_set_error("Not enough arguments");
290 return -1;
291 }
295 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
296 return -1;
297 }
298 /* initialize time */
299 version = atoi(rrd.stat_head->version);
300 gettimeofday(&tmp_time, 0);
301 normalize_time(&tmp_time);
302 current_time = tmp_time.tv_sec;
303 if(version >= 3) {
304 current_time_usec = tmp_time.tv_usec;
305 }
306 else {
307 current_time_usec = 0;
308 }
310 rra_current = rra_start = rra_begin = ftell(rrd_file);
311 /* This is defined in the ANSI C standard, section 7.9.5.3:
313 When a file is opened with udpate mode ('+' as the second
314 or third character in the ... list of mode argument
315 variables), both input and ouptut may be performed on the
316 associated stream. However, ... input may not be directly
317 followed by output without an intervening call to a file
318 positioning function, unless the input oepration encounters
319 end-of-file. */
320 #ifdef HAVE_MMAP
321 fseek(rrd_file, 0, SEEK_END);
322 rrd_filesize = ftell(rrd_file);
323 fseek(rrd_file, rra_current, SEEK_SET);
324 #else
325 fseek(rrd_file, 0, SEEK_CUR);
326 #endif
329 /* get exclusive lock to whole file.
330 * lock gets removed when we close the file.
331 */
332 if (LockRRD(rrd_file) != 0) {
333 rrd_set_error("could not lock RRD");
334 rrd_free(&rrd);
335 fclose(rrd_file);
336 return(-1);
337 }
339 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
340 rrd_set_error("allocating updvals pointer array");
341 rrd_free(&rrd);
342 fclose(rrd_file);
343 return(-1);
344 }
346 if ((pdp_temp = malloc(sizeof(rrd_value_t)
347 *rrd.stat_head->ds_cnt))==NULL){
348 rrd_set_error("allocating pdp_temp ...");
349 free(updvals);
350 rrd_free(&rrd);
351 fclose(rrd_file);
352 return(-1);
353 }
355 if ((tmpl_idx = malloc(sizeof(unsigned long)
356 *(rrd.stat_head->ds_cnt+1)))==NULL){
357 rrd_set_error("allocating tmpl_idx ...");
358 free(pdp_temp);
359 free(updvals);
360 rrd_free(&rrd);
361 fclose(rrd_file);
362 return(-1);
363 }
364 /* initialize template redirector */
365 /* default config example (assume DS 1 is a CDEF DS)
366 tmpl_idx[0] -> 0; (time)
367 tmpl_idx[1] -> 1; (DS 0)
368 tmpl_idx[2] -> 3; (DS 2)
369 tmpl_idx[3] -> 4; (DS 3) */
370 tmpl_idx[0] = 0; /* time */
371 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
372 {
373 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
374 tmpl_idx[ii++]=i;
375 }
376 tmpl_cnt= ii;
378 if (template) {
379 char *dsname;
380 unsigned int tmpl_len;
381 dsname = template;
382 tmpl_cnt = 1; /* the first entry is the time */
383 tmpl_len = strlen(template);
384 for(i=0;i<=tmpl_len ;i++) {
385 if (template[i] == ':' || template[i] == '\0') {
386 template[i] = '\0';
387 if (tmpl_cnt>rrd.stat_head->ds_cnt){
388 rrd_set_error("Template contains more DS definitions than RRD");
389 free(updvals); free(pdp_temp);
390 free(tmpl_idx); rrd_free(&rrd);
391 fclose(rrd_file); return(-1);
392 }
393 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
394 rrd_set_error("unknown DS name '%s'",dsname);
395 free(updvals); free(pdp_temp);
396 free(tmpl_idx); rrd_free(&rrd);
397 fclose(rrd_file); return(-1);
398 } else {
399 /* the first element is always the time */
400 tmpl_idx[tmpl_cnt-1]++;
401 /* go to the next entry on the template */
402 dsname = &template[i+1];
403 /* fix the damage we did before */
404 if (i<tmpl_len) {
405 template[i]=':';
406 }
408 }
409 }
410 }
411 }
412 if ((pdp_new = malloc(sizeof(rrd_value_t)
413 *rrd.stat_head->ds_cnt))==NULL){
414 rrd_set_error("allocating pdp_new ...");
415 free(updvals);
416 free(pdp_temp);
417 free(tmpl_idx);
418 rrd_free(&rrd);
419 fclose(rrd_file);
420 return(-1);
421 }
423 #ifdef HAVE_MMAP
424 rrd_mmaped_file = mmap(0,
425 rrd_filesize,
426 PROT_READ | PROT_WRITE,
427 MAP_SHARED,
428 fileno(rrd_file),
429 0);
430 if (rrd_mmaped_file == MAP_FAILED) {
431 rrd_set_error("error mmapping file %s", filename);
432 free(updvals);
433 free(pdp_temp);
434 free(tmpl_idx);
435 rrd_free(&rrd);
436 fclose(rrd_file);
437 return(-1);
438 }
439 #endif
440 /* loop through the arguments. */
441 for(arg_i=0; arg_i<argc;arg_i++) {
442 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
443 char *step_start = stepper;
444 char *p;
445 char *parsetime_error = NULL;
446 enum {atstyle, normal} timesyntax;
447 struct rrd_time_value ds_tv;
448 if (stepper == NULL){
449 rrd_set_error("failed duplication argv entry");
450 free(updvals);
451 free(pdp_temp);
452 free(tmpl_idx);
453 rrd_free(&rrd);
454 #ifdef HAVE_MMAP
455 munmap(rrd_mmaped_file, rrd_filesize);
456 #endif
457 fclose(rrd_file);
458 return(-1);
459 }
460 /* initialize all ds input to unknown except the first one
461 which has always got to be set */
462 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
463 strcpy(stepper,argv[arg_i]);
464 updvals[0]=stepper;
465 /* separate all ds elements; first must be examined separately
466 due to alternate time syntax */
467 if ((p=strchr(stepper,'@'))!=NULL) {
468 timesyntax = atstyle;
469 *p = '\0';
470 stepper = p+1;
471 } else if ((p=strchr(stepper,':'))!=NULL) {
472 timesyntax = normal;
473 *p = '\0';
474 stepper = p+1;
475 } else {
476 rrd_set_error("expected timestamp not found in data source from %s:...",
477 argv[arg_i]);
478 free(step_start);
479 break;
480 }
481 ii=1;
482 updvals[tmpl_idx[ii]] = stepper;
483 while (*stepper) {
484 if (*stepper == ':') {
485 *stepper = '\0';
486 ii++;
487 if (ii<tmpl_cnt){
488 updvals[tmpl_idx[ii]] = stepper+1;
489 }
490 }
491 stepper++;
492 }
494 if (ii != tmpl_cnt-1) {
495 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
496 tmpl_cnt-1, ii, argv[arg_i]);
497 free(step_start);
498 break;
499 }
501 /* get the time from the reading ... handle N */
502 if (timesyntax == atstyle) {
503 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
504 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
505 free(step_start);
506 break;
507 }
508 if (ds_tv.type == RELATIVE_TO_END_TIME ||
509 ds_tv.type == RELATIVE_TO_START_TIME) {
510 rrd_set_error("specifying time relative to the 'start' "
511 "or 'end' makes no sense here: %s",
512 updvals[0]);
513 free(step_start);
514 break;
515 }
517 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
518 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
520 } else if (strcmp(updvals[0],"N")==0){
521 gettimeofday(&tmp_time, 0);
522 normalize_time(&tmp_time);
523 current_time = tmp_time.tv_sec;
524 current_time_usec = tmp_time.tv_usec;
525 } else {
526 double tmp;
527 tmp = strtod(updvals[0], 0);
528 current_time = floor(tmp);
529 current_time_usec = (long)((tmp - current_time) * 1000000L);
530 }
531 /* dont do any correction for old version RRDs */
532 if(version < 3)
533 current_time_usec = 0;
535 if(current_time <= rrd.live_head->last_up){
536 rrd_set_error("illegal attempt to update using time %ld when "
537 "last update time is %ld (minimum one second step)",
538 current_time, rrd.live_head->last_up);
539 free(step_start);
540 break;
541 }
544 /* seek to the beginning of the rra's */
545 if (rra_current != rra_begin) {
546 #ifndef HAVE_MMAP
547 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
548 rrd_set_error("seek error in rrd");
549 free(step_start);
550 break;
551 }
552 #endif
553 rra_current = rra_begin;
554 }
555 rra_start = rra_begin;
557 /* when was the current pdp started */
558 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
559 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
561 /* when did the last pdp_st occur */
562 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
563 occu_pdp_st = current_time - occu_pdp_age;
564 /* interval = current_time - rrd.live_head->last_up; */
565 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
567 if (occu_pdp_st > proc_pdp_st){
568 /* OK we passed the pdp_st moment*/
569 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
570 * occurred before the latest
571 * pdp_st moment*/
572 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
573 post_int = occu_pdp_age; /* how much after it */
574 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
575 } else {
576 pre_int = interval;
577 post_int = 0;
578 }
580 #ifdef DEBUG
581 printf(
582 "proc_pdp_age %lu\t"
583 "proc_pdp_st %lu\t"
584 "occu_pfp_age %lu\t"
585 "occu_pdp_st %lu\t"
586 "int %lf\t"
587 "pre_int %lf\t"
588 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
589 occu_pdp_age, occu_pdp_st,
590 interval, pre_int, post_int);
591 #endif
593 /* process the data sources and update the pdp_prep
594 * area accordingly */
595 for(i=0;i<rrd.stat_head->ds_cnt;i++){
596 enum dst_en dst_idx;
597 dst_idx= dst_conv(rrd.ds_def[i].dst);
599 /* make sure we do not build diffs with old last_ds values */
600 if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval
601 && ( dst_idx == DST_COUNTER || dst_idx == DST_DERIVE)){
602 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
603 }
605 /* NOTE: DST_CDEF should never enter this if block, because
606 * updvals[i+1][0] is initialized to 'U'; unless the caller
607 * accidently specified a value for the DST_CDEF. To handle
608 * this case, an extra check is required. */
610 if((updvals[i+1][0] != 'U') &&
611 (dst_idx != DST_CDEF) &&
612 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
613 double rate = DNAN;
614 /* the data source type defines how to process the data */
615 /* pdp_new contains rate * time ... eg the bytes
616 * transferred during the interval. Doing it this way saves
617 * a lot of math operations */
620 switch(dst_idx){
621 case DST_COUNTER:
622 case DST_DERIVE:
623 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
624 for(ii=0;updvals[i+1][ii] != '\0';ii++){
625 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
626 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
627 break;
628 }
629 }
630 if (rrd_test_error()){
631 break;
632 }
633 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
634 if(dst_idx == DST_COUNTER) {
635 /* simple overflow catcher suggested by Andres Kroonmaa */
636 /* this will fail terribly for non 32 or 64 bit counters ... */
637 /* are there any others in SNMP land ? */
638 if (pdp_new[i] < (double)0.0 )
639 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
640 if (pdp_new[i] < (double)0.0 )
641 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
642 }
643 rate = pdp_new[i] / interval;
644 }
645 else {
646 pdp_new[i]= DNAN;
647 }
648 break;
649 case DST_ABSOLUTE:
650 errno = 0;
651 pdp_new[i] = strtod(updvals[i+1],&endptr);
652 if (errno > 0){
653 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
654 break;
655 };
656 if (endptr[0] != '\0'){
657 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
658 break;
659 }
660 rate = pdp_new[i] / interval;
661 break;
662 case DST_GAUGE:
663 errno = 0;
664 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
665 if (errno > 0){
666 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
667 break;
668 };
669 if (endptr[0] != '\0'){
670 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
671 break;
672 }
673 rate = pdp_new[i] / interval;
674 break;
675 default:
676 rrd_set_error("rrd contains unknown DS type : '%s'",
677 rrd.ds_def[i].dst);
678 break;
679 }
680 /* break out of this for loop if the error string is set */
681 if (rrd_test_error()){
682 break;
683 }
684 /* make sure pdp_temp is neither too large or too small
685 * if any of these occur it becomes unknown ...
686 * sorry folks ... */
687 if ( ! isnan(rate) &&
688 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
689 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
690 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
691 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
692 pdp_new[i] = DNAN;
693 }
694 } else {
695 /* no news is news all the same */
696 pdp_new[i] = DNAN;
697 }
699 /* make a copy of the command line argument for the next run */
700 #ifdef DEBUG
701 fprintf(stderr,
702 "prep ds[%lu]\t"
703 "last_arg '%s'\t"
704 "this_arg '%s'\t"
705 "pdp_new %10.2f\n",
706 i,
707 rrd.pdp_prep[i].last_ds,
708 updvals[i+1], pdp_new[i]);
709 #endif
710 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
711 strncpy(rrd.pdp_prep[i].last_ds,
712 updvals[i+1],LAST_DS_LEN-1);
713 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
714 }
715 }
716 /* break out of the argument parsing loop if the error_string is set */
717 if (rrd_test_error()){
718 free(step_start);
719 break;
720 }
721 /* has a pdp_st moment occurred since the last run ? */
723 if (proc_pdp_st == occu_pdp_st){
724 /* no we have not passed a pdp_st moment. therefore update is simple */
726 for(i=0;i<rrd.stat_head->ds_cnt;i++){
727 if(isnan(pdp_new[i]))
728 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
729 else
730 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
731 #ifdef DEBUG
732 fprintf(stderr,
733 "NO PDP ds[%lu]\t"
734 "value %10.2f\t"
735 "unkn_sec %5lu\n",
736 i,
737 rrd.pdp_prep[i].scratch[PDP_val].u_val,
738 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
739 #endif
740 }
741 } else {
742 /* an pdp_st has occurred. */
744 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
745 * occurred up to the last run.
746 pdp_new[] contains rate*seconds from the latest run.
747 pdp_temp[] will contain the rate for cdp */
749 for(i=0;i<rrd.stat_head->ds_cnt;i++){
750 /* update pdp_prep to the current pdp_st */
751 if(isnan(pdp_new[i]))
752 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
753 else
754 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
755 pdp_new[i]/(double)interval*(double)pre_int;
757 /* if too much of the pdp_prep is unknown we dump it */
758 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
759 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
760 (occu_pdp_st-proc_pdp_st <=
761 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
762 pdp_temp[i] = DNAN;
763 } else {
764 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
765 / (double)( occu_pdp_st
766 - proc_pdp_st
767 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
768 }
770 /* process CDEF data sources; remember each CDEF DS can
771 * only reference other DS with a lower index number */
772 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
773 rpnp_t *rpnp;
774 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
775 /* substitue data values for OP_VARIABLE nodes */
776 for (ii = 0; rpnp[ii].op != OP_END; ii++)
777 {
778 if (rpnp[ii].op == OP_VARIABLE) {
779 rpnp[ii].op = OP_NUMBER;
780 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
781 }
782 }
783 /* run the rpn calculator */
784 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
785 free(rpnp);
786 break; /* exits the data sources pdp_temp loop */
787 }
788 }
790 /* make pdp_prep ready for the next run */
791 if(isnan(pdp_new[i])){
792 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
793 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
794 } else {
795 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
796 rrd.pdp_prep[i].scratch[PDP_val].u_val =
797 pdp_new[i]/(double)interval*(double)post_int;
798 }
800 #ifdef DEBUG
801 fprintf(stderr,
802 "PDP UPD ds[%lu]\t"
803 "pdp_temp %10.2f\t"
804 "new_prep %10.2f\t"
805 "new_unkn_sec %5lu\n",
806 i, pdp_temp[i],
807 rrd.pdp_prep[i].scratch[PDP_val].u_val,
808 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
809 #endif
810 }
812 /* if there were errors during the last loop, bail out here */
813 if (rrd_test_error()){
814 free(step_start);
815 break;
816 }
818 /* compute the number of elapsed pdp_st moments */
819 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
820 #ifdef DEBUG
821 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
822 #endif
823 if (rra_step_cnt == NULL)
824 {
825 rra_step_cnt = (unsigned long *)
826 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
827 }
829 for(i = 0, rra_start = rra_begin;
830 i < rrd.stat_head->rra_cnt;
831 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
832 i++)
833 {
834 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
835 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
836 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
837 if (start_pdp_offset <= elapsed_pdp_st) {
838 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
839 rrd.rra_def[i].pdp_cnt + 1;
840 } else {
841 rra_step_cnt[i] = 0;
842 }
844 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
845 {
846 /* If this is a bulk update, we need to skip ahead in the seasonal
847 * arrays so that they will be correct for the next observed value;
848 * note that for the bulk update itself, no update will occur to
849 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
850 * be set to DNAN. */
851 if (rra_step_cnt[i] > 2)
852 {
853 /* skip update by resetting rra_step_cnt[i],
854 * note that this is not data source specific; this is due
855 * to the bulk update, not a DNAN value for the specific data
856 * source. */
857 rra_step_cnt[i] = 0;
858 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
859 &last_seasonal_coef);
860 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
861 &seasonal_coef);
862 }
864 /* periodically run a smoother for seasonal effects */
865 /* Need to use first cdp parameter buffer to track
866 * burnin (burnin requires a specific smoothing schedule).
867 * The CDP_init_seasonal parameter is really an RRA level,
868 * not a data source within RRA level parameter, but the rra_def
869 * is read only for rrd_update (not flushed to disk). */
870 iii = i*(rrd.stat_head -> ds_cnt);
871 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
872 <= BURNIN_CYCLES)
873 {
874 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
875 > rrd.rra_def[i].row_cnt - 1) {
876 /* mark off one of the burnin cycles */
877 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
878 schedule_smooth = 1;
879 }
880 } else {
881 /* someone has no doubt invented a trick to deal with this
882 * wrap around, but at least this code is clear. */
883 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
884 rrd.rra_ptr[i].cur_row)
885 {
886 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
887 * mapping between PDP and CDP */
888 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
889 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
890 {
891 #ifdef DEBUG
892 fprintf(stderr,
893 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
894 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
895 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
896 #endif
897 schedule_smooth = 1;
898 }
899 } else {
900 /* can't rely on negative numbers because we are working with
901 * unsigned values */
902 /* Don't need modulus here. If we've wrapped more than once, only
903 * one smooth is executed at the end. */
904 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
905 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
906 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
907 {
908 #ifdef DEBUG
909 fprintf(stderr,
910 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
911 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
912 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
913 #endif
914 schedule_smooth = 1;
915 }
916 }
917 }
919 rra_current = ftell(rrd_file);
920 } /* if cf is DEVSEASONAL or SEASONAL */
922 if (rrd_test_error()) break;
924 /* update CDP_PREP areas */
925 /* loop over data soures within each RRA */
926 for(ii = 0;
927 ii < rrd.stat_head->ds_cnt;
928 ii++)
929 {
931 /* iii indexes the CDP prep area for this data source within the RRA */
932 iii=i*rrd.stat_head->ds_cnt+ii;
934 if (rrd.rra_def[i].pdp_cnt > 1) {
936 if (rra_step_cnt[i] > 0) {
937 /* If we are in this block, as least 1 CDP value will be written to
938 * disk, this is the CDP_primary_val entry. If more than 1 value needs
939 * to be written, then the "fill in" value is the CDP_secondary_val
940 * entry. */
941 if (isnan(pdp_temp[ii]))
942 {
943 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
944 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
945 } else {
946 /* CDP_secondary value is the RRA "fill in" value for intermediary
947 * CDP data entries. No matter the CF, the value is the same because
948 * the average, max, min, and last of a list of identical values is
949 * the same, namely, the value itself. */
950 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
951 }
953 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
954 > rrd.rra_def[i].pdp_cnt*
955 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
956 {
957 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
958 /* initialize carry over */
959 if (current_cf == CF_AVERAGE) {
960 if (isnan(pdp_temp[ii])) {
961 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
962 } else {
963 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
964 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
965 }
966 } else {
967 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
968 }
969 } else {
970 rrd_value_t cum_val, cur_val;
971 switch (current_cf) {
972 case CF_AVERAGE:
973 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
974 cur_val = IFDNAN(pdp_temp[ii],0.0);
975 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
976 (cum_val + cur_val * start_pdp_offset) /
977 (rrd.rra_def[i].pdp_cnt
978 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
979 /* initialize carry over value */
980 if (isnan(pdp_temp[ii])) {
981 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
982 } else {
983 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
984 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
985 }
986 break;
987 case CF_MAXIMUM:
988 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
989 cur_val = IFDNAN(pdp_temp[ii],-DINF);
990 #ifdef DEBUG
991 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
992 isnan(pdp_temp[ii])) {
993 fprintf(stderr,
994 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
995 i,ii);
996 exit(-1);
997 }
998 #endif
999 if (cur_val > cum_val)
1000 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1001 else
1002 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1003 /* initialize carry over value */
1004 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1005 break;
1006 case CF_MINIMUM:
1007 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1008 cur_val = IFDNAN(pdp_temp[ii],DINF);
1009 #ifdef DEBUG
1010 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1011 isnan(pdp_temp[ii])) {
1012 fprintf(stderr,
1013 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1014 i,ii);
1015 exit(-1);
1016 }
1017 #endif
1018 if (cur_val < cum_val)
1019 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1020 else
1021 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1022 /* initialize carry over value */
1023 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1024 break;
1025 case CF_LAST:
1026 default:
1027 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1028 /* initialize carry over value */
1029 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1030 break;
1031 }
1032 } /* endif meets xff value requirement for a valid value */
1033 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1034 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1035 if (isnan(pdp_temp[ii]))
1036 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1037 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1038 else
1039 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1040 } else /* rra_step_cnt[i] == 0 */
1041 {
1042 #ifdef DEBUG
1043 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1044 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1045 i,ii);
1046 } else {
1047 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1048 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1049 }
1050 #endif
1051 if (isnan(pdp_temp[ii])) {
1052 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1053 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1054 {
1055 if (current_cf == CF_AVERAGE) {
1056 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1057 elapsed_pdp_st;
1058 } else {
1059 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1060 }
1061 #ifdef DEBUG
1062 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1063 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1064 #endif
1065 } else {
1066 switch (current_cf) {
1067 case CF_AVERAGE:
1068 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1069 elapsed_pdp_st;
1070 break;
1071 case CF_MINIMUM:
1072 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1073 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1074 break;
1075 case CF_MAXIMUM:
1076 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1077 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1078 break;
1079 case CF_LAST:
1080 default:
1081 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1082 break;
1083 }
1084 }
1085 }
1086 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1087 if (elapsed_pdp_st > 2)
1088 {
1089 switch (current_cf) {
1090 case CF_AVERAGE:
1091 default:
1092 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1093 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1094 break;
1095 case CF_SEASONAL:
1096 case CF_DEVSEASONAL:
1097 /* need to update cached seasonal values, so they are consistent
1098 * with the bulk update */
1099 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1100 * CDP_last_deviation are the same. */
1101 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1102 last_seasonal_coef[ii];
1103 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1104 seasonal_coef[ii];
1105 break;
1106 case CF_HWPREDICT:
1107 /* need to update the null_count and last_null_count.
1108 * even do this for non-DNAN pdp_temp because the
1109 * algorithm is not learning from batch updates. */
1110 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1111 elapsed_pdp_st;
1112 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1113 elapsed_pdp_st - 1;
1114 /* fall through */
1115 case CF_DEVPREDICT:
1116 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1117 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1118 break;
1119 case CF_FAILURES:
1120 /* do not count missed bulk values as failures */
1121 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1122 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1123 /* need to reset violations buffer.
1124 * could do this more carefully, but for now, just
1125 * assume a bulk update wipes away all violations. */
1126 erase_violations(&rrd, iii, i);
1127 break;
1128 }
1129 }
1130 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1132 if (rrd_test_error()) break;
1134 } /* endif data sources loop */
1135 } /* end RRA Loop */
1137 /* this loop is only entered if elapsed_pdp_st < 3 */
1138 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1139 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1140 {
1141 for(i = 0, rra_start = rra_begin;
1142 i < rrd.stat_head->rra_cnt;
1143 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1144 i++)
1145 {
1146 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1148 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1149 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1150 {
1151 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1152 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1153 &seasonal_coef);
1154 rra_current = ftell(rrd_file);
1155 }
1156 if (rrd_test_error()) break;
1157 /* loop over data soures within each RRA */
1158 for(ii = 0;
1159 ii < rrd.stat_head->ds_cnt;
1160 ii++)
1161 {
1162 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1163 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1164 scratch_idx, seasonal_coef);
1165 }
1166 } /* end RRA Loop */
1167 if (rrd_test_error()) break;
1168 } /* end elapsed_pdp_st loop */
1170 if (rrd_test_error()) break;
1172 /* Ready to write to disk */
1173 /* Move sequentially through the file, writing one RRA at a time.
1174 * Note this architecture divorces the computation of CDP with
1175 * flushing updated RRA entries to disk. */
1176 for(i = 0, rra_start = rra_begin;
1177 i < rrd.stat_head->rra_cnt;
1178 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1179 i++) {
1180 /* is there anything to write for this RRA? If not, continue. */
1181 if (rra_step_cnt[i] == 0) continue;
1183 /* write the first row */
1184 #ifdef DEBUG
1185 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1186 #endif
1187 rrd.rra_ptr[i].cur_row++;
1188 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1189 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1190 /* positition on the first row */
1191 rra_pos_tmp = rra_start +
1192 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1193 if(rra_pos_tmp != rra_current) {
1194 #ifndef HAVE_MMAP
1195 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1196 rrd_set_error("seek error in rrd");
1197 break;
1198 }
1199 #endif
1200 rra_current = rra_pos_tmp;
1201 }
1203 #ifdef DEBUG
1204 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1205 #endif
1206 scratch_idx = CDP_primary_val;
1207 if (pcdp_summary != NULL)
1208 {
1209 rra_time = (current_time - current_time
1210 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1211 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1212 }
1213 #ifdef HAVE_MMAP
1214 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1215 pcdp_summary, &rra_time, rrd_mmaped_file);
1216 #else
1217 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1218 pcdp_summary, &rra_time);
1219 #endif
1220 if (rrd_test_error()) break;
1222 /* write other rows of the bulk update, if any */
1223 scratch_idx = CDP_secondary_val;
1224 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1225 {
1226 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1227 {
1228 #ifdef DEBUG
1229 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1230 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1231 #endif
1232 /* wrap */
1233 rrd.rra_ptr[i].cur_row = 0;
1234 /* seek back to beginning of current rra */
1235 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1236 {
1237 rrd_set_error("seek error in rrd");
1238 break;
1239 }
1240 #ifdef DEBUG
1241 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1242 #endif
1243 rra_current = rra_start;
1244 }
1245 if (pcdp_summary != NULL)
1246 {
1247 rra_time = (current_time - current_time
1248 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1249 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1250 }
1251 #ifdef HAVE_MMAP
1252 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1253 pcdp_summary, &rra_time, rrd_mmaped_file);
1254 #else
1255 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1256 pcdp_summary, &rra_time);
1257 #endif
1258 }
1260 if (rrd_test_error())
1261 break;
1262 } /* RRA LOOP */
1264 /* break out of the argument parsing loop if error_string is set */
1265 if (rrd_test_error()){
1266 free(step_start);
1267 break;
1268 }
1270 } /* endif a pdp_st has occurred */
1271 rrd.live_head->last_up = current_time;
1272 rrd.live_head->last_up_usec = current_time_usec;
1273 free(step_start);
1274 } /* function argument loop */
1276 if (seasonal_coef != NULL) free(seasonal_coef);
1277 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1278 if (rra_step_cnt != NULL) free(rra_step_cnt);
1279 rpnstack_free(&rpnstack);
1281 #ifdef HAVE_MMAP
1282 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1283 rrd_set_error("error writing(unmapping) file: %s", filename);
1284 }
1285 #endif
1286 /* if we got here and if there is an error and if the file has not been
1287 * written to, then close things up and return. */
1288 if (rrd_test_error()) {
1289 free(updvals);
1290 free(tmpl_idx);
1291 rrd_free(&rrd);
1292 free(pdp_temp);
1293 free(pdp_new);
1294 fclose(rrd_file);
1295 return(-1);
1296 }
1298 /* aargh ... that was tough ... so many loops ... anyway, its done.
1299 * we just need to write back the live header portion now*/
1301 if (fseek(rrd_file, (sizeof(stat_head_t)
1302 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1303 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1304 SEEK_SET) != 0) {
1305 rrd_set_error("seek rrd for live header writeback");
1306 free(updvals);
1307 free(tmpl_idx);
1308 rrd_free(&rrd);
1309 free(pdp_temp);
1310 free(pdp_new);
1311 fclose(rrd_file);
1312 return(-1);
1313 }
1315 if(version >= 3) {
1316 if(fwrite( rrd.live_head,
1317 sizeof(live_head_t), 1, rrd_file) != 1){
1318 rrd_set_error("fwrite live_head to rrd");
1319 free(updvals);
1320 rrd_free(&rrd);
1321 free(tmpl_idx);
1322 free(pdp_temp);
1323 free(pdp_new);
1324 fclose(rrd_file);
1325 return(-1);
1326 }
1327 }
1328 else {
1329 if(fwrite( &rrd.live_head->last_up,
1330 sizeof(time_t), 1, rrd_file) != 1){
1331 rrd_set_error("fwrite live_head to rrd");
1332 free(updvals);
1333 rrd_free(&rrd);
1334 free(tmpl_idx);
1335 free(pdp_temp);
1336 free(pdp_new);
1337 fclose(rrd_file);
1338 return(-1);
1339 }
1340 }
1343 if(fwrite( rrd.pdp_prep,
1344 sizeof(pdp_prep_t),
1345 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1346 rrd_set_error("ftwrite pdp_prep to rrd");
1347 free(updvals);
1348 rrd_free(&rrd);
1349 free(tmpl_idx);
1350 free(pdp_temp);
1351 free(pdp_new);
1352 fclose(rrd_file);
1353 return(-1);
1354 }
1356 if(fwrite( rrd.cdp_prep,
1357 sizeof(cdp_prep_t),
1358 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1359 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1361 rrd_set_error("ftwrite cdp_prep to rrd");
1362 free(updvals);
1363 free(tmpl_idx);
1364 rrd_free(&rrd);
1365 free(pdp_temp);
1366 free(pdp_new);
1367 fclose(rrd_file);
1368 return(-1);
1369 }
1371 if(fwrite( rrd.rra_ptr,
1372 sizeof(rra_ptr_t),
1373 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1374 rrd_set_error("fwrite rra_ptr to rrd");
1375 free(updvals);
1376 free(tmpl_idx);
1377 rrd_free(&rrd);
1378 free(pdp_temp);
1379 free(pdp_new);
1380 fclose(rrd_file);
1381 return(-1);
1382 }
1384 /* OK now close the files and free the memory */
1385 if(fclose(rrd_file) != 0){
1386 rrd_set_error("closing rrd");
1387 free(updvals);
1388 free(tmpl_idx);
1389 rrd_free(&rrd);
1390 free(pdp_temp);
1391 free(pdp_new);
1392 return(-1);
1393 }
1395 /* calling the smoothing code here guarantees at most
1396 * one smoothing operation per rrd_update call. Unfortunately,
1397 * it is possible with bulk updates, or a long-delayed update
1398 * for smoothing to occur off-schedule. This really isn't
1399 * critical except during the burning cycles. */
1400 if (schedule_smooth)
1401 {
1402 rrd_file = fopen(filename,"rb+");
1403 rra_start = rra_begin;
1404 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1405 {
1406 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1407 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1408 {
1409 #ifdef DEBUG
1410 fprintf(stderr,"Running smoother for rra %ld\n",i);
1411 #endif
1412 apply_smoother(&rrd,i,rra_start,rrd_file);
1413 if (rrd_test_error())
1414 break;
1415 }
1416 rra_start += rrd.rra_def[i].row_cnt
1417 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1418 }
1419 fclose(rrd_file);
1420 }
1421 rrd_free(&rrd);
1422 free(updvals);
1423 free(tmpl_idx);
1424 free(pdp_new);
1425 free(pdp_temp);
1426 return(0);
1427 }
1429 /*
1430 * get exclusive lock to whole file.
1431 * lock gets removed when we close the file
1432 *
1433 * returns 0 on success
1434 */
1435 int
1436 LockRRD(FILE *rrdfile)
1437 {
1438 int rrd_fd; /* File descriptor for RRD */
1439 int rcstat;
1441 rrd_fd = fileno(rrdfile);
1443 {
1444 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1445 struct _stat st;
1447 if ( _fstat( rrd_fd, &st ) == 0 ) {
1448 rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1449 } else {
1450 rcstat = -1;
1451 }
1452 #else
1453 struct flock lock;
1454 lock.l_type = F_WRLCK; /* exclusive write lock */
1455 lock.l_len = 0; /* whole file */
1456 lock.l_start = 0; /* start of file */
1457 lock.l_whence = SEEK_SET; /* end of file */
1459 rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1460 #endif
1461 }
1463 return(rcstat);
1464 }
1467 #ifdef HAVE_MMAP
1468 info_t
1469 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1470 unsigned short CDP_scratch_idx,
1471 #ifndef DEBUG
1472 FILE UNUSED(*rrd_file),
1473 #else
1474 FILE *rrd_file,
1475 #endif
1476 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1477 #else
1478 info_t
1479 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1480 unsigned short CDP_scratch_idx, FILE *rrd_file,
1481 info_t *pcdp_summary, time_t *rra_time)
1482 #endif
1483 {
1484 unsigned long ds_idx, cdp_idx;
1485 infoval iv;
1487 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1488 {
1489 /* compute the cdp index */
1490 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1491 #ifdef DEBUG
1492 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1493 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1494 rrd -> rra_def[rra_idx].cf_nam);
1495 #endif
1496 if (pcdp_summary != NULL)
1497 {
1498 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1499 /* append info to the return hash */
1500 pcdp_summary = info_push(pcdp_summary,
1501 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1502 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1503 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1504 RD_I_VAL, iv);
1505 }
1506 #ifdef HAVE_MMAP
1507 memcpy((char *)rrd_mmaped_file + *rra_current,
1508 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1509 sizeof(rrd_value_t));
1510 #else
1511 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1512 sizeof(rrd_value_t),1,rrd_file) != 1)
1513 {
1514 rrd_set_error("writing rrd");
1515 return 0;
1516 }
1517 #endif
1518 *rra_current += sizeof(rrd_value_t);
1519 }
1520 return (pcdp_summary);
1521 }