1 /*****************************************************************************
2 * RRDtool 1.2.23 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 *****************************************************************************/
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13 # include <sys/mman.h>
14 #endif
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
18 #include <sys/stat.h>
19 #include <io.h>
20 #endif
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
/*
 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
 * replacement.
 */
#include <sys/timeb.h>

#ifndef __MINGW32__
struct timeval {
	time_t tv_sec;  /* seconds */
	long tv_usec;   /* microseconds */
};
#endif

struct __timezone {
	int  tz_minuteswest; /* minutes W of Greenwich */
	int  tz_dsttime;     /* type of dst correction */
};

/* Minimal gettimeofday() emulation on top of _ftime().  The timezone
 * argument is accepted but ignored (callers in this file pass 0).
 * Always returns 0. */
static int gettimeofday(struct timeval *t, struct __timezone *tz) {

	struct _timeb current_time;

	/* was "_ftime(¤t_time)" -- mojibake of "&current_time"
	 * that could never have compiled on Windows */
	_ftime(&current_time);

	t->tv_sec  = current_time.time;
	t->tv_usec = current_time.millitm * 1000;

	return 0;
}

#endif
60 /*
61 * normilize time as returned by gettimeofday. usec part must
62 * be always >= 0
63 */
/* Canonicalize a timeval from gettimeofday(): when the microsecond part
 * is negative, borrow one full second from tv_sec so that tv_usec ends
 * up non-negative.  (A single borrow suffices for the values produced
 * by the callers in this file.) */
static void normalize_time(struct timeval *t)
{
    if (t->tv_usec >= 0)
        return;

    t->tv_usec += 1000000L;
    t->tv_sec -= 1;
}
72 /* Local prototypes */
73 int LockRRD(int in_file);
74 #ifdef HAVE_MMAP
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
76 unsigned long *rra_current,
77 unsigned short CDP_scratch_idx,
78 #ifndef DEBUG
79 int UNUSED(in_file),
80 #else
81 int in_file,
82 #endif
83 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
84 #else
85 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
86 unsigned long *rra_current,
87 unsigned short CDP_scratch_idx, int in_file,
88 info_t *pcdp_summary, time_t *rra_time);
89 #endif
90 int rrd_update_r(const char *filename, const char *tmplt, int argc, const char **argv);
91 int _rrd_update(const char *filename, const char *tmplt, int argc, const char **argv,
92 info_t*);
/* Yield X, or Y when X is NaN.  NOTE: X is evaluated twice, so avoid
 * side-effecting arguments.  The former definition carried a stray
 * trailing ';' which injected an empty statement at every use and
 * would break any use inside a larger expression. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
97 info_t *rrd_update_v(int argc, char **argv)
98 {
99 char *tmplt = NULL;
100 info_t *result = NULL;
101 infoval rc;
102 rc.u_int = -1;
103 optind = 0; opterr = 0; /* initialize getopt */
105 while (1) {
106 static struct option long_options[] =
107 {
108 {"template", required_argument, 0, 't'},
109 {0,0,0,0}
110 };
111 int option_index = 0;
112 int opt;
113 opt = getopt_long(argc, argv, "t:",
114 long_options, &option_index);
116 if (opt == EOF)
117 break;
119 switch(opt) {
120 case 't':
121 tmplt = optarg;
122 break;
124 case '?':
125 rrd_set_error("unknown option '%s'",argv[optind-1]);
126 goto end_tag;
127 }
128 }
130 /* need at least 2 arguments: filename, data. */
131 if (argc-optind < 2) {
132 rrd_set_error("Not enough arguments");
133 goto end_tag;
134 }
135 rc.u_int = 0;
136 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
137 rc.u_int = _rrd_update(argv[optind], tmplt,
138 argc - optind - 1, (const char **)(argv + optind + 1), result);
139 result->value.u_int = rc.u_int;
140 end_tag:
141 return result;
142 }
144 int
145 rrd_update(int argc, char **argv)
146 {
147 char *tmplt = NULL;
148 int rc;
149 optind = 0; opterr = 0; /* initialize getopt */
151 while (1) {
152 static struct option long_options[] =
153 {
154 {"template", required_argument, 0, 't'},
155 {0,0,0,0}
156 };
157 int option_index = 0;
158 int opt;
159 opt = getopt_long(argc, argv, "t:",
160 long_options, &option_index);
162 if (opt == EOF)
163 break;
165 switch(opt) {
166 case 't':
167 tmplt = optarg;
168 break;
170 case '?':
171 rrd_set_error("unknown option '%s'",argv[optind-1]);
172 return(-1);
173 }
174 }
176 /* need at least 2 arguments: filename, data. */
177 if (argc-optind < 2) {
178 rrd_set_error("Not enough arguments");
180 return -1;
181 }
183 rc = rrd_update_r(argv[optind], tmplt,
184 argc - optind - 1, (const char **)(argv + optind + 1));
185 return rc;
186 }
188 int
189 rrd_update_r(const char *filename, const char *tmplt, int argc, const char **argv)
190 {
191 return _rrd_update(filename, tmplt, argc, argv, NULL);
192 }
194 int
195 _rrd_update(const char *filename, const char *tmplt, int argc, const char **argv,
196 info_t *pcdp_summary)
197 {
199 int arg_i = 2;
200 short j;
201 unsigned long i,ii,iii=1;
203 unsigned long rra_begin; /* byte pointer to the rra
204 * area in the rrd file. this
205 * pointer never changes value */
206 unsigned long rra_start; /* byte pointer to the rra
207 * area in the rrd file. this
208 * pointer changes as each rrd is
209 * processed. */
210 unsigned long rra_current; /* byte pointer to the current write
211 * spot in the rrd file. */
212 unsigned long rra_pos_tmp; /* temporary byte pointer. */
213 double interval,
214 pre_int,post_int; /* interval between this and
215 * the last run */
216 unsigned long proc_pdp_st; /* which pdp_st was the last
217 * to be processed */
218 unsigned long occu_pdp_st; /* when was the pdp_st
219 * before the last update
220 * time */
221 unsigned long proc_pdp_age; /* how old was the data in
222 * the pdp prep area when it
223 * was last updated */
224 unsigned long occu_pdp_age; /* how long ago was the last
225 * pdp_step time */
226 rrd_value_t *pdp_new; /* prepare the incoming data
227 * to be added the the
228 * existing entry */
229 rrd_value_t *pdp_temp; /* prepare the pdp values
230 * to be added the the
231 * cdp values */
233 long *tmpl_idx; /* index representing the settings
234 transported by the tmplt index */
235 unsigned long tmpl_cnt = 2; /* time and data */
237 rrd_t rrd;
238 time_t current_time = 0;
239 time_t rra_time = 0; /* time of update for a RRA */
240 unsigned long current_time_usec=0;/* microseconds part of current time */
241 struct timeval tmp_time; /* used for time conversion */
243 char **updvals;
244 int schedule_smooth = 0;
245 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
246 /* a vector of future Holt-Winters seasonal coefs */
247 unsigned long elapsed_pdp_st;
248 /* number of elapsed PDP steps since last update */
249 unsigned long *rra_step_cnt = NULL;
250 /* number of rows to be updated in an RRA for a data
251 * value. */
252 unsigned long start_pdp_offset;
253 /* number of PDP steps since the last update that
254 * are assigned to the first CDP to be generated
255 * since the last update. */
256 unsigned short scratch_idx;
257 /* index into the CDP scratch array */
258 enum cf_en current_cf;
259 /* numeric id of the current consolidation function */
260 rpnstack_t rpnstack; /* used for COMPUTE DS */
261 int version; /* rrd version */
262 char *endptr; /* used in the conversion */
263 rrd_file_t* rrd_file;
265 rpnstack_init(&rpnstack);
267 /* need at least 1 arguments: data. */
268 if (argc < 1) {
269 rrd_set_error("Not enough arguments");
270 return -1;
271 }
273 rrd_file = rrd_open(filename,&rrd, RRD_READWRITE);
274 if (rrd_file == NULL) {
275 return -1;
276 }
278 /* initialize time */
279 version = atoi(rrd.stat_head->version);
280 gettimeofday(&tmp_time, 0);
281 normalize_time(&tmp_time);
282 current_time = tmp_time.tv_sec;
283 if(version >= 3) {
284 current_time_usec = tmp_time.tv_usec;
285 }
286 else {
287 current_time_usec = 0;
288 }
290 rra_current = rra_start = rra_begin = rrd_file->header_len;
291 /* This is defined in the ANSI C standard, section 7.9.5.3:
When a file is opened with update mode ('+' as the second
294 or third character in the ... list of mode argument
295 variables), both input and output may be performed on the
296 associated stream. However, ... input may not be directly
297 followed by output without an intervening call to a file
298 positioning function, unless the input operation encounters
299 end-of-file. */
300 #if 0//def HAVE_MMAP
301 rrd_filesize = rrd_file->file_size;
302 fseek(rrd_file->fd, 0, SEEK_END);
303 rrd_filesize = ftell(rrd_file->fd);
304 fseek(rrd_file->fd, rra_current, SEEK_SET);
305 #else
306 // fseek(rrd_file->fd, 0, SEEK_CUR);
307 #endif
310 /* get exclusive lock to whole file.
311 * lock gets removed when we close the file.
312 */
313 if (LockRRD(rrd_file->fd) != 0) {
314 rrd_set_error("could not lock RRD");
315 rrd_free(&rrd);
316 close(rrd_file->fd);
317 return(-1);
318 }
320 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
321 rrd_set_error("allocating updvals pointer array");
322 rrd_free(&rrd);
323 close(rrd_file->fd);
324 return(-1);
325 }
327 if ((pdp_temp = malloc(sizeof(rrd_value_t)
328 *rrd.stat_head->ds_cnt))==NULL){
329 rrd_set_error("allocating pdp_temp ...");
330 free(updvals);
331 rrd_free(&rrd);
332 close(rrd_file->fd);
333 return(-1);
334 }
336 if ((tmpl_idx = malloc(sizeof(unsigned long)
337 *(rrd.stat_head->ds_cnt+1)))==NULL){
338 rrd_set_error("allocating tmpl_idx ...");
339 free(pdp_temp);
340 free(updvals);
341 rrd_free(&rrd);
342 close(rrd_file->fd);
343 return(-1);
344 }
345 /* initialize tmplt redirector */
346 /* default config example (assume DS 1 is a CDEF DS)
347 tmpl_idx[0] -> 0; (time)
348 tmpl_idx[1] -> 1; (DS 0)
349 tmpl_idx[2] -> 3; (DS 2)
350 tmpl_idx[3] -> 4; (DS 3) */
351 tmpl_idx[0] = 0; /* time */
352 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
353 {
354 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
355 tmpl_idx[ii++]=i;
356 }
357 tmpl_cnt= ii;
359 if (tmplt) {
360 /* we should work on a writeable copy here */
361 char *dsname;
362 unsigned int tmpl_len;
363 char *tmplt_copy = strdup(tmplt);
364 dsname = tmplt_copy;
365 tmpl_cnt = 1; /* the first entry is the time */
366 tmpl_len = strlen(tmplt_copy);
367 for(i=0;i<=tmpl_len ;i++) {
368 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
369 tmplt_copy[i] = '\0';
370 if (tmpl_cnt>rrd.stat_head->ds_cnt){
371 rrd_set_error("tmplt contains more DS definitions than RRD");
372 free(updvals); free(pdp_temp);
373 free(tmpl_idx); rrd_free(&rrd);
374 close(rrd_file->fd); return(-1);
375 }
376 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
377 rrd_set_error("unknown DS name '%s'",dsname);
378 free(updvals); free(pdp_temp);
379 free(tmplt_copy);
380 free(tmpl_idx); rrd_free(&rrd);
381 close(rrd_file->fd); return(-1);
382 } else {
383 /* the first element is always the time */
384 tmpl_idx[tmpl_cnt-1]++;
385 /* go to the next entry on the tmplt_copy */
386 dsname = &tmplt_copy[i+1];
387 /* fix the damage we did before */
388 if (i<tmpl_len) {
389 tmplt_copy[i]=':';
390 }
392 }
393 }
394 }
395 free(tmplt_copy);
396 }
397 if ((pdp_new = malloc(sizeof(rrd_value_t)
398 *rrd.stat_head->ds_cnt))==NULL){
399 rrd_set_error("allocating pdp_new ...");
400 free(updvals);
401 free(pdp_temp);
402 free(tmpl_idx);
403 rrd_free(&rrd);
404 close(rrd_file->fd);
405 return(-1);
406 }
408 #if 0//def HAVE_MMAP
409 rrd_mmaped_file = mmap(0,
410 rrd_file->file_len,
411 PROT_READ | PROT_WRITE,
412 MAP_SHARED,
413 fileno(in_file),
414 0);
415 if (rrd_mmaped_file == MAP_FAILED) {
416 rrd_set_error("error mmapping file %s", filename);
417 free(updvals);
418 free(pdp_temp);
419 free(tmpl_idx);
420 rrd_free(&rrd);
421 close(rrd_file->fd);
422 return(-1);
423 }
424 #ifdef USE_MADVISE
425 /* when we use mmaping we tell the kernel the mmap equivalent
426 of POSIX_FADV_RANDOM */
427 madvise(rrd_mmaped_file,rrd_filesize,POSIX_MADV_RANDOM);
428 #endif
429 #endif
430 /* loop through the arguments. */
431 for(arg_i=0; arg_i<argc;arg_i++) {
432 char *stepper = strdup(argv[arg_i]);
433 char *step_start = stepper;
434 char *p;
435 char *parsetime_error = NULL;
436 enum {atstyle, normal} timesyntax;
437 struct rrd_time_value ds_tv;
438 if (stepper == NULL){
439 rrd_set_error("failed duplication argv entry");
440 free(step_start);
441 free(updvals);
442 free(pdp_temp);
443 free(tmpl_idx);
444 rrd_free(&rrd);
445 #ifdef HAVE_MMAP
446 rrd_close(rrd_file);
447 #endif
448 close(rrd_file->fd);
449 return(-1);
450 }
451 /* initialize all ds input to unknown except the first one
452 which has always got to be set */
453 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
454 updvals[0]=stepper;
455 /* separate all ds elements; first must be examined separately
456 due to alternate time syntax */
457 if ((p=strchr(stepper,'@'))!=NULL) {
458 timesyntax = atstyle;
459 *p = '\0';
460 stepper = p+1;
461 } else if ((p=strchr(stepper,':'))!=NULL) {
462 timesyntax = normal;
463 *p = '\0';
464 stepper = p+1;
465 } else {
466 rrd_set_error("expected timestamp not found in data source from %s",
467 argv[arg_i]);
468 free(step_start);
469 break;
470 }
471 ii=1;
472 updvals[tmpl_idx[ii]] = stepper;
473 while (*stepper) {
474 if (*stepper == ':') {
475 *stepper = '\0';
476 ii++;
477 if (ii<tmpl_cnt){
478 updvals[tmpl_idx[ii]] = stepper+1;
479 }
480 }
481 stepper++;
482 }
484 if (ii != tmpl_cnt-1) {
485 rrd_set_error("expected %lu data source readings (got %lu) from %s",
486 tmpl_cnt-1, ii, argv[arg_i]);
487 free(step_start);
488 break;
489 }
491 /* get the time from the reading ... handle N */
492 if (timesyntax == atstyle) {
493 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
494 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
495 free(step_start);
496 break;
497 }
498 if (ds_tv.type == RELATIVE_TO_END_TIME ||
499 ds_tv.type == RELATIVE_TO_START_TIME) {
500 rrd_set_error("specifying time relative to the 'start' "
501 "or 'end' makes no sense here: %s",
502 updvals[0]);
503 free(step_start);
504 break;
505 }
507 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
508 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
510 } else if (strcmp(updvals[0],"N")==0){
511 gettimeofday(&tmp_time, 0);
512 normalize_time(&tmp_time);
513 current_time = tmp_time.tv_sec;
514 current_time_usec = tmp_time.tv_usec;
515 } else {
516 double tmp;
517 tmp = strtod(updvals[0], 0);
518 current_time = floor(tmp);
519 current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
520 }
521 /* dont do any correction for old version RRDs */
522 if(version < 3)
523 current_time_usec = 0;
525 if(current_time < rrd.live_head->last_up ||
526 (current_time == rrd.live_head->last_up &&
527 (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
528 rrd_set_error("illegal attempt to update using time %ld when "
529 "last update time is %ld (minimum one second step)",
530 current_time, rrd.live_head->last_up);
531 free(step_start);
532 break;
533 }
536 /* seek to the beginning of the rra's */
537 if (rra_current != rra_begin) {
538 #ifndef HAVE_MMAP
539 if(rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
540 rrd_set_error("seek error in rrd");
541 free(step_start);
542 break;
543 }
544 #endif
545 rra_current = rra_begin;
546 }
547 rra_start = rra_begin;
549 /* when was the current pdp started */
550 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
551 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
553 /* when did the last pdp_st occur */
554 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
555 occu_pdp_st = current_time - occu_pdp_age;
557 /* interval = current_time - rrd.live_head->last_up; */
558 interval = (double)(current_time - rrd.live_head->last_up)
559 + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
561 if (occu_pdp_st > proc_pdp_st){
562 /* OK we passed the pdp_st moment*/
563 pre_int = (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
564 * occurred before the latest
565 * pdp_st moment*/
566 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
567 post_int = occu_pdp_age; /* how much after it */
568 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
569 } else {
570 pre_int = interval;
571 post_int = 0;
572 }
574 #ifdef DEBUG
575 printf(
576 "proc_pdp_age %lu\t"
577 "proc_pdp_st %lu\t"
578 "occu_pfp_age %lu\t"
579 "occu_pdp_st %lu\t"
580 "int %lf\t"
581 "pre_int %lf\t"
582 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
583 occu_pdp_age, occu_pdp_st,
584 interval, pre_int, post_int);
585 #endif
587 /* process the data sources and update the pdp_prep
588 * area accordingly */
589 for(i=0;i<rrd.stat_head->ds_cnt;i++){
590 enum dst_en dst_idx;
591 dst_idx= dst_conv(rrd.ds_def[i].dst);
593 /* make sure we do not build diffs with old last_ds values */
594 if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
595 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
596 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
597 }
599 /* NOTE: DST_CDEF should never enter this if block, because
600 * updvals[i+1][0] is initialized to 'U'; unless the caller
601 * accidently specified a value for the DST_CDEF. To handle
602 * this case, an extra check is required. */
604 if((updvals[i+1][0] != 'U') &&
605 (dst_idx != DST_CDEF) &&
606 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
607 double rate = DNAN;
608 /* the data source type defines how to process the data */
609 /* pdp_new contains rate * time ... eg the bytes
610 * transferred during the interval. Doing it this way saves
611 * a lot of math operations */
614 switch(dst_idx){
615 case DST_COUNTER:
616 case DST_DERIVE:
617 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
618 for(ii=0;updvals[i+1][ii] != '\0';ii++){
619 if((updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9') && (ii != 0 && updvals[i+1][ii] != '-')){
620 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
621 break;
622 }
623 }
624 if (rrd_test_error()){
625 break;
626 }
627 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
628 if(dst_idx == DST_COUNTER) {
629 /* simple overflow catcher suggested by Andres Kroonmaa */
630 /* this will fail terribly for non 32 or 64 bit counters ... */
631 /* are there any others in SNMP land ? */
632 if (pdp_new[i] < (double)0.0 )
633 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
634 if (pdp_new[i] < (double)0.0 )
635 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
636 }
637 rate = pdp_new[i] / interval;
638 }
639 else {
640 pdp_new[i]= DNAN;
641 }
642 break;
643 case DST_ABSOLUTE:
644 errno = 0;
645 pdp_new[i] = strtod(updvals[i+1],&endptr);
646 if (errno > 0){
647 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
648 break;
649 };
650 if (endptr[0] != '\0'){
651 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
652 break;
653 }
654 rate = pdp_new[i] / interval;
655 break;
656 case DST_GAUGE:
657 errno = 0;
658 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
659 if (errno > 0){
660 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
661 break;
662 };
663 if (endptr[0] != '\0'){
664 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
665 break;
666 }
667 rate = pdp_new[i] / interval;
668 break;
669 default:
670 rrd_set_error("rrd contains unknown DS type : '%s'",
671 rrd.ds_def[i].dst);
672 break;
673 }
674 /* break out of this for loop if the error string is set */
675 if (rrd_test_error()){
676 break;
677 }
678 /* make sure pdp_temp is neither too large or too small
679 * if any of these occur it becomes unknown ...
680 * sorry folks ... */
681 if ( ! isnan(rate) &&
682 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
683 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
684 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
685 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
686 pdp_new[i] = DNAN;
687 }
688 } else {
689 /* no news is news all the same */
690 pdp_new[i] = DNAN;
691 }
694 /* make a copy of the command line argument for the next run */
695 #ifdef DEBUG
696 fprintf(stderr,
697 "prep ds[%lu]\t"
698 "last_arg '%s'\t"
699 "this_arg '%s'\t"
700 "pdp_new %10.2f\n",
701 i,
702 rrd.pdp_prep[i].last_ds,
703 updvals[i+1], pdp_new[i]);
704 #endif
705 strncpy(rrd.pdp_prep[i].last_ds, updvals[i+1],LAST_DS_LEN-1);
706 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
707 }
708 /* break out of the argument parsing loop if the error_string is set */
709 if (rrd_test_error()){
710 free(step_start);
711 break;
712 }
713 /* has a pdp_st moment occurred since the last run ? */
715 if (proc_pdp_st == occu_pdp_st){
716 /* no we have not passed a pdp_st moment. therefore update is simple */
718 for(i=0;i<rrd.stat_head->ds_cnt;i++){
719 if(isnan(pdp_new[i])) {
/* this is not really accurate if we use subsecond data arrival time
should have thought of it when going subsecond resolution ...
sorry next format change we will have it! */
723 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);
724 } else {
725 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
726 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
727 } else {
728 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
729 }
730 }
731 #ifdef DEBUG
732 fprintf(stderr,
733 "NO PDP ds[%lu]\t"
734 "value %10.2f\t"
735 "unkn_sec %5lu\n",
736 i,
737 rrd.pdp_prep[i].scratch[PDP_val].u_val,
738 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
739 #endif
740 }
741 } else {
742 /* an pdp_st has occurred. */
744 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
745 * occurred up to the last run.
746 pdp_new[] contains rate*seconds from the latest run.
747 pdp_temp[] will contain the rate for cdp */
749 for(i=0;i<rrd.stat_head->ds_cnt;i++){
750 /* update pdp_prep to the current pdp_st. */
751 double pre_unknown = 0.0;
752 if(isnan(pdp_new[i]))
/* a final bit of unknown to be added before the calculation;
* we use a temporary variable for this so that we
* don't have to turn integer lines before using the value */
756 pre_unknown = pre_int;
757 else {
758 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
759 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i]/interval*pre_int;
760 } else {
761 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
762 }
763 }
766 /* if too much of the pdp_prep is unknown we dump it */
767 if (
768 /* removed because this does not agree with the definition
769 a heart beat can be unknown */
770 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
771 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
/* if the interval is larger than mrhb we get NAN */
773 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
774 (occu_pdp_st-proc_pdp_st <=
775 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
776 pdp_temp[i] = DNAN;
777 } else {
778 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
779 / ((double)(occu_pdp_st - proc_pdp_st
780 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)
781 -pre_unknown);
782 }
784 /* process CDEF data sources; remember each CDEF DS can
785 * only reference other DS with a lower index number */
786 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
787 rpnp_t *rpnp;
788 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
789 /* substitue data values for OP_VARIABLE nodes */
790 for (ii = 0; rpnp[ii].op != OP_END; ii++)
791 {
792 if (rpnp[ii].op == OP_VARIABLE) {
793 rpnp[ii].op = OP_NUMBER;
794 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
795 }
796 }
797 /* run the rpn calculator */
798 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
799 free(rpnp);
800 break; /* exits the data sources pdp_temp loop */
801 }
802 }
804 /* make pdp_prep ready for the next run */
805 if(isnan(pdp_new[i])){
806 /* this is not realy accurate if we use subsecond data arival time
807 should have thought of it when going subsecond resolution ...
808 sorry next format change we will have it! */
809 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
810 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
811 } else {
812 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
813 rrd.pdp_prep[i].scratch[PDP_val].u_val =
814 pdp_new[i]/interval*post_int;
815 }
817 #ifdef DEBUG
818 fprintf(stderr,
819 "PDP UPD ds[%lu]\t"
820 "pdp_temp %10.2f\t"
821 "new_prep %10.2f\t"
822 "new_unkn_sec %5lu\n",
823 i, pdp_temp[i],
824 rrd.pdp_prep[i].scratch[PDP_val].u_val,
825 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
826 #endif
827 }
829 /* if there were errors during the last loop, bail out here */
830 if (rrd_test_error()){
831 free(step_start);
832 break;
833 }
835 /* compute the number of elapsed pdp_st moments */
836 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
837 #ifdef DEBUG
838 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
839 #endif
840 if (rra_step_cnt == NULL)
841 {
842 rra_step_cnt = (unsigned long *)
843 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
844 }
846 for(i = 0, rra_start = rra_begin;
847 i < rrd.stat_head->rra_cnt;
848 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
849 i++)
850 {
851 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
852 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
853 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
854 if (start_pdp_offset <= elapsed_pdp_st) {
855 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
856 rrd.rra_def[i].pdp_cnt + 1;
857 } else {
858 rra_step_cnt[i] = 0;
859 }
861 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
862 {
863 /* If this is a bulk update, we need to skip ahead in the seasonal
864 * arrays so that they will be correct for the next observed value;
865 * note that for the bulk update itself, no update will occur to
866 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
867 * be set to DNAN. */
868 if (rra_step_cnt[i] > 2)
869 {
870 /* skip update by resetting rra_step_cnt[i],
871 * note that this is not data source specific; this is due
872 * to the bulk update, not a DNAN value for the specific data
873 * source. */
874 rra_step_cnt[i] = 0;
875 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
876 &last_seasonal_coef);
877 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
878 &seasonal_coef);
879 }
881 /* periodically run a smoother for seasonal effects */
882 /* Need to use first cdp parameter buffer to track
883 * burnin (burnin requires a specific smoothing schedule).
884 * The CDP_init_seasonal parameter is really an RRA level,
885 * not a data source within RRA level parameter, but the rra_def
886 * is read only for rrd_update (not flushed to disk). */
887 iii = i*(rrd.stat_head -> ds_cnt);
888 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
889 <= BURNIN_CYCLES)
890 {
891 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
892 > rrd.rra_def[i].row_cnt - 1) {
893 /* mark off one of the burnin cycles */
894 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
895 schedule_smooth = 1;
896 }
897 } else {
898 /* someone has no doubt invented a trick to deal with this
899 * wrap around, but at least this code is clear. */
900 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
901 rrd.rra_ptr[i].cur_row)
902 {
903 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
904 * mapping between PDP and CDP */
905 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
906 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
907 {
908 #ifdef DEBUG
909 fprintf(stderr,
910 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
911 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
912 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
913 #endif
914 schedule_smooth = 1;
915 }
916 } else {
917 /* can't rely on negative numbers because we are working with
918 * unsigned values */
919 /* Don't need modulus here. If we've wrapped more than once, only
920 * one smooth is executed at the end. */
921 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
922 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
923 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
924 {
925 #ifdef DEBUG
926 fprintf(stderr,
927 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
928 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
929 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
930 #endif
931 schedule_smooth = 1;
932 }
933 }
934 }
936 rra_current = rrd_tell(rrd_file);
937 } /* if cf is DEVSEASONAL or SEASONAL */
939 if (rrd_test_error()) break;
941 /* update CDP_PREP areas */
942 /* loop over data soures within each RRA */
943 for(ii = 0;
944 ii < rrd.stat_head->ds_cnt;
945 ii++)
946 {
948 /* iii indexes the CDP prep area for this data source within the RRA */
949 iii=i*rrd.stat_head->ds_cnt+ii;
951 if (rrd.rra_def[i].pdp_cnt > 1) {
953 if (rra_step_cnt[i] > 0) {
954 /* If we are in this block, as least 1 CDP value will be written to
955 * disk, this is the CDP_primary_val entry. If more than 1 value needs
956 * to be written, then the "fill in" value is the CDP_secondary_val
957 * entry. */
958 if (isnan(pdp_temp[ii]))
959 {
960 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
961 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
962 } else {
963 /* CDP_secondary value is the RRA "fill in" value for intermediary
964 * CDP data entries. No matter the CF, the value is the same because
965 * the average, max, min, and last of a list of identical values is
966 * the same, namely, the value itself. */
967 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
968 }
970 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
971 > rrd.rra_def[i].pdp_cnt*
972 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
973 {
974 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
975 /* initialize carry over */
976 if (current_cf == CF_AVERAGE) {
977 if (isnan(pdp_temp[ii])) {
978 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
979 } else {
980 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
981 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
982 }
983 } else {
984 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
985 }
986 } else {
987 rrd_value_t cum_val, cur_val;
988 switch (current_cf) {
989 case CF_AVERAGE:
990 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
991 cur_val = IFDNAN(pdp_temp[ii],0.0);
992 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
993 (cum_val + cur_val * start_pdp_offset) /
994 (rrd.rra_def[i].pdp_cnt
995 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
996 /* initialize carry over value */
997 if (isnan(pdp_temp[ii])) {
998 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
999 } else {
1000 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1001 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1002 }
1003 break;
1004 case CF_MAXIMUM:
1005 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1006 cur_val = IFDNAN(pdp_temp[ii],-DINF);
1007 #ifdef DEBUG
1008 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1009 isnan(pdp_temp[ii])) {
1010 fprintf(stderr,
1011 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1012 i,ii);
1013 exit(-1);
1014 }
1015 #endif
1016 if (cur_val > cum_val)
1017 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1018 else
1019 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1020 /* initialize carry over value */
1021 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1022 break;
1023 case CF_MINIMUM:
1024 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1025 cur_val = IFDNAN(pdp_temp[ii],DINF);
1026 #ifdef DEBUG
1027 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1028 isnan(pdp_temp[ii])) {
1029 fprintf(stderr,
1030 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1031 i,ii);
1032 exit(-1);
1033 }
1034 #endif
1035 if (cur_val < cum_val)
1036 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1037 else
1038 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1039 /* initialize carry over value */
1040 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1041 break;
1042 case CF_LAST:
1043 default:
1044 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1045 /* initialize carry over value */
1046 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1047 break;
1048 }
1049 } /* endif meets xff value requirement for a valid value */
1050 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1051 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1052 if (isnan(pdp_temp[ii]))
1053 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1054 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1055 else
1056 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1057 } else /* rra_step_cnt[i] == 0 */
1058 {
1059 #ifdef DEBUG
1060 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1061 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1062 i,ii);
1063 } else {
1064 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1065 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1066 }
1067 #endif
1068 if (isnan(pdp_temp[ii])) {
1069 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1070 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1071 {
1072 if (current_cf == CF_AVERAGE) {
1073 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1074 elapsed_pdp_st;
1075 } else {
1076 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1077 }
1078 #ifdef DEBUG
1079 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1080 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1081 #endif
1082 } else {
1083 switch (current_cf) {
1084 case CF_AVERAGE:
1085 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1086 elapsed_pdp_st;
1087 break;
1088 case CF_MINIMUM:
1089 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1090 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1091 break;
1092 case CF_MAXIMUM:
1093 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1094 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1095 break;
1096 case CF_LAST:
1097 default:
1098 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1099 break;
1100 }
1101 }
1102 }
1103 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1104 if (elapsed_pdp_st > 2)
1105 {
1106 switch (current_cf) {
1107 case CF_AVERAGE:
1108 default:
1109 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1110 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1111 break;
1112 case CF_SEASONAL:
1113 case CF_DEVSEASONAL:
1114 /* need to update cached seasonal values, so they are consistent
1115 * with the bulk update */
1116 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1117 * CDP_last_deviation are the same. */
1118 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1119 last_seasonal_coef[ii];
1120 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1121 seasonal_coef[ii];
1122 break;
1123 case CF_HWPREDICT:
1124 /* need to update the null_count and last_null_count.
1125 * even do this for non-DNAN pdp_temp because the
1126 * algorithm is not learning from batch updates. */
1127 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1128 elapsed_pdp_st;
1129 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1130 elapsed_pdp_st - 1;
1131 /* fall through */
1132 case CF_DEVPREDICT:
1133 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1134 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1135 break;
1136 case CF_FAILURES:
1137 /* do not count missed bulk values as failures */
1138 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1139 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1140 /* need to reset violations buffer.
1141 * could do this more carefully, but for now, just
1142 * assume a bulk update wipes away all violations. */
1143 erase_violations(&rrd, iii, i);
1144 break;
1145 }
1146 }
1147 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1149 if (rrd_test_error()) break;
1151 } /* endif data sources loop */
1152 } /* end RRA Loop */
1154 /* this loop is only entered if elapsed_pdp_st < 3 */
1155 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1156 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1157 {
1158 for(i = 0, rra_start = rra_begin;
1159 i < rrd.stat_head->rra_cnt;
1160 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1161 i++)
1162 {
1163 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1165 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1166 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1167 {
1168 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1169 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1170 &seasonal_coef);
1171 rra_current = rrd_tell(rrd_file);
1172 }
1173 if (rrd_test_error()) break;
1174 /* loop over data sources within each RRA */
1175 for(ii = 0;
1176 ii < rrd.stat_head->ds_cnt;
1177 ii++)
1178 {
1179 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1180 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1181 scratch_idx, seasonal_coef);
1182 }
1183 } /* end RRA Loop */
1184 if (rrd_test_error()) break;
1185 } /* end elapsed_pdp_st loop */
1187 if (rrd_test_error()) break;
1189 /* Ready to write to disk */
1190 /* Move sequentially through the file, writing one RRA at a time.
1191 * Note this architecture divorces the computation of CDP with
1192 * flushing updated RRA entries to disk. */
1193 for(i = 0, rra_start = rra_begin;
1194 i < rrd.stat_head->rra_cnt;
1195 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1196 i++) {
1197 /* is there anything to write for this RRA? If not, continue. */
1198 if (rra_step_cnt[i] == 0) continue;
1200 /* write the first row */
1201 #ifdef DEBUG
1202 fprintf(stderr," -- RRA Preseek %ld\n",rrd_file->pos);
1203 #endif
1204 rrd.rra_ptr[i].cur_row++;
1205 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1206 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1207 /* position on the first row */
1208 rra_pos_tmp = rra_start +
1209 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1210 if(rra_pos_tmp != rra_current) {
1211 #ifndef HAVE_MMAP
1212 if(rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1213 rrd_set_error("seek error in rrd");
1214 break;
1215 }
1216 #endif
1217 rra_current = rra_pos_tmp;
1218 }
1220 #ifdef DEBUG
1221 fprintf(stderr," -- RRA Postseek %ld\n",rrd_file->pos);
1222 #endif
1223 scratch_idx = CDP_primary_val;
1224 if (pcdp_summary != NULL)
1225 {
1226 rra_time = (current_time - current_time
1227 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1228 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1229 }
1230 #ifdef HAVE_MMAP
1231 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file->fd,
1232 pcdp_summary, &rra_time, rrd_file->file_start);
1233 #else
1234 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file->fd,
1235 pcdp_summary, &rra_time);
1236 #endif
1237 if (rrd_test_error()) break;
1239 /* write other rows of the bulk update, if any */
1240 scratch_idx = CDP_secondary_val;
1241 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1242 {
1243 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1244 {
1245 #ifdef DEBUG
1246 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1247 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1248 #endif
1249 /* wrap */
1250 rrd.rra_ptr[i].cur_row = 0;
1251 /* seek back to beginning of current rra */
1252 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0)
1253 {
1254 rrd_set_error("seek error in rrd");
1255 break;
1256 }
1257 #ifdef DEBUG
1258 fprintf(stderr," -- Wraparound Postseek %ld\n",rrd_file->pos);
1259 #endif
1260 rra_current = rra_start;
1261 }
1262 if (pcdp_summary != NULL)
1263 {
1264 rra_time = (current_time - current_time
1265 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1266 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1267 }
1268 #ifdef HAVE_MMAP
1269 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file->fd,
1270 pcdp_summary, &rra_time, rrd_file->file_start);
1271 #else
1272 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file->fd,
1273 pcdp_summary, &rra_time);
1274 #endif
1275 }
1277 if (rrd_test_error())
1278 break;
1279 } /* RRA LOOP */
1281 /* break out of the argument parsing loop if error_string is set */
1282 if (rrd_test_error()){
1283 free(step_start);
1284 break;
1285 }
1287 } /* endif a pdp_st has occurred */
1288 rrd.live_head->last_up = current_time;
1289 rrd.live_head->last_up_usec = current_time_usec;
1290 free(step_start);
1291 } /* function argument loop */
1293 if (seasonal_coef != NULL) free(seasonal_coef);
1294 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1295 if (rra_step_cnt != NULL) free(rra_step_cnt);
1296 rpnstack_free(&rpnstack);
1298 #ifdef HAVE_MMAP
1299 if (munmap(rrd_file->file_start, rrd_file->file_len) == -1) {
1300 rrd_set_error("error writing(unmapping) file: %s", filename);
1301 }
1302 #endif
1303 /* if we got here and if there is an error and if the file has not been
1304 * written to, then close things up and return. */
1305 if (rrd_test_error()) {
1306 free(updvals);
1307 free(tmpl_idx);
1308 rrd_free(&rrd);
1309 free(pdp_temp);
1310 free(pdp_new);
1311 close(rrd_file->fd);
1312 return(-1);
1313 }
1315 /* aargh ... that was tough ... so many loops ... anyway, its done.
1316 * we just need to write back the live header portion now*/
1318 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1319 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1320 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1321 SEEK_SET) != 0) {
1322 rrd_set_error("seek rrd for live header writeback");
1323 free(updvals);
1324 free(tmpl_idx);
1325 rrd_free(&rrd);
1326 free(pdp_temp);
1327 free(pdp_new);
1328 close(rrd_file->fd);
1329 return(-1);
1330 }
1332 if(version >= 3) {
1333 if(rrd_write(rrd_file, rrd.live_head,
1334 sizeof(live_head_t)*1) != sizeof(live_head_t)*1){
1335 rrd_set_error("rrd_write live_head to rrd");
1336 free(updvals);
1337 rrd_free(&rrd);
1338 free(tmpl_idx);
1339 free(pdp_temp);
1340 free(pdp_new);
1341 close(rrd_file->fd);
1342 return(-1);
1343 }
1344 }
1345 else {
1346 if(rrd_write(rrd_file, &rrd.live_head->last_up,
1347 sizeof(time_t)*1) != sizeof(time_t)*1){
1348 rrd_set_error("rrd_write live_head to rrd");
1349 free(updvals);
1350 rrd_free(&rrd);
1351 free(tmpl_idx);
1352 free(pdp_temp);
1353 free(pdp_new);
1354 close(rrd_file->fd);
1355 return(-1);
1356 }
1357 }
1360 if(rrd_write(rrd_file, rrd.pdp_prep,
1361 sizeof(pdp_prep_t)*rrd.stat_head->ds_cnt)
1362 != (ssize_t)(sizeof(pdp_prep_t)*rrd.stat_head->ds_cnt)){
1363 rrd_set_error("rrd_write pdp_prep to rrd");
1364 free(updvals);
1365 rrd_free(&rrd);
1366 free(tmpl_idx);
1367 free(pdp_temp);
1368 free(pdp_new);
1369 close(rrd_file->fd);
1370 return(-1);
1371 }
1373 if(rrd_write(rrd_file, rrd.cdp_prep,
1374 sizeof(cdp_prep_t)*rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt)
1375 != (ssize_t)(sizeof(cdp_prep_t)*rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt)){
1377 rrd_set_error("rrd_write cdp_prep to rrd");
1378 free(updvals);
1379 free(tmpl_idx);
1380 rrd_free(&rrd);
1381 free(pdp_temp);
1382 free(pdp_new);
1383 close(rrd_file->fd);
1384 return(-1);
1385 }
1387 if(rrd_write(rrd_file, rrd.rra_ptr,
1388 sizeof(rra_ptr_t)* rrd.stat_head->rra_cnt)
1389 != (ssize_t)(sizeof(rra_ptr_t)*rrd.stat_head->rra_cnt)){
1390 rrd_set_error("rrd_write rra_ptr to rrd");
1391 free(updvals);
1392 free(tmpl_idx);
1393 rrd_free(&rrd);
1394 free(pdp_temp);
1395 free(pdp_new);
1396 close(rrd_file->fd);
1397 return(-1);
1398 }
1400 #ifdef HAVE_POSIX_FADVISExxx
1402 /* with update we have write ops, so they will probably not be done by now, this means
1403 the buffers will not get freed. But calling this for the whole file - header
1404 will let the data off the hook as soon as it is written when if it is from a previous
1405 update cycle. Calling fdsync to force things is much too hard here. */
1407 if (0 != posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1408 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s",filename, rrd_strerror(errno));
1409 close(rrd_file->fd);
1410 return(-1);
1411 }
1412 #endif
1413 /*XXX: ? */rrd_flush(rrd_file);
1415 /* calling the smoothing code here guarantees at most
1416 * one smoothing operation per rrd_update call. Unfortunately,
1417 * it is possible with bulk updates, or a long-delayed update
1418 * for smoothing to occur off-schedule. This really isn't
1419 * critical except during the burning cycles. */
1420 if (schedule_smooth)
1421 {
1422 // in_file = fopen(filename,"rb+");
1425 rra_start = rra_begin;
1426 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1427 {
1428 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1429 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1430 {
1431 #ifdef DEBUG
1432 fprintf(stderr,"Running smoother for rra %ld\n",i);
1433 #endif
1434 apply_smoother(&rrd,i,rra_start,rrd_file);
1435 if (rrd_test_error())
1436 break;
1437 }
1438 rra_start += rrd.rra_def[i].row_cnt
1439 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1440 }
1441 #ifdef HAVE_POSIX_FADVISExxx
1442 /* same procedure as above ... */
1443 if (0 != posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1444 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s",filename, rrd_strerror(errno));
1445 close(rrd_file->fd);
1446 return(-1);
1447 }
1448 #endif
1449 close(rrd_file->fd);
1450 }
1452 /* OK now close the files and free the memory */
1453 if(close(rrd_file->fd) != 0){
1454 rrd_set_error("closing rrd");
1455 free(updvals);
1456 free(tmpl_idx);
1457 rrd_free(&rrd);
1458 free(pdp_temp);
1459 free(pdp_new);
1460 return(-1);
1461 }
1463 rrd_free(&rrd);
1464 free(updvals);
1465 free(tmpl_idx);
1466 free(pdp_new);
1467 free(pdp_temp);
1468 return(0);
1469 }
1471 /*
1472 * get exclusive lock to whole file.
1473 * lock gets removed when we close the file
1474 *
1475 * returns 0 on success
1476 */
/*
 * Acquire an exclusive (write) lock on the whole RRD file.
 * The lock is advisory (POSIX fcntl record locking) and is released
 * automatically when the file descriptor is closed.
 *
 * in_file: open file descriptor of the RRD file
 * returns 0 on success, -1 on failure (errno describes the error)
 */
int
LockRRD(int in_file)
{
    int rcstat;

#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat st;

    if ( _fstat( in_file, &st ) == 0 ) {
	rcstat = _locking ( in_file, _LK_NBLCK, st.st_size );
    } else {
	rcstat = -1;
    }
#else
    struct flock lock;
    lock.l_type   = F_WRLCK;  /* exclusive write lock */
    lock.l_len    = 0;        /* 0 = lock to EOF, i.e. the whole file */
    lock.l_start  = 0;        /* offset 0 ...                         */
    lock.l_whence = SEEK_SET; /* ... relative to the START of the file
                               * (the original comment said "end of file",
                               * which is what SEEK_END would mean) */

    rcstat = fcntl(in_file, F_SETLK, &lock);
#endif

    return(rcstat);
}
1506 #ifdef HAVE_MMAP
1507 info_t
1508 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1509 unsigned short CDP_scratch_idx,
1510 #ifndef DEBUG
1511 int UNUSED(in_file),
1512 #else
1513 int in_file,
1514 #endif
1515 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1516 #else
1517 info_t
1518 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1519 unsigned short CDP_scratch_idx, int in_file,
1520 info_t *pcdp_summary, time_t *rra_time)
1521 #endif
1522 {
1523 unsigned long ds_idx, cdp_idx;
1524 infoval iv;
1526 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1527 {
1528 /* compute the cdp index */
1529 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1530 #ifdef DEBUG
1531 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1532 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,rrd_file->pos,
1533 rrd -> rra_def[rra_idx].cf_nam);
1534 #endif
1535 if (pcdp_summary != NULL)
1536 {
1537 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1538 /* append info to the return hash */
1539 pcdp_summary = info_push(pcdp_summary,
1540 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1541 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1542 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1543 RD_I_VAL, iv);
1544 }
1545 #ifdef HAVE_MMAP
1546 memcpy((char *)rrd_mmaped_file + *rra_current,
1547 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1548 sizeof(rrd_value_t));
1549 #else
1550 if(rrd_write(rrd_file,&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1551 sizeof(rrd_value_t)*1) != sizeof(rrd_value_t)*1)
1552 {
1553 rrd_set_error("writing rrd");
1554 return 0;
1555 }
1556 #endif
1557 *rra_current += sizeof(rrd_value_t);
1558 }
1559 return (pcdp_summary);
1560 }