9da2cace41a418328e040128ac199c57f75fb7a5
1 /*****************************************************************************
2 * RRDtool 1.1.x Copyright Tobias Oetiker, 1997 - 2004
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 * $Log$
8 * Revision 1.17 2004/05/26 22:11:12 oetiker
9 * reduce compiler warnings. Many small fixes. -- Mike Slifcak <slif@bellsouth.net>
10 *
11 * Revision 1.16 2004/05/25 20:52:16 oetiker
12 * fix spelling and syntax, especially in messages that are printed -- Mike Slifcak
13 *
14 * Revision 1.15 2004/05/25 20:51:49 oetiker
15 * Update displayed copyright messages to be consistent. -- Mike Slifcak
16 *
17 * Revision 1.14 2003/11/11 19:46:21 oetiker
18 * replaced time_value with rrd_time_value as MacOS X introduced a struct of that name in their standard headers
19 *
20 * Revision 1.13 2003/11/11 19:38:03 oetiker
 * rrd files should NOT change size ever ... bulk update code was buggy.
22 * -- David M. Grimes <dgrimes@navisite.com>
23 *
24 * Revision 1.12 2003/09/04 13:16:12 oetiker
 * should not assign but compare ... grrrrr
26 *
27 * Revision 1.11 2003/09/02 21:58:35 oetiker
28 * be pickier about what we accept in rrd_update. Complain if things do not work out
29 *
30 * Revision 1.10 2003/04/29 19:14:12 jake
31 * Change updatev RRA return from index_number to cf_nam, pdp_cnt.
32 * Also revert accidental addition of -I to aclocal MakeMakefile.
33 *
34 * Revision 1.9 2003/04/25 18:35:08 jake
35 * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
36 *
37 * Revision 1.8 2003/03/31 21:22:12 oetiker
38 * enables RRDtool updates with microsecond or in case of windows millisecond
39 * precision. This is needed to reduce time measurement error when archive step
40 * is small. (<30s) -- Sasha Mikheev <sasha@avalon-net.co.il>
41 *
42 * Revision 1.7 2003/02/13 07:05:27 oetiker
43 * Find attached the patch I promised to send to you. Please note that there
44 * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
45 * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
46 * library is identical to librrd, but it contains support code for per-thread
47 * global variables currently used for error information only. This is similar
48 * to how errno per-thread variables are implemented. librrd_th must be linked
 * alongside of libpthread
50 *
51 * There is also a new file "THREADS", holding some documentation.
52 *
53 * -- Peter Stamfest <peter@stamfest.at>
54 *
55 * Revision 1.6 2002/02/01 20:34:49 oetiker
56 * fixed version number and date/time
57 *
58 * Revision 1.5 2001/05/09 05:31:01 oetiker
59 * Bug fix: when update of multiple PDP/CDP RRAs coincided
60 * with interpolation of multiple PDPs an incorrect value was
61 * stored as the CDP. Especially evident for GAUGE data sources.
62 * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
63 *
64 * Revision 1.4 2001/03/10 23:54:41 oetiker
65 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
66 * parser and calculator from rrd_graph and puts then in a new file,
67 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
68 * clean-up of aberrant behavior stuff, including a bug fix.
69 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
70 * -- Jake Brutlag <jakeb@corp.webtv.net>
71 *
72 * Revision 1.3 2001/03/04 13:01:55 oetiker
73 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
74 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
75 * This is backwards compatible! But new files using the Aberrant stuff are not readable
76 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
77 * -- Jake Brutlag <jakeb@corp.webtv.net>
78 *
79 * Revision 1.2 2001/03/04 11:14:25 oetiker
80 * added at-style-time@value:value syntax to rrd_update
81 * -- Dave Bodenstab <imdave@mcs.net>
82 *
83 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
84 * checkin
85 *
86 *****************************************************************************/
88 #include "rrd_tool.h"
89 #include <sys/types.h>
90 #include <fcntl.h>
91 #ifdef HAVE_MMAP
92 #include <sys/mman.h>
93 #endif
95 #ifdef WIN32
96 #include <sys/locking.h>
97 #include <sys/stat.h>
98 #include <io.h>
99 #endif
101 #include "rrd_hw.h"
102 #include "rrd_rpncalc.h"
104 #include "rrd_is_thread_safe.h"
106 #ifdef WIN32
107 /*
108 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
109 * replacement.
110 */
111 #include <sys/timeb.h>
/* Timestamp split into whole seconds and a sub-second microsecond part;
 * declared here because WIN32 lacks the POSIX definition. */
struct timeval {
    time_t tv_sec;              /* whole seconds */
    long   tv_usec;             /* microseconds within the second */
};
/* Time zone description matching the second argument of the WIN32
 * gettimeofday replacement. */
struct __timezone {
    int tz_minuteswest;         /* minutes west of Greenwich */
    int tz_dsttime;             /* kind of DST correction in effect */
};
123 static gettimeofday(struct timeval *t, struct __timezone *tz) {
125 struct timeb current_time;
127 _ftime(¤t_time);
129 t->tv_sec = current_time.time;
130 t->tv_usec = current_time.millitm * 1000;
131 }
133 #endif
/*
 * Normalize a time as returned by gettimeofday: the usec part must
 * always be >= 0.  A negative tv_usec borrows one second from tv_sec.
 */
static void normalize_time(struct timeval *t)
{
    if (t->tv_usec >= 0)
        return;                 /* already normalized */

    t->tv_usec += 1000000L;
    t->tv_sec--;
}
146 /* Local prototypes */
147 int LockRRD(FILE *rrd_file);
148 #ifdef HAVE_MMAP
149 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
150 unsigned long *rra_current,
151 unsigned short CDP_scratch_idx, FILE *rrd_file,
152 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
153 #else
154 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
155 unsigned long *rra_current,
156 unsigned short CDP_scratch_idx, FILE *rrd_file,
157 info_t *pcdp_summary, time_t *rra_time);
158 #endif
159 int rrd_update_r(char *filename, char *template, int argc, char **argv);
160 int _rrd_update(char *filename, char *template, int argc, char **argv,
161 info_t*);
/* Yields Y when X is NaN, otherwise X itself.  X appears twice in the
 * expansion, so it must not have side effects.  The expansion is a pure
 * expression (no trailing ';') so it can be used inside larger
 * expressions as well as in plain assignments. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
#ifdef STANDALONE
/*
 * Entry point of the standalone rrdupdate program.
 *
 * Hands the command line to rrd_update().  If an rrd error is pending
 * afterwards, prints the usage text followed by the error message,
 * clears the error and returns 1; otherwise returns 0.
 */
int main(int argc, char **argv)
{
    rrd_update(argc, argv);

    if (!rrd_test_error())
        return 0;

    printf("RRDtool 1.1.x Copyright (C) 1997-2004 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
           "Usage: rrdupdate filename\n"
           "\t\t\t[--template|-t ds-name:ds-name:...]\n"
           "\t\t\ttime|N:value[:value...]\n\n"
           "\t\t\tat-time@value[:value...]\n\n"
           "\t\t\t[ time:value[:value...] ..]\n\n");

    printf("ERROR: %s\n", rrd_get_error());
    rrd_clear_error();
    return 1;
}
#endif
186 info_t *rrd_update_v(int argc, char **argv)
187 {
188 char *template = NULL;
189 info_t *result = NULL;
190 infoval rc;
192 while (1) {
193 static struct option long_options[] =
194 {
195 {"template", required_argument, 0, 't'},
196 {0,0,0,0}
197 };
198 int option_index = 0;
199 int opt;
200 opt = getopt_long(argc, argv, "t:",
201 long_options, &option_index);
203 if (opt == EOF)
204 break;
206 switch(opt) {
207 case 't':
208 template = optarg;
209 break;
211 case '?':
212 rrd_set_error("unknown option '%s'",argv[optind-1]);
213 rc.u_int = -1;
214 goto end_tag;
215 }
216 }
218 /* need at least 2 arguments: filename, data. */
219 if (argc-optind < 2) {
220 rrd_set_error("Not enough arguments");
221 rc.u_int = -1;
222 goto end_tag;
223 }
224 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
225 rc.u_int = _rrd_update(argv[optind], template,
226 argc - optind - 1, argv + optind + 1, result);
227 result->value.u_int = rc.u_int;
228 end_tag:
229 return result;
230 }
232 int
233 rrd_update(int argc, char **argv)
234 {
235 char *template = NULL;
236 int rc;
238 while (1) {
239 static struct option long_options[] =
240 {
241 {"template", required_argument, 0, 't'},
242 {0,0,0,0}
243 };
244 int option_index = 0;
245 int opt;
246 opt = getopt_long(argc, argv, "t:",
247 long_options, &option_index);
249 if (opt == EOF)
250 break;
252 switch(opt) {
253 case 't':
254 template = optarg;
255 break;
257 case '?':
258 rrd_set_error("unknown option '%s'",argv[optind-1]);
259 return(-1);
260 }
261 }
263 /* need at least 2 arguments: filename, data. */
264 if (argc-optind < 2) {
265 rrd_set_error("Not enough arguments");
267 return -1;
268 }
270 rc = rrd_update_r(argv[optind], template,
271 argc - optind - 1, argv + optind + 1);
272 return rc;
273 }
/*
 * rrd_update_r - update an RRD from already separated arguments.
 *
 *   filename  RRD file to update
 *   template  optional DS template string, or NULL
 *   argc/argv the data arguments only (no program name, no options)
 *
 * Forwards to _rrd_update() without an info list (NULL) and returns
 * its result unchanged.
 */
int rrd_update_r(char *filename, char *template, int argc, char **argv)
{
    int status;

    status = _rrd_update(filename, template, argc, argv, NULL);
    return status;
}
281 int
282 _rrd_update(char *filename, char *template, int argc, char **argv,
283 info_t *pcdp_summary)
284 {
286 int arg_i = 2;
287 short j;
288 unsigned long i,ii,iii=1;
290 unsigned long rra_begin; /* byte pointer to the rra
291 * area in the rrd file. this
292 * pointer never changes value */
293 unsigned long rra_start; /* byte pointer to the rra
294 * area in the rrd file. this
295 * pointer changes as each rrd is
296 * processed. */
297 unsigned long rra_current; /* byte pointer to the current write
298 * spot in the rrd file. */
299 unsigned long rra_pos_tmp; /* temporary byte pointer. */
300 double interval,
301 pre_int,post_int; /* interval between this and
302 * the last run */
303 unsigned long proc_pdp_st; /* which pdp_st was the last
304 * to be processed */
305 unsigned long occu_pdp_st; /* when was the pdp_st
306 * before the last update
307 * time */
308 unsigned long proc_pdp_age; /* how old was the data in
309 * the pdp prep area when it
310 * was last updated */
311 unsigned long occu_pdp_age; /* how long ago was the last
312 * pdp_step time */
313 rrd_value_t *pdp_new; /* prepare the incoming data
314 * to be added the the
315 * existing entry */
316 rrd_value_t *pdp_temp; /* prepare the pdp values
317 * to be added the the
318 * cdp values */
320 long *tmpl_idx; /* index representing the settings
321 transported by the template index */
322 unsigned long tmpl_cnt = 2; /* time and data */
324 FILE *rrd_file;
325 rrd_t rrd;
326 time_t current_time;
327 time_t rra_time; /* time of update for a RRA */
328 unsigned long current_time_usec; /* microseconds part of current time */
329 struct timeval tmp_time; /* used for time conversion */
331 char **updvals;
332 int schedule_smooth = 0;
333 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
334 /* a vector of future Holt-Winters seasonal coefs */
335 unsigned long elapsed_pdp_st;
336 /* number of elapsed PDP steps since last update */
337 unsigned long *rra_step_cnt = NULL;
338 /* number of rows to be updated in an RRA for a data
339 * value. */
340 unsigned long start_pdp_offset;
341 /* number of PDP steps since the last update that
342 * are assigned to the first CDP to be generated
343 * since the last update. */
344 unsigned short scratch_idx;
345 /* index into the CDP scratch array */
346 enum cf_en current_cf;
347 /* numeric id of the current consolidation function */
348 rpnstack_t rpnstack; /* used for COMPUTE DS */
349 int version; /* rrd version */
350 char *endptr; /* used in the conversion */
351 #ifdef HAVE_MMAP
352 void *rrd_mmaped_file;
353 unsigned long rrd_filesize;
354 #endif
356 rpnstack_init(&rpnstack);
358 /* need at least 1 arguments: data. */
359 if (argc < 1) {
360 rrd_set_error("Not enough arguments");
361 return -1;
362 }
366 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
367 return -1;
368 }
369 /* initialize time */
370 version = atoi(rrd.stat_head->version);
371 gettimeofday(&tmp_time, 0);
372 normalize_time(&tmp_time);
373 current_time = tmp_time.tv_sec;
374 if(version >= 3) {
375 current_time_usec = tmp_time.tv_usec;
376 }
377 else {
378 current_time_usec = 0;
379 }
381 rra_current = rra_start = rra_begin = ftell(rrd_file);
382 /* This is defined in the ANSI C standard, section 7.9.5.3:
384 When a file is opened with udpate mode ('+' as the second
385 or third character in the ... list of mode argument
386 variables), both input and ouptut may be performed on the
387 associated stream. However, ... input may not be directly
388 followed by output without an intervening call to a file
389 positioning function, unless the input oepration encounters
390 end-of-file. */
391 #ifdef HAVE_MMAP
392 fseek(rrd_file, 0, SEEK_END);
393 rrd_filesize = ftell(rrd_file);
394 fseek(rrd_file, rra_current, SEEK_SET);
395 #else
396 fseek(rrd_file, 0, SEEK_CUR);
397 #endif
400 /* get exclusive lock to whole file.
401 * lock gets removed when we close the file.
402 */
403 if (LockRRD(rrd_file) != 0) {
404 rrd_set_error("could not lock RRD");
405 rrd_free(&rrd);
406 fclose(rrd_file);
407 return(-1);
408 }
410 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
411 rrd_set_error("allocating updvals pointer array");
412 rrd_free(&rrd);
413 fclose(rrd_file);
414 return(-1);
415 }
417 if ((pdp_temp = malloc(sizeof(rrd_value_t)
418 *rrd.stat_head->ds_cnt))==NULL){
419 rrd_set_error("allocating pdp_temp ...");
420 free(updvals);
421 rrd_free(&rrd);
422 fclose(rrd_file);
423 return(-1);
424 }
426 if ((tmpl_idx = malloc(sizeof(unsigned long)
427 *(rrd.stat_head->ds_cnt+1)))==NULL){
428 rrd_set_error("allocating tmpl_idx ...");
429 free(pdp_temp);
430 free(updvals);
431 rrd_free(&rrd);
432 fclose(rrd_file);
433 return(-1);
434 }
435 /* initialize template redirector */
436 /* default config example (assume DS 1 is a CDEF DS)
437 tmpl_idx[0] -> 0; (time)
438 tmpl_idx[1] -> 1; (DS 0)
439 tmpl_idx[2] -> 3; (DS 2)
440 tmpl_idx[3] -> 4; (DS 3) */
441 tmpl_idx[0] = 0; /* time */
442 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
443 {
444 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
445 tmpl_idx[ii++]=i;
446 }
447 tmpl_cnt= ii;
449 if (template) {
450 char *dsname;
451 unsigned int tmpl_len;
452 dsname = template;
453 tmpl_cnt = 1; /* the first entry is the time */
454 tmpl_len = strlen(template);
455 for(i=0;i<=tmpl_len ;i++) {
456 if (template[i] == ':' || template[i] == '\0') {
457 template[i] = '\0';
458 if (tmpl_cnt>rrd.stat_head->ds_cnt){
459 rrd_set_error("Template contains more DS definitions than RRD");
460 free(updvals); free(pdp_temp);
461 free(tmpl_idx); rrd_free(&rrd);
462 fclose(rrd_file); return(-1);
463 }
464 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
465 rrd_set_error("unknown DS name '%s'",dsname);
466 free(updvals); free(pdp_temp);
467 free(tmpl_idx); rrd_free(&rrd);
468 fclose(rrd_file); return(-1);
469 } else {
470 /* the first element is always the time */
471 tmpl_idx[tmpl_cnt-1]++;
472 /* go to the next entry on the template */
473 dsname = &template[i+1];
474 /* fix the damage we did before */
475 if (i<tmpl_len) {
476 template[i]=':';
477 }
479 }
480 }
481 }
482 }
483 if ((pdp_new = malloc(sizeof(rrd_value_t)
484 *rrd.stat_head->ds_cnt))==NULL){
485 rrd_set_error("allocating pdp_new ...");
486 free(updvals);
487 free(pdp_temp);
488 free(tmpl_idx);
489 rrd_free(&rrd);
490 fclose(rrd_file);
491 return(-1);
492 }
494 #ifdef HAVE_MMAP
495 rrd_mmaped_file = mmap(0,
496 rrd_filesize,
497 PROT_READ | PROT_WRITE,
498 MAP_SHARED,
499 fileno(rrd_file),
500 0);
501 if (rrd_mmaped_file == MAP_FAILED) {
502 rrd_set_error("error mmapping file %s", filename);
503 free(updvals);
504 free(pdp_temp);
505 free(tmpl_idx);
506 rrd_free(&rrd);
507 fclose(rrd_file);
508 return(-1);
509 }
510 #endif
511 /* loop through the arguments. */
512 for(arg_i=0; arg_i<argc;arg_i++) {
513 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
514 char *step_start = stepper;
515 char *p;
516 char *parsetime_error = NULL;
517 enum {atstyle, normal} timesyntax;
518 struct rrd_time_value ds_tv;
519 if (stepper == NULL){
520 rrd_set_error("failed duplication argv entry");
521 free(updvals);
522 free(pdp_temp);
523 free(tmpl_idx);
524 rrd_free(&rrd);
525 #ifdef HAVE_MMAP
526 munmap(rrd_mmaped_file, rrd_filesize);
527 #endif
528 fclose(rrd_file);
529 return(-1);
530 }
531 /* initialize all ds input to unknown except the first one
532 which has always got to be set */
533 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
534 strcpy(stepper,argv[arg_i]);
535 updvals[0]=stepper;
536 /* separate all ds elements; first must be examined separately
537 due to alternate time syntax */
538 if ((p=strchr(stepper,'@'))!=NULL) {
539 timesyntax = atstyle;
540 *p = '\0';
541 stepper = p+1;
542 } else if ((p=strchr(stepper,':'))!=NULL) {
543 timesyntax = normal;
544 *p = '\0';
545 stepper = p+1;
546 } else {
547 rrd_set_error("expected timestamp not found in data source from %s:...",
548 argv[arg_i]);
549 free(step_start);
550 break;
551 }
552 ii=1;
553 updvals[tmpl_idx[ii]] = stepper;
554 while (*stepper) {
555 if (*stepper == ':') {
556 *stepper = '\0';
557 ii++;
558 if (ii<tmpl_cnt){
559 updvals[tmpl_idx[ii]] = stepper+1;
560 }
561 }
562 stepper++;
563 }
565 if (ii != tmpl_cnt-1) {
566 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
567 tmpl_cnt-1, ii, argv[arg_i]);
568 free(step_start);
569 break;
570 }
572 /* get the time from the reading ... handle N */
573 if (timesyntax == atstyle) {
574 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
575 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
576 free(step_start);
577 break;
578 }
579 if (ds_tv.type == RELATIVE_TO_END_TIME ||
580 ds_tv.type == RELATIVE_TO_START_TIME) {
581 rrd_set_error("specifying time relative to the 'start' "
582 "or 'end' makes no sense here: %s",
583 updvals[0]);
584 free(step_start);
585 break;
586 }
588 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
589 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
591 } else if (strcmp(updvals[0],"N")==0){
592 gettimeofday(&tmp_time, 0);
593 normalize_time(&tmp_time);
594 current_time = tmp_time.tv_sec;
595 current_time_usec = tmp_time.tv_usec;
596 } else {
597 double tmp;
598 tmp = strtod(updvals[0], 0);
599 current_time = floor(tmp);
600 current_time_usec = (long)((tmp - current_time) * 1000000L);
601 }
602 /* dont do any correction for old version RRDs */
603 if(version < 3)
604 current_time_usec = 0;
606 if(current_time <= rrd.live_head->last_up){
607 rrd_set_error("illegal attempt to update using time %ld when "
608 "last update time is %ld (minimum one second step)",
609 current_time, rrd.live_head->last_up);
610 free(step_start);
611 break;
612 }
615 /* seek to the beginning of the rra's */
616 if (rra_current != rra_begin) {
617 #ifndef HAVE_MMAP
618 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
619 rrd_set_error("seek error in rrd");
620 free(step_start);
621 break;
622 }
623 #endif
624 rra_current = rra_begin;
625 }
626 rra_start = rra_begin;
628 /* when was the current pdp started */
629 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
630 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
632 /* when did the last pdp_st occur */
633 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
634 occu_pdp_st = current_time - occu_pdp_age;
635 /* interval = current_time - rrd.live_head->last_up; */
636 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
638 if (occu_pdp_st > proc_pdp_st){
639 /* OK we passed the pdp_st moment*/
640 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
641 * occurred before the latest
642 * pdp_st moment*/
643 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
644 post_int = occu_pdp_age; /* how much after it */
645 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
646 } else {
647 pre_int = interval;
648 post_int = 0;
649 }
651 #ifdef DEBUG
652 printf(
653 "proc_pdp_age %lu\t"
654 "proc_pdp_st %lu\t"
655 "occu_pfp_age %lu\t"
656 "occu_pdp_st %lu\t"
657 "int %lf\t"
658 "pre_int %lf\t"
659 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
660 occu_pdp_age, occu_pdp_st,
661 interval, pre_int, post_int);
662 #endif
664 /* process the data sources and update the pdp_prep
665 * area accordingly */
666 for(i=0;i<rrd.stat_head->ds_cnt;i++){
667 enum dst_en dst_idx;
668 dst_idx= dst_conv(rrd.ds_def[i].dst);
669 /* NOTE: DST_CDEF should never enter this if block, because
670 * updvals[i+1][0] is initialized to 'U'; unless the caller
671 * accidently specified a value for the DST_CDEF. To handle
672 * this case, an extra check is required. */
673 if((updvals[i+1][0] != 'U') &&
674 (dst_idx != DST_CDEF) &&
675 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
676 double rate = DNAN;
677 /* the data source type defines how to process the data */
678 /* pdp_new contains rate * time ... eg the bytes
679 * transferred during the interval. Doing it this way saves
680 * a lot of math operations */
683 switch(dst_idx){
684 case DST_COUNTER:
685 case DST_DERIVE:
686 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
687 for(ii=0;updvals[i+1][ii] != '\0';ii++){
688 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
689 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
690 break;
691 }
692 }
693 if (rrd_test_error()){
694 break;
695 }
696 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
697 if(dst_idx == DST_COUNTER) {
698 /* simple overflow catcher suggested by Andres Kroonmaa */
699 /* this will fail terribly for non 32 or 64 bit counters ... */
700 /* are there any others in SNMP land ? */
701 if (pdp_new[i] < (double)0.0 )
702 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
703 if (pdp_new[i] < (double)0.0 )
704 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
705 }
706 rate = pdp_new[i] / interval;
707 }
708 else {
709 pdp_new[i]= DNAN;
710 }
711 break;
712 case DST_ABSOLUTE:
713 errno = 0;
714 pdp_new[i] = strtod(updvals[i+1],&endptr);
715 if (errno > 0){
716 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
717 break;
718 };
719 if (endptr[0] != '\0'){
720 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
721 break;
722 }
723 rate = pdp_new[i] / interval;
724 break;
725 case DST_GAUGE:
726 errno = 0;
727 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
728 if (errno > 0){
729 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
730 break;
731 };
732 if (endptr[0] != '\0'){
733 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
734 break;
735 }
736 rate = pdp_new[i] / interval;
737 break;
738 default:
739 rrd_set_error("rrd contains unknown DS type : '%s'",
740 rrd.ds_def[i].dst);
741 break;
742 }
743 /* break out of this for loop if the error string is set */
744 if (rrd_test_error()){
745 break;
746 }
747 /* make sure pdp_temp is neither too large or too small
748 * if any of these occur it becomes unknown ...
749 * sorry folks ... */
750 if ( ! isnan(rate) &&
751 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
752 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
753 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
754 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
755 pdp_new[i] = DNAN;
756 }
757 } else {
758 /* no news is news all the same */
759 pdp_new[i] = DNAN;
760 }
762 /* make a copy of the command line argument for the next run */
763 #ifdef DEBUG
764 fprintf(stderr,
765 "prep ds[%lu]\t"
766 "last_arg '%s'\t"
767 "this_arg '%s'\t"
768 "pdp_new %10.2f\n",
769 i,
770 rrd.pdp_prep[i].last_ds,
771 updvals[i+1], pdp_new[i]);
772 #endif
773 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
774 strncpy(rrd.pdp_prep[i].last_ds,
775 updvals[i+1],LAST_DS_LEN-1);
776 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
777 }
778 }
779 /* break out of the argument parsing loop if the error_string is set */
780 if (rrd_test_error()){
781 free(step_start);
782 break;
783 }
784 /* has a pdp_st moment occurred since the last run ? */
786 if (proc_pdp_st == occu_pdp_st){
787 /* no we have not passed a pdp_st moment. therefore update is simple */
789 for(i=0;i<rrd.stat_head->ds_cnt;i++){
790 if(isnan(pdp_new[i]))
791 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
792 else
793 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
794 #ifdef DEBUG
795 fprintf(stderr,
796 "NO PDP ds[%lu]\t"
797 "value %10.2f\t"
798 "unkn_sec %5lu\n",
799 i,
800 rrd.pdp_prep[i].scratch[PDP_val].u_val,
801 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
802 #endif
803 }
804 } else {
805 /* an pdp_st has occurred. */
807 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
808 * occurred up to the last run.
809 pdp_new[] contains rate*seconds from the latest run.
810 pdp_temp[] will contain the rate for cdp */
812 for(i=0;i<rrd.stat_head->ds_cnt;i++){
813 /* update pdp_prep to the current pdp_st */
814 if(isnan(pdp_new[i]))
815 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
816 else
817 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
818 pdp_new[i]/(double)interval*(double)pre_int;
820 /* if too much of the pdp_prep is unknown we dump it */
821 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
822 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
823 (occu_pdp_st-proc_pdp_st <=
824 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
825 pdp_temp[i] = DNAN;
826 } else {
827 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
828 / (double)( occu_pdp_st
829 - proc_pdp_st
830 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
831 }
833 /* process CDEF data sources; remember each CDEF DS can
834 * only reference other DS with a lower index number */
835 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
836 rpnp_t *rpnp;
837 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
838 /* substitue data values for OP_VARIABLE nodes */
839 for (ii = 0; rpnp[ii].op != OP_END; ii++)
840 {
841 if (rpnp[ii].op == OP_VARIABLE) {
842 rpnp[ii].op = OP_NUMBER;
843 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
844 }
845 }
846 /* run the rpn calculator */
847 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
848 free(rpnp);
849 break; /* exits the data sources pdp_temp loop */
850 }
851 }
853 /* make pdp_prep ready for the next run */
854 if(isnan(pdp_new[i])){
855 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
856 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
857 } else {
858 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
859 rrd.pdp_prep[i].scratch[PDP_val].u_val =
860 pdp_new[i]/(double)interval*(double)post_int;
861 }
863 #ifdef DEBUG
864 fprintf(stderr,
865 "PDP UPD ds[%lu]\t"
866 "pdp_temp %10.2f\t"
867 "new_prep %10.2f\t"
868 "new_unkn_sec %5lu\n",
869 i, pdp_temp[i],
870 rrd.pdp_prep[i].scratch[PDP_val].u_val,
871 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
872 #endif
873 }
875 /* if there were errors during the last loop, bail out here */
876 if (rrd_test_error()){
877 free(step_start);
878 break;
879 }
881 /* compute the number of elapsed pdp_st moments */
882 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
883 #ifdef DEBUG
884 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
885 #endif
886 if (rra_step_cnt == NULL)
887 {
888 rra_step_cnt = (unsigned long *)
889 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
890 }
892 for(i = 0, rra_start = rra_begin;
893 i < rrd.stat_head->rra_cnt;
894 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
895 i++)
896 {
897 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
898 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
899 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
900 if (start_pdp_offset <= elapsed_pdp_st) {
901 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
902 rrd.rra_def[i].pdp_cnt + 1;
903 } else {
904 rra_step_cnt[i] = 0;
905 }
907 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
908 {
909 /* If this is a bulk update, we need to skip ahead in the seasonal
910 * arrays so that they will be correct for the next observed value;
911 * note that for the bulk update itself, no update will occur to
912 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
913 * be set to DNAN. */
914 if (rra_step_cnt[i] > 2)
915 {
916 /* skip update by resetting rra_step_cnt[i],
917 * note that this is not data source specific; this is due
918 * to the bulk update, not a DNAN value for the specific data
919 * source. */
920 rra_step_cnt[i] = 0;
921 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
922 &last_seasonal_coef);
923 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
924 &seasonal_coef);
925 }
927 /* periodically run a smoother for seasonal effects */
928 /* Need to use first cdp parameter buffer to track
929 * burnin (burnin requires a specific smoothing schedule).
930 * The CDP_init_seasonal parameter is really an RRA level,
931 * not a data source within RRA level parameter, but the rra_def
932 * is read only for rrd_update (not flushed to disk). */
933 iii = i*(rrd.stat_head -> ds_cnt);
934 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
935 <= BURNIN_CYCLES)
936 {
937 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
938 > rrd.rra_def[i].row_cnt - 1) {
939 /* mark off one of the burnin cycles */
940 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
941 schedule_smooth = 1;
942 }
943 } else {
944 /* someone has no doubt invented a trick to deal with this
945 * wrap around, but at least this code is clear. */
946 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
947 rrd.rra_ptr[i].cur_row)
948 {
949 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
950 * mapping between PDP and CDP */
951 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
952 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
953 {
954 #ifdef DEBUG
955 fprintf(stderr,
956 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
957 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
958 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
959 #endif
960 schedule_smooth = 1;
961 }
962 } else {
963 /* can't rely on negative numbers because we are working with
964 * unsigned values */
965 /* Don't need modulus here. If we've wrapped more than once, only
966 * one smooth is executed at the end. */
967 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
968 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
969 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
970 {
971 #ifdef DEBUG
972 fprintf(stderr,
973 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
974 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
975 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
976 #endif
977 schedule_smooth = 1;
978 }
979 }
980 }
982 rra_current = ftell(rrd_file);
983 } /* if cf is DEVSEASONAL or SEASONAL */
985 if (rrd_test_error()) break;
987 /* update CDP_PREP areas */
988 /* loop over data soures within each RRA */
989 for(ii = 0;
990 ii < rrd.stat_head->ds_cnt;
991 ii++)
992 {
994 /* iii indexes the CDP prep area for this data source within the RRA */
995 iii=i*rrd.stat_head->ds_cnt+ii;
997 if (rrd.rra_def[i].pdp_cnt > 1) {
999 if (rra_step_cnt[i] > 0) {
1000 /* If we are in this block, as least 1 CDP value will be written to
1001 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1002 * to be written, then the "fill in" value is the CDP_secondary_val
1003 * entry. */
1004 if (isnan(pdp_temp[ii]))
1005 {
1006 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
1007 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1008 } else {
1009 /* CDP_secondary value is the RRA "fill in" value for intermediary
1010 * CDP data entries. No matter the CF, the value is the same because
1011 * the average, max, min, and last of a list of identical values is
1012 * the same, namely, the value itself. */
1013 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
1014 }
1016 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
1017 > rrd.rra_def[i].pdp_cnt*
1018 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
1019 {
1020 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1021 /* initialize carry over */
1022 if (current_cf == CF_AVERAGE) {
1023 if (isnan(pdp_temp[ii])) {
1024 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1025 } else {
1026 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1027 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1028 }
1029 } else {
1030 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1031 }
1032 } else {
1033 rrd_value_t cum_val, cur_val;
1034 switch (current_cf) {
1035 case CF_AVERAGE:
1036 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
1037 cur_val = IFDNAN(pdp_temp[ii],0.0);
1038 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
1039 (cum_val + cur_val * start_pdp_offset) /
1040 (rrd.rra_def[i].pdp_cnt
1041 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
1042 /* initialize carry over value */
1043 if (isnan(pdp_temp[ii])) {
1044 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1045 } else {
1046 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1047 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1048 }
1049 break;
1050 case CF_MAXIMUM:
1051 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1052 cur_val = IFDNAN(pdp_temp[ii],-DINF);
1053 #ifdef DEBUG
1054 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1055 isnan(pdp_temp[ii])) {
1056 fprintf(stderr,
1057 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1058 i,ii);
1059 exit(-1);
1060 }
1061 #endif
1062 if (cur_val > cum_val)
1063 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1064 else
1065 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1066 /* initialize carry over value */
1067 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1068 break;
1069 case CF_MINIMUM:
1070 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1071 cur_val = IFDNAN(pdp_temp[ii],DINF);
1072 #ifdef DEBUG
1073 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1074 isnan(pdp_temp[ii])) {
1075 fprintf(stderr,
1076 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1077 i,ii);
1078 exit(-1);
1079 }
1080 #endif
1081 if (cur_val < cum_val)
1082 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1083 else
1084 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1085 /* initialize carry over value */
1086 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1087 break;
1088 case CF_LAST:
1089 default:
1090 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1091 /* initialize carry over value */
1092 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1093 break;
1094 }
1095 } /* endif meets xff value requirement for a valid value */
1096 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1097 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1098 if (isnan(pdp_temp[ii]))
1099 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1100 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1101 else
1102 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1103 } else /* rra_step_cnt[i] == 0 */
1104 {
1105 #ifdef DEBUG
1106 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1107 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1108 i,ii);
1109 } else {
1110 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1111 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1112 }
1113 #endif
1114 if (isnan(pdp_temp[ii])) {
1115 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1116 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1117 {
1118 if (current_cf == CF_AVERAGE) {
1119 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1120 elapsed_pdp_st;
1121 } else {
1122 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1123 }
1124 #ifdef DEBUG
1125 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1126 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1127 #endif
1128 } else {
1129 switch (current_cf) {
1130 case CF_AVERAGE:
1131 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1132 elapsed_pdp_st;
1133 break;
1134 case CF_MINIMUM:
1135 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1136 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1137 break;
1138 case CF_MAXIMUM:
1139 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1140 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1141 break;
1142 case CF_LAST:
1143 default:
1144 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1145 break;
1146 }
1147 }
1148 }
1149 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1150 if (elapsed_pdp_st > 2)
1151 {
1152 switch (current_cf) {
1153 case CF_AVERAGE:
1154 default:
1155 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1156 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1157 break;
1158 case CF_SEASONAL:
1159 case CF_DEVSEASONAL:
1160 /* need to update cached seasonal values, so they are consistent
1161 * with the bulk update */
1162 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1163 * CDP_last_deviation are the same. */
1164 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1165 last_seasonal_coef[ii];
1166 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1167 seasonal_coef[ii];
1168 break;
1169 case CF_HWPREDICT:
1170 /* need to update the null_count and last_null_count.
1171 * even do this for non-DNAN pdp_temp because the
1172 * algorithm is not learning from batch updates. */
1173 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1174 elapsed_pdp_st;
1175 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1176 elapsed_pdp_st - 1;
1177 /* fall through */
1178 case CF_DEVPREDICT:
1179 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1180 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1181 break;
1182 case CF_FAILURES:
1183 /* do not count missed bulk values as failures */
1184 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1185 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1186 /* need to reset violations buffer.
1187 * could do this more carefully, but for now, just
1188 * assume a bulk update wipes away all violations. */
1189 erase_violations(&rrd, iii, i);
1190 break;
1191 }
1192 }
1193 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1195 if (rrd_test_error()) break;
1197 } /* endif data sources loop */
1198 } /* end RRA Loop */
1200 /* this loop is only entered if elapsed_pdp_st < 3 */
1201 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1202 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1203 {
1204 for(i = 0, rra_start = rra_begin;
1205 i < rrd.stat_head->rra_cnt;
1206 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1207 i++)
1208 {
1209 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1211 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1212 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1213 {
1214 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1215 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1216 &seasonal_coef);
1217 rra_current = ftell(rrd_file);
1218 }
1219 if (rrd_test_error()) break;
1220 /* loop over data sources within each RRA */
1221 for(ii = 0;
1222 ii < rrd.stat_head->ds_cnt;
1223 ii++)
1224 {
1225 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1226 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1227 scratch_idx, seasonal_coef);
1228 }
1229 } /* end RRA Loop */
1230 if (rrd_test_error()) break;
1231 } /* end elapsed_pdp_st loop */
1233 if (rrd_test_error()) break;
1235 /* Ready to write to disk */
1236 /* Move sequentially through the file, writing one RRA at a time.
1237 * Note this architecture divorces the computation of CDP with
1238 * flushing updated RRA entries to disk. */
1239 for(i = 0, rra_start = rra_begin;
1240 i < rrd.stat_head->rra_cnt;
1241 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1242 i++) {
1243 /* is there anything to write for this RRA? If not, continue. */
1244 if (rra_step_cnt[i] == 0) continue;
1246 /* write the first row */
1247 #ifdef DEBUG
1248 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1249 #endif
1250 rrd.rra_ptr[i].cur_row++;
1251 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1252 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1253 /* position on the first row */
1254 rra_pos_tmp = rra_start +
1255 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1256 if(rra_pos_tmp != rra_current) {
1257 #ifndef HAVE_MMAP
1258 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1259 rrd_set_error("seek error in rrd");
1260 break;
1261 }
1262 #endif
1263 rra_current = rra_pos_tmp;
1264 }
1266 #ifdef DEBUG
1267 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1268 #endif
1269 scratch_idx = CDP_primary_val;
1270 if (pcdp_summary != NULL)
1271 {
1272 rra_time = (current_time - current_time
1273 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1274 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1275 }
1276 #ifdef HAVE_MMAP
1277 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1278 pcdp_summary, &rra_time, rrd_mmaped_file);
1279 #else
1280 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1281 pcdp_summary, &rra_time);
1282 #endif
1283 if (rrd_test_error()) break;
1285 /* write other rows of the bulk update, if any */
1286 scratch_idx = CDP_secondary_val;
1287 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1288 {
1289 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1290 {
1291 #ifdef DEBUG
1292 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1293 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1294 #endif
1295 /* wrap */
1296 rrd.rra_ptr[i].cur_row = 0;
1297 /* seek back to beginning of current rra */
1298 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1299 {
1300 rrd_set_error("seek error in rrd");
1301 break;
1302 }
1303 #ifdef DEBUG
1304 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1305 #endif
1306 rra_current = rra_start;
1307 }
1308 if (pcdp_summary != NULL)
1309 {
1310 rra_time = (current_time - current_time
1311 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1312 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1313 }
1314 #ifdef HAVE_MMAP
1315 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1316 pcdp_summary, &rra_time, rrd_mmaped_file);
1317 #else
1318 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1319 pcdp_summary, &rra_time);
1320 #endif
1321 }
1323 if (rrd_test_error())
1324 break;
1325 } /* RRA LOOP */
1327 /* break out of the argument parsing loop if error_string is set */
1328 if (rrd_test_error()){
1329 free(step_start);
1330 break;
1331 }
1333 } /* endif a pdp_st has occurred */
1334 rrd.live_head->last_up = current_time;
1335 rrd.live_head->last_up_usec = current_time_usec;
1336 free(step_start);
1337 } /* function argument loop */
1339 if (seasonal_coef != NULL) free(seasonal_coef);
1340 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1341 if (rra_step_cnt != NULL) free(rra_step_cnt);
1342 rpnstack_free(&rpnstack);
1344 #ifdef HAVE_MMAP
1345 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1346 rrd_set_error("error writing(unmapping) file: %s", filename);
1347 }
1348 #endif
1349 /* if we got here and if there is an error and if the file has not been
1350 * written to, then close things up and return. */
1351 if (rrd_test_error()) {
1352 free(updvals);
1353 free(tmpl_idx);
1354 rrd_free(&rrd);
1355 free(pdp_temp);
1356 free(pdp_new);
1357 fclose(rrd_file);
1358 return(-1);
1359 }
1361 /* aargh ... that was tough ... so many loops ... anyway, its done.
1362 * we just need to write back the live header portion now*/
1364 if (fseek(rrd_file, (sizeof(stat_head_t)
1365 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1366 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1367 SEEK_SET) != 0) {
1368 rrd_set_error("seek rrd for live header writeback");
1369 free(updvals);
1370 free(tmpl_idx);
1371 rrd_free(&rrd);
1372 free(pdp_temp);
1373 free(pdp_new);
1374 fclose(rrd_file);
1375 return(-1);
1376 }
1378 if(version >= 3) {
1379 if(fwrite( rrd.live_head,
1380 sizeof(live_head_t), 1, rrd_file) != 1){
1381 rrd_set_error("fwrite live_head to rrd");
1382 free(updvals);
1383 rrd_free(&rrd);
1384 free(tmpl_idx);
1385 free(pdp_temp);
1386 free(pdp_new);
1387 fclose(rrd_file);
1388 return(-1);
1389 }
1390 }
1391 else {
1392 if(fwrite( &rrd.live_head->last_up,
1393 sizeof(time_t), 1, rrd_file) != 1){
1394 rrd_set_error("fwrite live_head to rrd");
1395 free(updvals);
1396 rrd_free(&rrd);
1397 free(tmpl_idx);
1398 free(pdp_temp);
1399 free(pdp_new);
1400 fclose(rrd_file);
1401 return(-1);
1402 }
1403 }
1406 if(fwrite( rrd.pdp_prep,
1407 sizeof(pdp_prep_t),
1408 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1409 rrd_set_error("ftwrite pdp_prep to rrd");
1410 free(updvals);
1411 rrd_free(&rrd);
1412 free(tmpl_idx);
1413 free(pdp_temp);
1414 free(pdp_new);
1415 fclose(rrd_file);
1416 return(-1);
1417 }
1419 if(fwrite( rrd.cdp_prep,
1420 sizeof(cdp_prep_t),
1421 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1422 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1424 rrd_set_error("ftwrite cdp_prep to rrd");
1425 free(updvals);
1426 free(tmpl_idx);
1427 rrd_free(&rrd);
1428 free(pdp_temp);
1429 free(pdp_new);
1430 fclose(rrd_file);
1431 return(-1);
1432 }
1434 if(fwrite( rrd.rra_ptr,
1435 sizeof(rra_ptr_t),
1436 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1437 rrd_set_error("fwrite rra_ptr to rrd");
1438 free(updvals);
1439 free(tmpl_idx);
1440 rrd_free(&rrd);
1441 free(pdp_temp);
1442 free(pdp_new);
1443 fclose(rrd_file);
1444 return(-1);
1445 }
1447 /* OK now close the files and free the memory */
1448 if(fclose(rrd_file) != 0){
1449 rrd_set_error("closing rrd");
1450 free(updvals);
1451 free(tmpl_idx);
1452 rrd_free(&rrd);
1453 free(pdp_temp);
1454 free(pdp_new);
1455 return(-1);
1456 }
1458 /* calling the smoothing code here guarantees at most
1459 * one smoothing operation per rrd_update call. Unfortunately,
1460 * it is possible with bulk updates, or a long-delayed update
1461 * for smoothing to occur off-schedule. This really isn't
1462 * critical except during the burning cycles. */
1463 if (schedule_smooth)
1464 {
1465 #ifndef WIN32
1466 rrd_file = fopen(filename,"r+");
1467 #else
1468 rrd_file = fopen(filename,"rb+");
1469 #endif
1470 rra_start = rra_begin;
1471 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1472 {
1473 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1474 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1475 {
1476 #ifdef DEBUG
1477 fprintf(stderr,"Running smoother for rra %ld\n",i);
1478 #endif
1479 apply_smoother(&rrd,i,rra_start,rrd_file);
1480 if (rrd_test_error())
1481 break;
1482 }
1483 rra_start += rrd.rra_def[i].row_cnt
1484 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1485 }
1486 fclose(rrd_file);
1487 }
1488 rrd_free(&rrd);
1489 free(updvals);
1490 free(tmpl_idx);
1491 free(pdp_new);
1492 free(pdp_temp);
1493 return(0);
1494 }
/*
 * Request an exclusive (write) lock on the whole RRD file.
 * The lock goes away when the file is closed.
 *
 * rrdfile: open stream of the RRD file to be locked.
 *
 * The request does not wait: F_SETLK (POSIX) and _LK_NBLCK (WIN32) both
 * fail immediately when the lock cannot be obtained.
 *
 * Returns 0 on success; otherwise the failure value of the underlying
 * locking call.  On WIN32, -1 is also returned when the file size
 * cannot be obtained with _fstat().
 */
int
LockRRD(FILE *rrdfile)
{
    const int fd = fileno(rrdfile);   /* descriptor behind the stream */
    int status;

#ifndef WIN32
    struct flock whole_file;

    /* region: starts at offset 0 measured from the beginning of the
     * file; a length of 0 extends it to the end of the file */
    whole_file.l_whence = SEEK_SET;
    whole_file.l_start = 0;
    whole_file.l_len = 0;
    whole_file.l_type = F_WRLCK;      /* exclusive write lock */

    status = fcntl(fd, F_SETLK, &whole_file);
#else
    struct _stat file_info;

    /* _locking() needs an explicit byte count, so use the file size */
    status = (_fstat(fd, &file_info) == 0)
        ? _locking(fd, _LK_NBLCK, file_info.st_size)
        : -1;
#endif

    return status;
}
1534 #ifdef HAVE_MMAP
1535 info_t
1536 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1537 unsigned short CDP_scratch_idx, FILE *rrd_file,
1538 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1539 #else
1540 info_t
1541 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1542 unsigned short CDP_scratch_idx, FILE *rrd_file,
1543 info_t *pcdp_summary, time_t *rra_time)
1544 #endif
1545 {
1546 unsigned long ds_idx, cdp_idx;
1547 infoval iv;
1549 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1550 {
1551 /* compute the cdp index */
1552 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1553 #ifdef DEBUG
1554 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1555 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1556 rrd -> rra_def[rra_idx].cf_nam);
1557 #endif
1558 if (pcdp_summary != NULL)
1559 {
1560 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1561 /* append info to the return hash */
1562 pcdp_summary = info_push(pcdp_summary,
1563 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1564 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1565 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1566 RD_I_VAL, iv);
1567 }
1568 #ifdef HAVE_MMAP
1569 memcpy((char *)rrd_mmaped_file + *rra_current,
1570 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1571 sizeof(rrd_value_t));
1572 #else
1573 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1574 sizeof(rrd_value_t),1,rrd_file) != 1)
1575 {
1576 rrd_set_error("writing rrd");
1577 return 0;
1578 }
1579 #endif
1580 *rra_current += sizeof(rrd_value_t);
1581 }
1582 return (pcdp_summary);
1583 }