e8d629ea22568b972f71cb858cb06838b8d5ea14
1 /*****************************************************************************
2 * RRDtool 1.2.11 Copyright by Tobi Oetiker, 1997-2005
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 *****************************************************************************/
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13 #include <sys/mman.h>
14 #endif
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
18 #include <sys/stat.h>
19 #include <io.h>
20 #endif
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
31 * replacement.
32 */
33 #include <sys/timeb.h>
35 #ifndef __MINGW32__
36 struct timeval {
37 time_t tv_sec; /* seconds */
38 long tv_usec; /* microseconds */
39 };
40 #endif
42 struct __timezone {
43 int tz_minuteswest; /* minutes W of Greenwich */
44 int tz_dsttime; /* type of dst correction */
45 };
47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
49 struct _timeb current_time;
51 _ftime(¤t_time);
53 t->tv_sec = current_time.time;
54 t->tv_usec = current_time.millitm * 1000;
56 return 0;
57 }
59 #endif
/*
 * Normalize a time value as returned by gettimeofday(): the usec part
 * must always be >= 0.  When tv_usec is negative, one second is borrowed
 * from tv_sec and 1000000 is added to tv_usec (a single borrow only).
 */
static void normalize_time(struct timeval *t)
{
    if (t->tv_usec >= 0)
        return;                 /* nothing to fix up */

    t->tv_usec += 1000000L;
    t->tv_sec  -= 1;
}
72 /* Local prototypes */
73 int LockRRD(FILE *rrd_file);
74 #ifdef HAVE_MMAP
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
76 unsigned long *rra_current,
77 unsigned short CDP_scratch_idx,
78 #ifndef DEBUG
79 FILE UNUSED(*rrd_file),
80 #else
81 FILE *rrd_file,
82 #endif
83 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
84 #else
85 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
86 unsigned long *rra_current,
87 unsigned short CDP_scratch_idx, FILE *rrd_file,
88 info_t *pcdp_summary, time_t *rra_time);
89 #endif
90 int rrd_update_r(char *filename, char *template, int argc, char **argv);
91 int _rrd_update(char *filename, char *template, int argc, char **argv,
92 info_t*);
/* Evaluates to Y when X is NaN, otherwise to X.  X is expanded twice, so
 * pass only side-effect-free expressions.  The definition deliberately has
 * no trailing semicolon: the caller supplies it, which keeps the macro
 * usable inside larger expressions. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
97 #ifdef STANDALONE
98 int
99 main(int argc, char **argv){
100 rrd_update(argc,argv);
101 if (rrd_test_error()) {
102 printf("RRDtool " PACKAGE_VERSION " Copyright by Tobi Oetiker, 1997-2005\n\n"
103 "Usage: rrdupdate filename\n"
104 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
105 "\t\t\ttime|N:value[:value...]\n\n"
106 "\t\t\tat-time@value[:value...]\n\n"
107 "\t\t\t[ time:value[:value...] ..]\n\n");
109 printf("ERROR: %s\n",rrd_get_error());
110 rrd_clear_error();
111 return 1;
112 }
113 return 0;
114 }
115 #endif
117 info_t *rrd_update_v(int argc, char **argv)
118 {
119 char *template = NULL;
120 info_t *result = NULL;
121 infoval rc;
122 optind = 0; opterr = 0; /* initialize getopt */
124 while (1) {
125 static struct option long_options[] =
126 {
127 {"template", required_argument, 0, 't'},
128 {0,0,0,0}
129 };
130 int option_index = 0;
131 int opt;
132 opt = getopt_long(argc, argv, "t:",
133 long_options, &option_index);
135 if (opt == EOF)
136 break;
138 switch(opt) {
139 case 't':
140 template = optarg;
141 break;
143 case '?':
144 rrd_set_error("unknown option '%s'",argv[optind-1]);
145 rc.u_int = -1;
146 goto end_tag;
147 }
148 }
150 /* need at least 2 arguments: filename, data. */
151 if (argc-optind < 2) {
152 rrd_set_error("Not enough arguments");
153 rc.u_int = -1;
154 goto end_tag;
155 }
156 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
157 rc.u_int = _rrd_update(argv[optind], template,
158 argc - optind - 1, argv + optind + 1, result);
159 result->value.u_int = rc.u_int;
160 end_tag:
161 return result;
162 }
/*
 * Command-line style front end for updating an RRD.
 *
 * Accepts an optional --template/-t option followed by the filename and
 * at least one data argument, then forwards everything after the filename
 * to rrd_update_r().
 *
 * Returns the result of rrd_update_r(), or -1 (with the error recorded via
 * rrd_set_error()) for an unknown option or too few arguments.
 */
int
rrd_update(int argc, char **argv)
{
    static struct option long_options[] =
    {
        {"template", required_argument, 0, 't'},
        {0,0,0,0}
    };
    char *template = NULL;
    int   option_index;
    int   opt;

    optind = 0; opterr = 0;  /* initialize getopt */

    for (;;) {
        option_index = 0;
        opt = getopt_long(argc, argv, "t:", long_options, &option_index);
        if (opt == EOF) {
            break;
        }

        if (opt == 't') {
            template = optarg;
        } else if (opt == '?') {
            rrd_set_error("unknown option '%s'",argv[optind-1]);
            return(-1);
        }
    }

    /* filename plus at least one data argument must remain */
    if (argc-optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], template,
                        argc - optind - 1, argv + optind + 1);
}
/*
 * Update entry point that takes the filename, template and data arguments
 * directly.  Delegates to _rrd_update() with a NULL pcdp_summary and
 * returns its result unchanged.
 */
int
rrd_update_r(char *filename, char *template, int argc, char **argv)
{
    int status;

    status = _rrd_update(filename, template, argc, argv, NULL);
    return status;
}
214 int
215 _rrd_update(char *filename, char *template, int argc, char **argv,
216 info_t *pcdp_summary)
217 {
219 int arg_i = 2;
220 short j;
221 unsigned long i,ii,iii=1;
223 unsigned long rra_begin; /* byte pointer to the rra
224 * area in the rrd file. this
225 * pointer never changes value */
226 unsigned long rra_start; /* byte pointer to the rra
227 * area in the rrd file. this
228 * pointer changes as each rrd is
229 * processed. */
230 unsigned long rra_current; /* byte pointer to the current write
231 * spot in the rrd file. */
232 unsigned long rra_pos_tmp; /* temporary byte pointer. */
233 double interval,
234 pre_int,post_int; /* interval between this and
235 * the last run */
236 unsigned long proc_pdp_st; /* which pdp_st was the last
237 * to be processed */
238 unsigned long occu_pdp_st; /* when was the pdp_st
239 * before the last update
240 * time */
241 unsigned long proc_pdp_age; /* how old was the data in
242 * the pdp prep area when it
243 * was last updated */
244 unsigned long occu_pdp_age; /* how long ago was the last
245 * pdp_step time */
246 rrd_value_t *pdp_new; /* prepare the incoming data
247 * to be added the the
248 * existing entry */
249 rrd_value_t *pdp_temp; /* prepare the pdp values
250 * to be added the the
251 * cdp values */
253 long *tmpl_idx; /* index representing the settings
254 transported by the template index */
255 unsigned long tmpl_cnt = 2; /* time and data */
257 FILE *rrd_file;
258 rrd_t rrd;
259 time_t current_time = 0;
260 time_t rra_time = 0; /* time of update for a RRA */
261 unsigned long current_time_usec=0;/* microseconds part of current time */
262 struct timeval tmp_time; /* used for time conversion */
264 char **updvals;
265 int schedule_smooth = 0;
266 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
267 /* a vector of future Holt-Winters seasonal coefs */
268 unsigned long elapsed_pdp_st;
269 /* number of elapsed PDP steps since last update */
270 unsigned long *rra_step_cnt = NULL;
271 /* number of rows to be updated in an RRA for a data
272 * value. */
273 unsigned long start_pdp_offset;
274 /* number of PDP steps since the last update that
275 * are assigned to the first CDP to be generated
276 * since the last update. */
277 unsigned short scratch_idx;
278 /* index into the CDP scratch array */
279 enum cf_en current_cf;
280 /* numeric id of the current consolidation function */
281 rpnstack_t rpnstack; /* used for COMPUTE DS */
282 int version; /* rrd version */
283 char *endptr; /* used in the conversion */
284 #ifdef HAVE_MMAP
285 void *rrd_mmaped_file;
286 unsigned long rrd_filesize;
287 #endif
289 rpnstack_init(&rpnstack);
291 /* need at least 1 arguments: data. */
292 if (argc < 1) {
293 rrd_set_error("Not enough arguments");
294 return -1;
295 }
299 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
300 return -1;
301 }
302 /* initialize time */
303 version = atoi(rrd.stat_head->version);
304 gettimeofday(&tmp_time, 0);
305 normalize_time(&tmp_time);
306 current_time = tmp_time.tv_sec;
307 if(version >= 3) {
308 current_time_usec = tmp_time.tv_usec;
309 }
310 else {
311 current_time_usec = 0;
312 }
314 rra_current = rra_start = rra_begin = ftell(rrd_file);
/* This is defined in the ANSI C standard, section 7.9.5.3:

   When a file is opened with update mode ('+' as the second
   or third character in the ... list of mode argument
   variables), both input and output may be performed on the
   associated stream.  However, ... input may not be directly
   followed by output without an intervening call to a file
   positioning function, unless the input operation encounters
   end-of-file. */
324 #ifdef HAVE_MMAP
325 fseek(rrd_file, 0, SEEK_END);
326 rrd_filesize = ftell(rrd_file);
327 fseek(rrd_file, rra_current, SEEK_SET);
328 #else
329 fseek(rrd_file, 0, SEEK_CUR);
330 #endif
333 /* get exclusive lock to whole file.
334 * lock gets removed when we close the file.
335 */
336 if (LockRRD(rrd_file) != 0) {
337 rrd_set_error("could not lock RRD");
338 rrd_free(&rrd);
339 fclose(rrd_file);
340 return(-1);
341 }
343 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
344 rrd_set_error("allocating updvals pointer array");
345 rrd_free(&rrd);
346 fclose(rrd_file);
347 return(-1);
348 }
350 if ((pdp_temp = malloc(sizeof(rrd_value_t)
351 *rrd.stat_head->ds_cnt))==NULL){
352 rrd_set_error("allocating pdp_temp ...");
353 free(updvals);
354 rrd_free(&rrd);
355 fclose(rrd_file);
356 return(-1);
357 }
359 if ((tmpl_idx = malloc(sizeof(unsigned long)
360 *(rrd.stat_head->ds_cnt+1)))==NULL){
361 rrd_set_error("allocating tmpl_idx ...");
362 free(pdp_temp);
363 free(updvals);
364 rrd_free(&rrd);
365 fclose(rrd_file);
366 return(-1);
367 }
368 /* initialize template redirector */
369 /* default config example (assume DS 1 is a CDEF DS)
370 tmpl_idx[0] -> 0; (time)
371 tmpl_idx[1] -> 1; (DS 0)
372 tmpl_idx[2] -> 3; (DS 2)
373 tmpl_idx[3] -> 4; (DS 3) */
374 tmpl_idx[0] = 0; /* time */
375 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
376 {
377 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
378 tmpl_idx[ii++]=i;
379 }
380 tmpl_cnt= ii;
382 if (template) {
383 char *dsname;
384 unsigned int tmpl_len;
385 dsname = template;
386 tmpl_cnt = 1; /* the first entry is the time */
387 tmpl_len = strlen(template);
388 for(i=0;i<=tmpl_len ;i++) {
389 if (template[i] == ':' || template[i] == '\0') {
390 template[i] = '\0';
391 if (tmpl_cnt>rrd.stat_head->ds_cnt){
392 rrd_set_error("Template contains more DS definitions than RRD");
393 free(updvals); free(pdp_temp);
394 free(tmpl_idx); rrd_free(&rrd);
395 fclose(rrd_file); return(-1);
396 }
397 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
398 rrd_set_error("unknown DS name '%s'",dsname);
399 free(updvals); free(pdp_temp);
400 free(tmpl_idx); rrd_free(&rrd);
401 fclose(rrd_file); return(-1);
402 } else {
403 /* the first element is always the time */
404 tmpl_idx[tmpl_cnt-1]++;
405 /* go to the next entry on the template */
406 dsname = &template[i+1];
407 /* fix the damage we did before */
408 if (i<tmpl_len) {
409 template[i]=':';
410 }
412 }
413 }
414 }
415 }
416 if ((pdp_new = malloc(sizeof(rrd_value_t)
417 *rrd.stat_head->ds_cnt))==NULL){
418 rrd_set_error("allocating pdp_new ...");
419 free(updvals);
420 free(pdp_temp);
421 free(tmpl_idx);
422 rrd_free(&rrd);
423 fclose(rrd_file);
424 return(-1);
425 }
427 #ifdef HAVE_MMAP
428 rrd_mmaped_file = mmap(0,
429 rrd_filesize,
430 PROT_READ | PROT_WRITE,
431 MAP_SHARED,
432 fileno(rrd_file),
433 0);
434 if (rrd_mmaped_file == MAP_FAILED) {
435 rrd_set_error("error mmapping file %s", filename);
436 free(updvals);
437 free(pdp_temp);
438 free(tmpl_idx);
439 rrd_free(&rrd);
440 fclose(rrd_file);
441 return(-1);
442 }
443 #endif
444 /* loop through the arguments. */
445 for(arg_i=0; arg_i<argc;arg_i++) {
446 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
447 char *step_start = stepper;
448 char *p;
449 char *parsetime_error = NULL;
450 enum {atstyle, normal} timesyntax;
451 struct rrd_time_value ds_tv;
452 if (stepper == NULL){
453 rrd_set_error("failed duplication argv entry");
454 free(updvals);
455 free(pdp_temp);
456 free(tmpl_idx);
457 rrd_free(&rrd);
458 #ifdef HAVE_MMAP
459 munmap(rrd_mmaped_file, rrd_filesize);
460 #endif
461 fclose(rrd_file);
462 return(-1);
463 }
464 /* initialize all ds input to unknown except the first one
465 which has always got to be set */
466 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
467 strcpy(stepper,argv[arg_i]);
468 updvals[0]=stepper;
469 /* separate all ds elements; first must be examined separately
470 due to alternate time syntax */
471 if ((p=strchr(stepper,'@'))!=NULL) {
472 timesyntax = atstyle;
473 *p = '\0';
474 stepper = p+1;
475 } else if ((p=strchr(stepper,':'))!=NULL) {
476 timesyntax = normal;
477 *p = '\0';
478 stepper = p+1;
479 } else {
480 rrd_set_error("expected timestamp not found in data source from %s:...",
481 argv[arg_i]);
482 free(step_start);
483 break;
484 }
485 ii=1;
486 updvals[tmpl_idx[ii]] = stepper;
487 while (*stepper) {
488 if (*stepper == ':') {
489 *stepper = '\0';
490 ii++;
491 if (ii<tmpl_cnt){
492 updvals[tmpl_idx[ii]] = stepper+1;
493 }
494 }
495 stepper++;
496 }
498 if (ii != tmpl_cnt-1) {
499 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
500 tmpl_cnt-1, ii, argv[arg_i]);
501 free(step_start);
502 break;
503 }
505 /* get the time from the reading ... handle N */
506 if (timesyntax == atstyle) {
507 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
508 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
509 free(step_start);
510 break;
511 }
512 if (ds_tv.type == RELATIVE_TO_END_TIME ||
513 ds_tv.type == RELATIVE_TO_START_TIME) {
514 rrd_set_error("specifying time relative to the 'start' "
515 "or 'end' makes no sense here: %s",
516 updvals[0]);
517 free(step_start);
518 break;
519 }
521 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
522 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
524 } else if (strcmp(updvals[0],"N")==0){
525 gettimeofday(&tmp_time, 0);
526 normalize_time(&tmp_time);
527 current_time = tmp_time.tv_sec;
528 current_time_usec = tmp_time.tv_usec;
529 } else {
530 double tmp;
531 tmp = strtod(updvals[0], 0);
532 current_time = floor(tmp);
533 current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
534 }
535 /* dont do any correction for old version RRDs */
536 if(version < 3)
537 current_time_usec = 0;
539 if(current_time < rrd.live_head->last_up ||
540 (current_time == rrd.live_head->last_up &&
541 (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
542 rrd_set_error("illegal attempt to update using time %ld when "
543 "last update time is %ld (minimum one second step)",
544 current_time, rrd.live_head->last_up);
545 free(step_start);
546 break;
547 }
550 /* seek to the beginning of the rra's */
551 if (rra_current != rra_begin) {
552 #ifndef HAVE_MMAP
553 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
554 rrd_set_error("seek error in rrd");
555 free(step_start);
556 break;
557 }
558 #endif
559 rra_current = rra_begin;
560 }
561 rra_start = rra_begin;
563 /* when was the current pdp started */
564 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
565 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
567 /* when did the last pdp_st occur */
568 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
569 occu_pdp_st = current_time - occu_pdp_age;
571 /* interval = current_time - rrd.live_head->last_up; */
572 interval = (double)(current_time - rrd.live_head->last_up)
573 + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
575 if (occu_pdp_st > proc_pdp_st){
576 /* OK we passed the pdp_st moment*/
577 pre_int = (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
578 * occurred before the latest
579 * pdp_st moment*/
580 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
581 post_int = occu_pdp_age; /* how much after it */
582 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
583 } else {
584 pre_int = interval;
585 post_int = 0;
586 }
588 #ifdef DEBUG
589 printf(
590 "proc_pdp_age %lu\t"
591 "proc_pdp_st %lu\t"
592 "occu_pfp_age %lu\t"
593 "occu_pdp_st %lu\t"
594 "int %lf\t"
595 "pre_int %lf\t"
596 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
597 occu_pdp_age, occu_pdp_st,
598 interval, pre_int, post_int);
599 #endif
601 /* process the data sources and update the pdp_prep
602 * area accordingly */
603 for(i=0;i<rrd.stat_head->ds_cnt;i++){
604 enum dst_en dst_idx;
605 dst_idx= dst_conv(rrd.ds_def[i].dst);
607 /* make sure we do not build diffs with old last_ds values */
608 if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval
609 && ( dst_idx == DST_COUNTER || dst_idx == DST_DERIVE)){
610 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
611 }
/* NOTE: DST_CDEF should never enter this if block, because
 * updvals[i+1][0] is initialized to 'U'; unless the caller
 * accidentally specified a value for the DST_CDEF. To handle
 * this case, an extra check is required. */
618 if((updvals[i+1][0] != 'U') &&
619 (dst_idx != DST_CDEF) &&
620 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
621 double rate = DNAN;
622 /* the data source type defines how to process the data */
623 /* pdp_new contains rate * time ... eg the bytes
624 * transferred during the interval. Doing it this way saves
625 * a lot of math operations */
628 switch(dst_idx){
629 case DST_COUNTER:
630 case DST_DERIVE:
631 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
632 for(ii=0;updvals[i+1][ii] != '\0';ii++){
633 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
634 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
635 break;
636 }
637 }
638 if (rrd_test_error()){
639 break;
640 }
641 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
642 if(dst_idx == DST_COUNTER) {
643 /* simple overflow catcher suggested by Andres Kroonmaa */
644 /* this will fail terribly for non 32 or 64 bit counters ... */
645 /* are there any others in SNMP land ? */
646 if (pdp_new[i] < (double)0.0 )
647 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
648 if (pdp_new[i] < (double)0.0 )
649 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
650 }
651 rate = pdp_new[i] / interval;
652 }
653 else {
654 pdp_new[i]= DNAN;
655 }
656 break;
657 case DST_ABSOLUTE:
658 errno = 0;
659 pdp_new[i] = strtod(updvals[i+1],&endptr);
660 if (errno > 0){
661 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
662 break;
663 };
664 if (endptr[0] != '\0'){
665 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
666 break;
667 }
668 rate = pdp_new[i] / interval;
669 break;
670 case DST_GAUGE:
671 errno = 0;
672 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
673 if (errno > 0){
674 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
675 break;
676 };
677 if (endptr[0] != '\0'){
678 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
679 break;
680 }
681 rate = pdp_new[i] / interval;
682 break;
683 default:
684 rrd_set_error("rrd contains unknown DS type : '%s'",
685 rrd.ds_def[i].dst);
686 break;
687 }
688 /* break out of this for loop if the error string is set */
689 if (rrd_test_error()){
690 break;
691 }
692 /* make sure pdp_temp is neither too large or too small
693 * if any of these occur it becomes unknown ...
694 * sorry folks ... */
695 if ( ! isnan(rate) &&
696 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
697 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
698 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
699 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
700 pdp_new[i] = DNAN;
701 }
702 } else {
703 /* no news is news all the same */
704 pdp_new[i] = DNAN;
705 }
707 /* make a copy of the command line argument for the next run */
708 #ifdef DEBUG
709 fprintf(stderr,
710 "prep ds[%lu]\t"
711 "last_arg '%s'\t"
712 "this_arg '%s'\t"
713 "pdp_new %10.2f\n",
714 i,
715 rrd.pdp_prep[i].last_ds,
716 updvals[i+1], pdp_new[i]);
717 #endif
718 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
719 strncpy(rrd.pdp_prep[i].last_ds,
720 updvals[i+1],LAST_DS_LEN-1);
721 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
722 }
723 }
724 /* break out of the argument parsing loop if the error_string is set */
725 if (rrd_test_error()){
726 free(step_start);
727 break;
728 }
729 /* has a pdp_st moment occurred since the last run ? */
731 if (proc_pdp_st == occu_pdp_st){
732 /* no we have not passed a pdp_st moment. therefore update is simple */
734 for(i=0;i<rrd.stat_head->ds_cnt;i++){
735 if(isnan(pdp_new[i]))
736 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval-0.5);
737 else
738 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
739 #ifdef DEBUG
740 fprintf(stderr,
741 "NO PDP ds[%lu]\t"
742 "value %10.2f\t"
743 "unkn_sec %5lu\n",
744 i,
745 rrd.pdp_prep[i].scratch[PDP_val].u_val,
746 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
747 #endif
748 }
749 } else {
750 /* an pdp_st has occurred. */
752 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
753 * occurred up to the last run.
754 pdp_new[] contains rate*seconds from the latest run.
755 pdp_temp[] will contain the rate for cdp */
757 for(i=0;i<rrd.stat_head->ds_cnt;i++){
758 /* update pdp_prep to the current pdp_st */
759 if(isnan(pdp_new[i]))
760 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(pre_int+0.5);
761 else
762 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
763 pdp_new[i]/interval*pre_int;
765 /* if too much of the pdp_prep is unknown we dump it */
766 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
767 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
768 (occu_pdp_st-proc_pdp_st <=
769 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
770 pdp_temp[i] = DNAN;
771 } else {
772 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
773 / (double)( occu_pdp_st
774 - proc_pdp_st
775 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
776 }
778 /* process CDEF data sources; remember each CDEF DS can
779 * only reference other DS with a lower index number */
780 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
781 rpnp_t *rpnp;
782 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
783 /* substitue data values for OP_VARIABLE nodes */
784 for (ii = 0; rpnp[ii].op != OP_END; ii++)
785 {
786 if (rpnp[ii].op == OP_VARIABLE) {
787 rpnp[ii].op = OP_NUMBER;
788 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
789 }
790 }
791 /* run the rpn calculator */
792 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
793 free(rpnp);
794 break; /* exits the data sources pdp_temp loop */
795 }
796 }
798 /* make pdp_prep ready for the next run */
799 if(isnan(pdp_new[i])){
800 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int + 0.5);
801 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
802 } else {
803 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
804 rrd.pdp_prep[i].scratch[PDP_val].u_val =
805 pdp_new[i]/interval*post_int;
806 }
808 #ifdef DEBUG
809 fprintf(stderr,
810 "PDP UPD ds[%lu]\t"
811 "pdp_temp %10.2f\t"
812 "new_prep %10.2f\t"
813 "new_unkn_sec %5lu\n",
814 i, pdp_temp[i],
815 rrd.pdp_prep[i].scratch[PDP_val].u_val,
816 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
817 #endif
818 }
820 /* if there were errors during the last loop, bail out here */
821 if (rrd_test_error()){
822 free(step_start);
823 break;
824 }
826 /* compute the number of elapsed pdp_st moments */
827 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
828 #ifdef DEBUG
829 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
830 #endif
831 if (rra_step_cnt == NULL)
832 {
833 rra_step_cnt = (unsigned long *)
834 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
835 }
837 for(i = 0, rra_start = rra_begin;
838 i < rrd.stat_head->rra_cnt;
839 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
840 i++)
841 {
842 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
843 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
844 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
845 if (start_pdp_offset <= elapsed_pdp_st) {
846 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
847 rrd.rra_def[i].pdp_cnt + 1;
848 } else {
849 rra_step_cnt[i] = 0;
850 }
852 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
853 {
/* If this is a bulk update, we need to skip ahead in the seasonal
 * arrays so that they will be correct for the next observed value;
 * note that for the bulk update itself, no update will occur to
 * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
 * be set to DNAN. */
859 if (rra_step_cnt[i] > 2)
860 {
861 /* skip update by resetting rra_step_cnt[i],
862 * note that this is not data source specific; this is due
863 * to the bulk update, not a DNAN value for the specific data
864 * source. */
865 rra_step_cnt[i] = 0;
866 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
867 &last_seasonal_coef);
868 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
869 &seasonal_coef);
870 }
872 /* periodically run a smoother for seasonal effects */
873 /* Need to use first cdp parameter buffer to track
874 * burnin (burnin requires a specific smoothing schedule).
875 * The CDP_init_seasonal parameter is really an RRA level,
876 * not a data source within RRA level parameter, but the rra_def
877 * is read only for rrd_update (not flushed to disk). */
878 iii = i*(rrd.stat_head -> ds_cnt);
879 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
880 <= BURNIN_CYCLES)
881 {
882 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
883 > rrd.rra_def[i].row_cnt - 1) {
884 /* mark off one of the burnin cycles */
885 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
886 schedule_smooth = 1;
887 }
888 } else {
889 /* someone has no doubt invented a trick to deal with this
890 * wrap around, but at least this code is clear. */
891 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
892 rrd.rra_ptr[i].cur_row)
893 {
894 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
895 * mapping between PDP and CDP */
896 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
897 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
898 {
899 #ifdef DEBUG
900 fprintf(stderr,
901 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
902 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
903 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
904 #endif
905 schedule_smooth = 1;
906 }
907 } else {
908 /* can't rely on negative numbers because we are working with
909 * unsigned values */
910 /* Don't need modulus here. If we've wrapped more than once, only
911 * one smooth is executed at the end. */
912 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
913 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
914 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
915 {
916 #ifdef DEBUG
917 fprintf(stderr,
918 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
919 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
920 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
921 #endif
922 schedule_smooth = 1;
923 }
924 }
925 }
927 rra_current = ftell(rrd_file);
928 } /* if cf is DEVSEASONAL or SEASONAL */
930 if (rrd_test_error()) break;
932 /* update CDP_PREP areas */
933 /* loop over data soures within each RRA */
934 for(ii = 0;
935 ii < rrd.stat_head->ds_cnt;
936 ii++)
937 {
939 /* iii indexes the CDP prep area for this data source within the RRA */
940 iii=i*rrd.stat_head->ds_cnt+ii;
942 if (rrd.rra_def[i].pdp_cnt > 1) {
944 if (rra_step_cnt[i] > 0) {
945 /* If we are in this block, as least 1 CDP value will be written to
946 * disk, this is the CDP_primary_val entry. If more than 1 value needs
947 * to be written, then the "fill in" value is the CDP_secondary_val
948 * entry. */
949 if (isnan(pdp_temp[ii]))
950 {
951 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
952 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
953 } else {
954 /* CDP_secondary value is the RRA "fill in" value for intermediary
955 * CDP data entries. No matter the CF, the value is the same because
956 * the average, max, min, and last of a list of identical values is
957 * the same, namely, the value itself. */
958 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
959 }
961 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
962 > rrd.rra_def[i].pdp_cnt*
963 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
964 {
965 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
966 /* initialize carry over */
967 if (current_cf == CF_AVERAGE) {
968 if (isnan(pdp_temp[ii])) {
969 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
970 } else {
971 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
972 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
973 }
974 } else {
975 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
976 }
977 } else {
978 rrd_value_t cum_val, cur_val;
979 switch (current_cf) {
980 case CF_AVERAGE:
981 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
982 cur_val = IFDNAN(pdp_temp[ii],0.0);
983 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
984 (cum_val + cur_val * start_pdp_offset) /
985 (rrd.rra_def[i].pdp_cnt
986 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
987 /* initialize carry over value */
988 if (isnan(pdp_temp[ii])) {
989 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
990 } else {
991 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
992 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
993 }
994 break;
995 case CF_MAXIMUM:
996 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
997 cur_val = IFDNAN(pdp_temp[ii],-DINF);
998 #ifdef DEBUG
999 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1000 isnan(pdp_temp[ii])) {
1001 fprintf(stderr,
1002 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1003 i,ii);
1004 exit(-1);
1005 }
1006 #endif
1007 if (cur_val > cum_val)
1008 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1009 else
1010 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1011 /* initialize carry over value */
1012 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1013 break;
1014 case CF_MINIMUM:
1015 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1016 cur_val = IFDNAN(pdp_temp[ii],DINF);
1017 #ifdef DEBUG
1018 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1019 isnan(pdp_temp[ii])) {
1020 fprintf(stderr,
1021 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1022 i,ii);
1023 exit(-1);
1024 }
1025 #endif
1026 if (cur_val < cum_val)
1027 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1028 else
1029 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1030 /* initialize carry over value */
1031 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1032 break;
1033 case CF_LAST:
1034 default:
1035 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1036 /* initialize carry over value */
1037 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1038 break;
1039 }
1040 } /* endif meets xff value requirement for a valid value */
1041 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1042 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1043 if (isnan(pdp_temp[ii]))
1044 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1045 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1046 else
1047 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1048 } else /* rra_step_cnt[i] == 0 */
1049 {
1050 #ifdef DEBUG
1051 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1052 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1053 i,ii);
1054 } else {
1055 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1056 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1057 }
1058 #endif
1059 if (isnan(pdp_temp[ii])) {
1060 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1061 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1062 {
1063 if (current_cf == CF_AVERAGE) {
1064 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1065 elapsed_pdp_st;
1066 } else {
1067 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1068 }
1069 #ifdef DEBUG
1070 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1071 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1072 #endif
1073 } else {
1074 switch (current_cf) {
1075 case CF_AVERAGE:
1076 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1077 elapsed_pdp_st;
1078 break;
1079 case CF_MINIMUM:
1080 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1081 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1082 break;
1083 case CF_MAXIMUM:
1084 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1085 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1086 break;
1087 case CF_LAST:
1088 default:
1089 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1090 break;
1091 }
1092 }
1093 }
1094 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1095 if (elapsed_pdp_st > 2)
1096 {
1097 switch (current_cf) {
1098 case CF_AVERAGE:
1099 default:
1100 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1101 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1102 break;
1103 case CF_SEASONAL:
1104 case CF_DEVSEASONAL:
1105 /* need to update cached seasonal values, so they are consistent
1106 * with the bulk update */
1107 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1108 * CDP_last_deviation are the same. */
1109 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1110 last_seasonal_coef[ii];
1111 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1112 seasonal_coef[ii];
1113 break;
1114 case CF_HWPREDICT:
1115 /* need to update the null_count and last_null_count.
1116 * even do this for non-DNAN pdp_temp because the
1117 * algorithm is not learning from batch updates. */
1118 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1119 elapsed_pdp_st;
1120 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1121 elapsed_pdp_st - 1;
1122 /* fall through */
1123 case CF_DEVPREDICT:
1124 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1125 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1126 break;
1127 case CF_FAILURES:
1128 /* do not count missed bulk values as failures */
1129 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1130 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1131 /* need to reset violations buffer.
1132 * could do this more carefully, but for now, just
1133 * assume a bulk update wipes away all violations. */
1134 erase_violations(&rrd, iii, i);
1135 break;
1136 }
1137 }
1138 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1140 if (rrd_test_error()) break;
1142 } /* endif data sources loop */
1143 } /* end RRA Loop */
1145 /* this loop is only entered if elapsed_pdp_st < 3 */
1146 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1147 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1148 {
1149 for(i = 0, rra_start = rra_begin;
1150 i < rrd.stat_head->rra_cnt;
1151 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1152 i++)
1153 {
1154 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1156 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1157 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1158 {
1159 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1160 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1161 &seasonal_coef);
1162 rra_current = ftell(rrd_file);
1163 }
1164 if (rrd_test_error()) break;
1165 /* loop over data soures within each RRA */
1166 for(ii = 0;
1167 ii < rrd.stat_head->ds_cnt;
1168 ii++)
1169 {
1170 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1171 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1172 scratch_idx, seasonal_coef);
1173 }
1174 } /* end RRA Loop */
1175 if (rrd_test_error()) break;
1176 } /* end elapsed_pdp_st loop */
1178 if (rrd_test_error()) break;
1180 /* Ready to write to disk */
1181 /* Move sequentially through the file, writing one RRA at a time.
1182 * Note this architecture divorces the computation of CDP with
1183 * flushing updated RRA entries to disk. */
1184 for(i = 0, rra_start = rra_begin;
1185 i < rrd.stat_head->rra_cnt;
1186 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1187 i++) {
1188 /* is there anything to write for this RRA? If not, continue. */
1189 if (rra_step_cnt[i] == 0) continue;
1191 /* write the first row */
1192 #ifdef DEBUG
1193 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1194 #endif
1195 rrd.rra_ptr[i].cur_row++;
1196 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1197 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1198 /* positition on the first row */
1199 rra_pos_tmp = rra_start +
1200 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1201 if(rra_pos_tmp != rra_current) {
1202 #ifndef HAVE_MMAP
1203 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1204 rrd_set_error("seek error in rrd");
1205 break;
1206 }
1207 #endif
1208 rra_current = rra_pos_tmp;
1209 }
1211 #ifdef DEBUG
1212 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1213 #endif
1214 scratch_idx = CDP_primary_val;
1215 if (pcdp_summary != NULL)
1216 {
1217 rra_time = (current_time - current_time
1218 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1219 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1220 }
1221 #ifdef HAVE_MMAP
1222 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1223 pcdp_summary, &rra_time, rrd_mmaped_file);
1224 #else
1225 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1226 pcdp_summary, &rra_time);
1227 #endif
1228 if (rrd_test_error()) break;
1230 /* write other rows of the bulk update, if any */
1231 scratch_idx = CDP_secondary_val;
1232 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1233 {
1234 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1235 {
1236 #ifdef DEBUG
1237 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1238 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1239 #endif
1240 /* wrap */
1241 rrd.rra_ptr[i].cur_row = 0;
1242 /* seek back to beginning of current rra */
1243 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1244 {
1245 rrd_set_error("seek error in rrd");
1246 break;
1247 }
1248 #ifdef DEBUG
1249 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1250 #endif
1251 rra_current = rra_start;
1252 }
1253 if (pcdp_summary != NULL)
1254 {
1255 rra_time = (current_time - current_time
1256 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1257 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1258 }
1259 #ifdef HAVE_MMAP
1260 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1261 pcdp_summary, &rra_time, rrd_mmaped_file);
1262 #else
1263 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1264 pcdp_summary, &rra_time);
1265 #endif
1266 }
1268 if (rrd_test_error())
1269 break;
1270 } /* RRA LOOP */
1272 /* break out of the argument parsing loop if error_string is set */
1273 if (rrd_test_error()){
1274 free(step_start);
1275 break;
1276 }
1278 } /* endif a pdp_st has occurred */
1279 rrd.live_head->last_up = current_time;
1280 rrd.live_head->last_up_usec = current_time_usec;
1281 free(step_start);
1282 } /* function argument loop */
1284 if (seasonal_coef != NULL) free(seasonal_coef);
1285 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1286 if (rra_step_cnt != NULL) free(rra_step_cnt);
1287 rpnstack_free(&rpnstack);
1289 #ifdef HAVE_MMAP
1290 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1291 rrd_set_error("error writing(unmapping) file: %s", filename);
1292 }
1293 #endif
1294 /* if we got here and if there is an error and if the file has not been
1295 * written to, then close things up and return. */
1296 if (rrd_test_error()) {
1297 free(updvals);
1298 free(tmpl_idx);
1299 rrd_free(&rrd);
1300 free(pdp_temp);
1301 free(pdp_new);
1302 fclose(rrd_file);
1303 return(-1);
1304 }
1306 /* aargh ... that was tough ... so many loops ... anyway, its done.
1307 * we just need to write back the live header portion now*/
1309 if (fseek(rrd_file, (sizeof(stat_head_t)
1310 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1311 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1312 SEEK_SET) != 0) {
1313 rrd_set_error("seek rrd for live header writeback");
1314 free(updvals);
1315 free(tmpl_idx);
1316 rrd_free(&rrd);
1317 free(pdp_temp);
1318 free(pdp_new);
1319 fclose(rrd_file);
1320 return(-1);
1321 }
1323 if(version >= 3) {
1324 if(fwrite( rrd.live_head,
1325 sizeof(live_head_t), 1, rrd_file) != 1){
1326 rrd_set_error("fwrite live_head to rrd");
1327 free(updvals);
1328 rrd_free(&rrd);
1329 free(tmpl_idx);
1330 free(pdp_temp);
1331 free(pdp_new);
1332 fclose(rrd_file);
1333 return(-1);
1334 }
1335 }
1336 else {
1337 if(fwrite( &rrd.live_head->last_up,
1338 sizeof(time_t), 1, rrd_file) != 1){
1339 rrd_set_error("fwrite live_head to rrd");
1340 free(updvals);
1341 rrd_free(&rrd);
1342 free(tmpl_idx);
1343 free(pdp_temp);
1344 free(pdp_new);
1345 fclose(rrd_file);
1346 return(-1);
1347 }
1348 }
1351 if(fwrite( rrd.pdp_prep,
1352 sizeof(pdp_prep_t),
1353 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1354 rrd_set_error("ftwrite pdp_prep to rrd");
1355 free(updvals);
1356 rrd_free(&rrd);
1357 free(tmpl_idx);
1358 free(pdp_temp);
1359 free(pdp_new);
1360 fclose(rrd_file);
1361 return(-1);
1362 }
1364 if(fwrite( rrd.cdp_prep,
1365 sizeof(cdp_prep_t),
1366 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1367 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1369 rrd_set_error("ftwrite cdp_prep to rrd");
1370 free(updvals);
1371 free(tmpl_idx);
1372 rrd_free(&rrd);
1373 free(pdp_temp);
1374 free(pdp_new);
1375 fclose(rrd_file);
1376 return(-1);
1377 }
1379 if(fwrite( rrd.rra_ptr,
1380 sizeof(rra_ptr_t),
1381 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1382 rrd_set_error("fwrite rra_ptr to rrd");
1383 free(updvals);
1384 free(tmpl_idx);
1385 rrd_free(&rrd);
1386 free(pdp_temp);
1387 free(pdp_new);
1388 fclose(rrd_file);
1389 return(-1);
1390 }
1392 /* OK now close the files and free the memory */
1393 if(fclose(rrd_file) != 0){
1394 rrd_set_error("closing rrd");
1395 free(updvals);
1396 free(tmpl_idx);
1397 rrd_free(&rrd);
1398 free(pdp_temp);
1399 free(pdp_new);
1400 return(-1);
1401 }
1403 /* calling the smoothing code here guarantees at most
1404 * one smoothing operation per rrd_update call. Unfortunately,
1405 * it is possible with bulk updates, or a long-delayed update
1406 * for smoothing to occur off-schedule. This really isn't
1407 * critical except during the burning cycles. */
1408 if (schedule_smooth)
1409 {
1410 rrd_file = fopen(filename,"rb+");
1411 rra_start = rra_begin;
1412 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1413 {
1414 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1415 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1416 {
1417 #ifdef DEBUG
1418 fprintf(stderr,"Running smoother for rra %ld\n",i);
1419 #endif
1420 apply_smoother(&rrd,i,rra_start,rrd_file);
1421 if (rrd_test_error())
1422 break;
1423 }
1424 rra_start += rrd.rra_def[i].row_cnt
1425 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1426 }
1427 fclose(rrd_file);
1428 }
1429 rrd_free(&rrd);
1430 free(updvals);
1431 free(tmpl_idx);
1432 free(pdp_new);
1433 free(pdp_temp);
1434 return(0);
1435 }
/*
 * LockRRD - try to take an exclusive lock covering the whole RRD file.
 *
 * rrdfile: open stdio stream of the RRD file; the lock is placed on the
 *          file descriptor underlying this stream.
 *
 * The request is non-blocking on both platforms: _LK_NBLCK (Windows) and
 * F_SETLK (POSIX) fail immediately instead of waiting when the lock
 * cannot be granted.  The lock goes away when the file is closed.
 *
 * Returns 0 on success; otherwise the failure status of the underlying
 * locking call (-1 on Windows also when the file size cannot be read).
 */
int
LockRRD(FILE *rrdfile)
{
    const int fd = fileno(rrdfile);   /* descriptor behind the stream */

#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat file_info;

    /* _locking() needs a byte count, so the current file size is
     * looked up first. */
    if (_fstat(fd, &file_info) != 0)
        return -1;

    return _locking(fd, _LK_NBLCK, file_info.st_size);
#else
    struct flock whole_file;

    whole_file.l_whence = SEEK_SET;   /* l_start is relative to file start */
    whole_file.l_start  = 0;          /* begin at the first byte */
    whole_file.l_len    = 0;          /* 0 = extend to end of file */
    whole_file.l_type   = F_WRLCK;    /* exclusive write lock */

    return fcntl(fd, F_SETLK, &whole_file);
#endif
}
1475 #ifdef HAVE_MMAP
1476 info_t
1477 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1478 unsigned short CDP_scratch_idx,
1479 #ifndef DEBUG
1480 FILE UNUSED(*rrd_file),
1481 #else
1482 FILE *rrd_file,
1483 #endif
1484 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1485 #else
1486 info_t
1487 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1488 unsigned short CDP_scratch_idx, FILE *rrd_file,
1489 info_t *pcdp_summary, time_t *rra_time)
1490 #endif
1491 {
1492 unsigned long ds_idx, cdp_idx;
1493 infoval iv;
1495 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1496 {
1497 /* compute the cdp index */
1498 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1499 #ifdef DEBUG
1500 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1501 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1502 rrd -> rra_def[rra_idx].cf_nam);
1503 #endif
1504 if (pcdp_summary != NULL)
1505 {
1506 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1507 /* append info to the return hash */
1508 pcdp_summary = info_push(pcdp_summary,
1509 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1510 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1511 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1512 RD_I_VAL, iv);
1513 }
1514 #ifdef HAVE_MMAP
1515 memcpy((char *)rrd_mmaped_file + *rra_current,
1516 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1517 sizeof(rrd_value_t));
1518 #else
1519 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1520 sizeof(rrd_value_t),1,rrd_file) != 1)
1521 {
1522 rrd_set_error("writing rrd");
1523 return 0;
1524 }
1525 #endif
1526 *rra_current += sizeof(rrd_value_t);
1527 }
1528 return (pcdp_summary);
1529 }