1 /*****************************************************************************
2 * RRDtool 1.1.x Copyright Tobias Oetiker, 1997 - 2002
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 * $Log$
8 * Revision 1.13 2003/11/11 19:38:03 oetiker
9 * rrd files should NOT change size ever ... bulk update code wa buggy.
10 * -- David M. Grimes <dgrimes@navisite.com>
11 *
12 * Revision 1.12 2003/09/04 13:16:12 oetiker
13 * should not assigne but compare ... grrrrr
14 *
15 * Revision 1.11 2003/09/02 21:58:35 oetiker
16 * be pickier about what we accept in rrd_update. Complain if things do not work out
17 *
18 * Revision 1.10 2003/04/29 19:14:12 jake
19 * Change updatev RRA return from index_number to cf_nam, pdp_cnt.
20 * Also revert accidental addition of -I to aclocal MakeMakefile.
21 *
22 * Revision 1.9 2003/04/25 18:35:08 jake
23 * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
24 *
25 * Revision 1.8 2003/03/31 21:22:12 oetiker
26 * enables RRDtool updates with microsecond or in case of windows millisecond
27 * precision. This is needed to reduce time measurement error when archive step
28 * is small. (<30s) -- Sasha Mikheev <sasha@avalon-net.co.il>
29 *
30 * Revision 1.7 2003/02/13 07:05:27 oetiker
31 * Find attached the patch I promised to send to you. Please note that there
32 * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
33 * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
34 * library is identical to librrd, but it contains support code for per-thread
35 * global variables currently used for error information only. This is similar
36 * to how errno per-thread variables are implemented. librrd_th must be linked
37 * alongside of libpthred
38 *
39 * There is also a new file "THREADS", holding some documentation.
40 *
41 * -- Peter Stamfest <peter@stamfest.at>
42 *
43 * Revision 1.6 2002/02/01 20:34:49 oetiker
44 * fixed version number and date/time
45 *
46 * Revision 1.5 2001/05/09 05:31:01 oetiker
47 * Bug fix: when update of multiple PDP/CDP RRAs coincided
48 * with interpolation of multiple PDPs an incorrect value was
49 * stored as the CDP. Especially evident for GAUGE data sources.
50 * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
51 *
52 * Revision 1.4 2001/03/10 23:54:41 oetiker
53 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
54 * parser and calculator from rrd_graph and puts then in a new file,
55 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
56 * clean-up of aberrant behavior stuff, including a bug fix.
57 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
58 * -- Jake Brutlag <jakeb@corp.webtv.net>
59 *
60 * Revision 1.3 2001/03/04 13:01:55 oetiker
61 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
62 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
63 * This is backwards compatible! But new files using the Aberrant stuff are not readable
64 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
65 * -- Jake Brutlag <jakeb@corp.webtv.net>
66 *
67 * Revision 1.2 2001/03/04 11:14:25 oetiker
68 * added at-style-time@value:value syntax to rrd_update
69 * -- Dave Bodenstab <imdave@mcs.net>
70 *
71 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
72 * checkin
73 *
74 *****************************************************************************/
76 #include "rrd_tool.h"
77 #include <sys/types.h>
78 #include <fcntl.h>
80 #ifdef WIN32
81 #include <sys/locking.h>
82 #include <sys/stat.h>
83 #include <io.h>
84 #endif
86 #include "rrd_hw.h"
87 #include "rrd_rpncalc.h"
89 #include "rrd_is_thread_safe.h"
91 #ifdef WIN32
92 /*
93 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
94 * replacement.
95 */
96 #include <sys/timeb.h>
98 struct timeval {
99 time_t tv_sec; /* seconds */
100 long tv_usec; /* microseconds */
101 };
103 struct __timezone {
104 int tz_minuteswest; /* minutes W of Greenwich */
105 int tz_dsttime; /* type of dst correction */
106 };
108 static gettimeofday(struct timeval *t, struct __timezone *tz) {
110 struct timeb current_time;
112 _ftime(¤t_time);
114 t->tv_sec = current_time.time;
115 t->tv_usec = current_time.millitm * 1000;
116 }
118 #endif
119 /*
120 * normilize time as returned by gettimeofday. usec part must
121 * be always >= 0
122 */
123 static void normalize_time(struct timeval *t)
124 {
125 if(t->tv_usec < 0) {
126 t->tv_sec--;
127 t->tv_usec += 1000000L;
128 }
129 }
131 /* Local prototypes */
132 int LockRRD(FILE *rrd_file);
133 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
134 unsigned long *rra_current,
135 unsigned short CDP_scratch_idx, FILE *rrd_file,
136 info_t *pcdp_summary, time_t *rra_time);
137 int rrd_update_r(char *filename, char *template, int argc, char **argv);
138 int _rrd_update(char *filename, char *template, int argc, char **argv,
139 info_t*);
141 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
144 #ifdef STANDALONE
145 int
146 main(int argc, char **argv){
147 rrd_update(argc,argv);
148 if (rrd_test_error()) {
149 printf("RRDtool 1.1.x Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
150 "Usage: rrdupdate filename\n"
151 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
152 "\t\t\ttime|N:value[:value...]\n\n"
153 "\t\t\tat-time@value[:value...]\n\n"
154 "\t\t\t[ time:value[:value...] ..]\n\n");
156 printf("ERROR: %s\n",rrd_get_error());
157 rrd_clear_error();
158 return 1;
159 }
160 return 0;
161 }
162 #endif
164 info_t *rrd_update_v(int argc, char **argv)
165 {
166 char *template = NULL;
167 info_t *result = NULL;
168 infoval rc;
170 while (1) {
171 static struct option long_options[] =
172 {
173 {"template", required_argument, 0, 't'},
174 {0,0,0,0}
175 };
176 int option_index = 0;
177 int opt;
178 opt = getopt_long(argc, argv, "t:",
179 long_options, &option_index);
181 if (opt == EOF)
182 break;
184 switch(opt) {
185 case 't':
186 template = optarg;
187 break;
189 case '?':
190 rrd_set_error("unknown option '%s'",argv[optind-1]);
191 rc.u_int = -1;
192 goto end_tag;
193 }
194 }
196 /* need at least 2 arguments: filename, data. */
197 if (argc-optind < 2) {
198 rrd_set_error("Not enough arguments");
199 rc.u_int = -1;
200 goto end_tag;
201 }
202 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
203 rc.u_int = _rrd_update(argv[optind], template,
204 argc - optind - 1, argv + optind + 1, result);
205 result->value.u_int = rc.u_int;
206 end_tag:
207 return result;
208 }
210 int
211 rrd_update(int argc, char **argv)
212 {
213 char *template = NULL;
214 int rc;
216 while (1) {
217 static struct option long_options[] =
218 {
219 {"template", required_argument, 0, 't'},
220 {0,0,0,0}
221 };
222 int option_index = 0;
223 int opt;
224 opt = getopt_long(argc, argv, "t:",
225 long_options, &option_index);
227 if (opt == EOF)
228 break;
230 switch(opt) {
231 case 't':
232 template = optarg;
233 break;
235 case '?':
236 rrd_set_error("unknown option '%s'",argv[optind-1]);
237 return(-1);
238 }
239 }
241 /* need at least 2 arguments: filename, data. */
242 if (argc-optind < 2) {
243 rrd_set_error("Not enough arguments");
245 return -1;
246 }
248 rc = rrd_update_r(argv[optind], template,
249 argc - optind - 1, argv + optind + 1);
250 return rc;
251 }
253 int
254 rrd_update_r(char *filename, char *template, int argc, char **argv)
255 {
256 return _rrd_update(filename, template, argc, argv, NULL);
257 }
259 int
260 _rrd_update(char *filename, char *template, int argc, char **argv,
261 info_t *pcdp_summary)
262 {
264 int arg_i = 2;
265 short j;
266 unsigned long i,ii,iii=1;
268 unsigned long rra_begin; /* byte pointer to the rra
269 * area in the rrd file. this
270 * pointer never changes value */
271 unsigned long rra_start; /* byte pointer to the rra
272 * area in the rrd file. this
273 * pointer changes as each rrd is
274 * processed. */
275 unsigned long rra_current; /* byte pointer to the current write
276 * spot in the rrd file. */
277 unsigned long rra_pos_tmp; /* temporary byte pointer. */
278 double interval,
279 pre_int,post_int; /* interval between this and
280 * the last run */
281 unsigned long proc_pdp_st; /* which pdp_st was the last
282 * to be processed */
283 unsigned long occu_pdp_st; /* when was the pdp_st
284 * before the last update
285 * time */
286 unsigned long proc_pdp_age; /* how old was the data in
287 * the pdp prep area when it
288 * was last updated */
289 unsigned long occu_pdp_age; /* how long ago was the last
290 * pdp_step time */
291 rrd_value_t *pdp_new; /* prepare the incoming data
292 * to be added the the
293 * existing entry */
294 rrd_value_t *pdp_temp; /* prepare the pdp values
295 * to be added the the
296 * cdp values */
298 long *tmpl_idx; /* index representing the settings
299 transported by the template index */
300 unsigned long tmpl_cnt = 2; /* time and data */
302 FILE *rrd_file;
303 rrd_t rrd;
304 time_t current_time;
305 time_t rra_time; /* time of update for a RRA */
306 unsigned long current_time_usec; /* microseconds part of current time */
307 struct timeval tmp_time; /* used for time conversion */
309 char **updvals;
310 int schedule_smooth = 0;
311 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
312 /* a vector of future Holt-Winters seasonal coefs */
313 unsigned long elapsed_pdp_st;
314 /* number of elapsed PDP steps since last update */
315 unsigned long *rra_step_cnt = NULL;
316 /* number of rows to be updated in an RRA for a data
317 * value. */
318 unsigned long start_pdp_offset;
319 /* number of PDP steps since the last update that
320 * are assigned to the first CDP to be generated
321 * since the last update. */
322 unsigned short scratch_idx;
323 /* index into the CDP scratch array */
324 enum cf_en current_cf;
325 /* numeric id of the current consolidation function */
326 rpnstack_t rpnstack; /* used for COMPUTE DS */
327 int version; /* rrd version */
328 char *endptr; /* used in the conversion */
330 rpnstack_init(&rpnstack);
332 /* need at least 1 arguments: data. */
333 if (argc < 1) {
334 rrd_set_error("Not enough arguments");
335 return -1;
336 }
340 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
341 return -1;
342 }
343 /* initialize time */
344 version = atoi(rrd.stat_head->version);
345 gettimeofday(&tmp_time, 0);
346 normalize_time(&tmp_time);
347 current_time = tmp_time.tv_sec;
348 if(version >= 3) {
349 current_time_usec = tmp_time.tv_usec;
350 }
351 else {
352 current_time_usec = 0;
353 }
355 rra_current = rra_start = rra_begin = ftell(rrd_file);
356 /* This is defined in the ANSI C standard, section 7.9.5.3:
358 When a file is opened with udpate mode ('+' as the second
359 or third character in the ... list of mode argument
360 variables), both input and ouptut may be performed on the
361 associated stream. However, ... input may not be directly
362 followed by output without an intervening call to a file
363 positioning function, unless the input oepration encounters
364 end-of-file. */
365 fseek(rrd_file, 0, SEEK_CUR);
368 /* get exclusive lock to whole file.
369 * lock gets removed when we close the file.
370 */
371 if (LockRRD(rrd_file) != 0) {
372 rrd_set_error("could not lock RRD");
373 rrd_free(&rrd);
374 fclose(rrd_file);
375 return(-1);
376 }
378 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
379 rrd_set_error("allocating updvals pointer array");
380 rrd_free(&rrd);
381 fclose(rrd_file);
382 return(-1);
383 }
385 if ((pdp_temp = malloc(sizeof(rrd_value_t)
386 *rrd.stat_head->ds_cnt))==NULL){
387 rrd_set_error("allocating pdp_temp ...");
388 free(updvals);
389 rrd_free(&rrd);
390 fclose(rrd_file);
391 return(-1);
392 }
394 if ((tmpl_idx = malloc(sizeof(unsigned long)
395 *(rrd.stat_head->ds_cnt+1)))==NULL){
396 rrd_set_error("allocating tmpl_idx ...");
397 free(pdp_temp);
398 free(updvals);
399 rrd_free(&rrd);
400 fclose(rrd_file);
401 return(-1);
402 }
403 /* initialize template redirector */
404 /* default config example (assume DS 1 is a CDEF DS)
405 tmpl_idx[0] -> 0; (time)
406 tmpl_idx[1] -> 1; (DS 0)
407 tmpl_idx[2] -> 3; (DS 2)
408 tmpl_idx[3] -> 4; (DS 3) */
409 tmpl_idx[0] = 0; /* time */
410 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
411 {
412 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
413 tmpl_idx[ii++]=i;
414 }
415 tmpl_cnt= ii;
417 if (template) {
418 char *dsname;
419 unsigned int tmpl_len;
420 dsname = template;
421 tmpl_cnt = 1; /* the first entry is the time */
422 tmpl_len = strlen(template);
423 for(i=0;i<=tmpl_len ;i++) {
424 if (template[i] == ':' || template[i] == '\0') {
425 template[i] = '\0';
426 if (tmpl_cnt>rrd.stat_head->ds_cnt){
427 rrd_set_error("Template contains more DS definitions than RRD");
428 free(updvals); free(pdp_temp);
429 free(tmpl_idx); rrd_free(&rrd);
430 fclose(rrd_file); return(-1);
431 }
432 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
433 rrd_set_error("unknown DS name '%s'",dsname);
434 free(updvals); free(pdp_temp);
435 free(tmpl_idx); rrd_free(&rrd);
436 fclose(rrd_file); return(-1);
437 } else {
438 /* the first element is always the time */
439 tmpl_idx[tmpl_cnt-1]++;
440 /* go to the next entry on the template */
441 dsname = &template[i+1];
442 /* fix the damage we did before */
443 if (i<tmpl_len) {
444 template[i]=':';
445 }
447 }
448 }
449 }
450 }
451 if ((pdp_new = malloc(sizeof(rrd_value_t)
452 *rrd.stat_head->ds_cnt))==NULL){
453 rrd_set_error("allocating pdp_new ...");
454 free(updvals);
455 free(pdp_temp);
456 free(tmpl_idx);
457 rrd_free(&rrd);
458 fclose(rrd_file);
459 return(-1);
460 }
462 /* loop through the arguments. */
463 for(arg_i=0; arg_i<argc;arg_i++) {
464 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
465 char *step_start = stepper;
466 char *p;
467 char *parsetime_error = NULL;
468 enum {atstyle, normal} timesyntax;
469 struct time_value ds_tv;
470 if (stepper == NULL){
471 rrd_set_error("failed duplication argv entry");
472 free(updvals);
473 free(pdp_temp);
474 free(tmpl_idx);
475 rrd_free(&rrd);
476 fclose(rrd_file);
477 return(-1);
478 }
479 /* initialize all ds input to unknown except the first one
480 which has always got to be set */
481 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
482 strcpy(stepper,argv[arg_i]);
483 updvals[0]=stepper;
484 /* separate all ds elements; first must be examined separately
485 due to alternate time syntax */
486 if ((p=strchr(stepper,'@'))!=NULL) {
487 timesyntax = atstyle;
488 *p = '\0';
489 stepper = p+1;
490 } else if ((p=strchr(stepper,':'))!=NULL) {
491 timesyntax = normal;
492 *p = '\0';
493 stepper = p+1;
494 } else {
495 rrd_set_error("expected timestamp not found in data source from %s:...",
496 argv[arg_i]);
497 free(step_start);
498 break;
499 }
500 ii=1;
501 updvals[tmpl_idx[ii]] = stepper;
502 while (*stepper) {
503 if (*stepper == ':') {
504 *stepper = '\0';
505 ii++;
506 if (ii<tmpl_cnt){
507 updvals[tmpl_idx[ii]] = stepper+1;
508 }
509 }
510 stepper++;
511 }
513 if (ii != tmpl_cnt-1) {
514 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
515 tmpl_cnt-1, ii, argv[arg_i]);
516 free(step_start);
517 break;
518 }
520 /* get the time from the reading ... handle N */
521 if (timesyntax == atstyle) {
522 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
523 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
524 free(step_start);
525 break;
526 }
527 if (ds_tv.type == RELATIVE_TO_END_TIME ||
528 ds_tv.type == RELATIVE_TO_START_TIME) {
529 rrd_set_error("specifying time relative to the 'start' "
530 "or 'end' makes no sense here: %s",
531 updvals[0]);
532 free(step_start);
533 break;
534 }
536 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
537 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
539 } else if (strcmp(updvals[0],"N")==0){
540 gettimeofday(&tmp_time, 0);
541 normalize_time(&tmp_time);
542 current_time = tmp_time.tv_sec;
543 current_time_usec = tmp_time.tv_usec;
544 } else {
545 double tmp;
546 tmp = strtod(updvals[0], 0);
547 current_time = floor(tmp);
548 current_time_usec = (long)((tmp - current_time) * 1000000L);
549 }
550 /* dont do any correction for old version RRDs */
551 if(version < 3)
552 current_time_usec = 0;
554 if(current_time <= rrd.live_head->last_up){
555 rrd_set_error("illegal attempt to update using time %ld when "
556 "last update time is %ld (minimum one second step)",
557 current_time, rrd.live_head->last_up);
558 free(step_start);
559 break;
560 }
563 /* seek to the beginning of the rra's */
564 if (rra_current != rra_begin) {
565 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
566 rrd_set_error("seek error in rrd");
567 free(step_start);
568 break;
569 }
570 rra_current = rra_begin;
571 }
572 rra_start = rra_begin;
574 /* when was the current pdp started */
575 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
576 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
578 /* when did the last pdp_st occur */
579 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
580 occu_pdp_st = current_time - occu_pdp_age;
581 /* interval = current_time - rrd.live_head->last_up; */
582 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
584 if (occu_pdp_st > proc_pdp_st){
585 /* OK we passed the pdp_st moment*/
586 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
587 * occurred before the latest
588 * pdp_st moment*/
589 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
590 post_int = occu_pdp_age; /* how much after it */
591 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
592 } else {
593 pre_int = interval;
594 post_int = 0;
595 }
597 #ifdef DEBUG
598 printf(
599 "proc_pdp_age %lu\t"
600 "proc_pdp_st %lu\t"
601 "occu_pfp_age %lu\t"
602 "occu_pdp_st %lu\t"
603 "int %lf\t"
604 "pre_int %lf\t"
605 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
606 occu_pdp_age, occu_pdp_st,
607 interval, pre_int, post_int);
608 #endif
610 /* process the data sources and update the pdp_prep
611 * area accordingly */
612 for(i=0;i<rrd.stat_head->ds_cnt;i++){
613 enum dst_en dst_idx;
614 dst_idx= dst_conv(rrd.ds_def[i].dst);
615 /* NOTE: DST_CDEF should never enter this if block, because
616 * updvals[i+1][0] is initialized to 'U'; unless the caller
617 * accidently specified a value for the DST_CDEF. To handle
618 * this case, an extra check is required. */
619 if((updvals[i+1][0] != 'U') &&
620 (dst_idx != DST_CDEF) &&
621 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
622 double rate = DNAN;
623 /* the data source type defines how to process the data */
624 /* pdp_new contains rate * time ... eg the bytes
625 * transferred during the interval. Doing it this way saves
626 * a lot of math operations */
629 switch(dst_idx){
630 case DST_COUNTER:
631 case DST_DERIVE:
632 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
633 for(ii=0;updvals[i+1][ii] != '\0';ii++){
634 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
635 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
636 break;
637 }
638 }
639 if (rrd_test_error()){
640 break;
641 }
642 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
643 if(dst_idx == DST_COUNTER) {
644 /* simple overflow catcher sugestet by andres kroonmaa */
645 /* this will fail terribly for non 32 or 64 bit counters ... */
646 /* are there any others in SNMP land ? */
647 if (pdp_new[i] < (double)0.0 )
648 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
649 if (pdp_new[i] < (double)0.0 )
650 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
651 }
652 rate = pdp_new[i] / interval;
653 }
654 else {
655 pdp_new[i]= DNAN;
656 }
657 break;
658 case DST_ABSOLUTE:
659 errno = 0;
660 pdp_new[i] = strtod(updvals[i+1],&endptr);
661 if (errno > 0){
662 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
663 break;
664 };
665 if (endptr[0] != '\0'){
666 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
667 break;
668 }
669 rate = pdp_new[i] / interval;
670 break;
671 case DST_GAUGE:
672 errno = 0;
673 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
674 if (errno > 0){
675 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
676 break;
677 };
678 if (endptr[0] != '\0'){
679 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
680 break;
681 }
682 rate = pdp_new[i] / interval;
683 break;
684 default:
685 rrd_set_error("rrd contains unknown DS type : '%s'",
686 rrd.ds_def[i].dst);
687 break;
688 }
689 /* break out of this for loop if the error string is set */
690 if (rrd_test_error()){
691 break;
692 }
693 /* make sure pdp_temp is neither too large or too small
694 * if any of these occur it becomes unknown ...
695 * sorry folks ... */
696 if ( ! isnan(rate) &&
697 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
698 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
699 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
700 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
701 pdp_new[i] = DNAN;
702 }
703 } else {
704 /* no news is news all the same */
705 pdp_new[i] = DNAN;
706 }
708 /* make a copy of the command line argument for the next run */
709 #ifdef DEBUG
710 fprintf(stderr,
711 "prep ds[%lu]\t"
712 "last_arg '%s'\t"
713 "this_arg '%s'\t"
714 "pdp_new %10.2f\n",
715 i,
716 rrd.pdp_prep[i].last_ds,
717 updvals[i+1], pdp_new[i]);
718 #endif
719 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
720 strncpy(rrd.pdp_prep[i].last_ds,
721 updvals[i+1],LAST_DS_LEN-1);
722 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
723 }
724 }
725 /* break out of the argument parsing loop if the error_string is set */
726 if (rrd_test_error()){
727 free(step_start);
728 break;
729 }
730 /* has a pdp_st moment occurred since the last run ? */
732 if (proc_pdp_st == occu_pdp_st){
733 /* no we have not passed a pdp_st moment. therefore update is simple */
735 for(i=0;i<rrd.stat_head->ds_cnt;i++){
736 if(isnan(pdp_new[i]))
737 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
738 else
739 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
740 #ifdef DEBUG
741 fprintf(stderr,
742 "NO PDP ds[%lu]\t"
743 "value %10.2f\t"
744 "unkn_sec %5lu\n",
745 i,
746 rrd.pdp_prep[i].scratch[PDP_val].u_val,
747 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
748 #endif
749 }
750 } else {
751 /* an pdp_st has occurred. */
753 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
754 * occurred up to the last run.
755 pdp_new[] contains rate*seconds from the latest run.
756 pdp_temp[] will contain the rate for cdp */
758 for(i=0;i<rrd.stat_head->ds_cnt;i++){
759 /* update pdp_prep to the current pdp_st */
760 if(isnan(pdp_new[i]))
761 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
762 else
763 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
764 pdp_new[i]/(double)interval*(double)pre_int;
766 /* if too much of the pdp_prep is unknown we dump it */
767 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
768 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
769 (occu_pdp_st-proc_pdp_st <=
770 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
771 pdp_temp[i] = DNAN;
772 } else {
773 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
774 / (double)( occu_pdp_st
775 - proc_pdp_st
776 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
777 }
779 /* process CDEF data sources; remember each CDEF DS can
780 * only reference other DS with a lower index number */
781 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
782 rpnp_t *rpnp;
783 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
784 /* substitue data values for OP_VARIABLE nodes */
785 for (ii = 0; rpnp[ii].op != OP_END; ii++)
786 {
787 if (rpnp[ii].op == OP_VARIABLE) {
788 rpnp[ii].op = OP_NUMBER;
789 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
790 }
791 }
792 /* run the rpn calculator */
793 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
794 free(rpnp);
795 break; /* exits the data sources pdp_temp loop */
796 }
797 }
799 /* make pdp_prep ready for the next run */
800 if(isnan(pdp_new[i])){
801 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
802 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
803 } else {
804 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
805 rrd.pdp_prep[i].scratch[PDP_val].u_val =
806 pdp_new[i]/(double)interval*(double)post_int;
807 }
809 #ifdef DEBUG
810 fprintf(stderr,
811 "PDP UPD ds[%lu]\t"
812 "pdp_temp %10.2f\t"
813 "new_prep %10.2f\t"
814 "new_unkn_sec %5lu\n",
815 i, pdp_temp[i],
816 rrd.pdp_prep[i].scratch[PDP_val].u_val,
817 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
818 #endif
819 }
821 /* if there were errors during the last loop, bail out here */
822 if (rrd_test_error()){
823 free(step_start);
824 break;
825 }
827 /* compute the number of elapsed pdp_st moments */
828 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
829 #ifdef DEBUG
830 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
831 #endif
832 if (rra_step_cnt == NULL)
833 {
834 rra_step_cnt = (unsigned long *)
835 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
836 }
838 for(i = 0, rra_start = rra_begin;
839 i < rrd.stat_head->rra_cnt;
840 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
841 i++)
842 {
843 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
844 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
845 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
846 if (start_pdp_offset <= elapsed_pdp_st) {
847 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
848 rrd.rra_def[i].pdp_cnt + 1;
849 } else {
850 rra_step_cnt[i] = 0;
851 }
853 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
854 {
855 /* If this is a bulk update, we need to skip ahead in the seasonal
856 * arrays so that they will be correct for the next observed value;
857 * note that for the bulk update itself, no update will occur to
858 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
859 * be set to DNAN. */
860 if (rra_step_cnt[i] > 2)
861 {
862 /* skip update by resetting rra_step_cnt[i],
863 * note that this is not data source specific; this is due
864 * to the bulk update, not a DNAN value for the specific data
865 * source. */
866 rra_step_cnt[i] = 0;
867 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
868 &last_seasonal_coef);
869 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
870 &seasonal_coef);
871 }
873 /* periodically run a smoother for seasonal effects */
874 /* Need to use first cdp parameter buffer to track
875 * burnin (burnin requires a specific smoothing schedule).
876 * The CDP_init_seasonal parameter is really an RRA level,
877 * not a data source within RRA level parameter, but the rra_def
878 * is read only for rrd_update (not flushed to disk). */
879 iii = i*(rrd.stat_head -> ds_cnt);
880 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
881 <= BURNIN_CYCLES)
882 {
883 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
884 > rrd.rra_def[i].row_cnt - 1) {
885 /* mark off one of the burnin cycles */
886 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
887 schedule_smooth = 1;
888 }
889 } else {
890 /* someone has no doubt invented a trick to deal with this
891 * wrap around, but at least this code is clear. */
892 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
893 rrd.rra_ptr[i].cur_row)
894 {
895 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
896 * mapping between PDP and CDP */
897 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
898 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
899 {
900 #ifdef DEBUG
901 fprintf(stderr,
902 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
903 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
904 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
905 #endif
906 schedule_smooth = 1;
907 }
908 } else {
909 /* can't rely on negative numbers because we are working with
910 * unsigned values */
911 /* Don't need modulus here. If we've wrapped more than once, only
912 * one smooth is executed at the end. */
913 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
914 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
915 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
916 {
917 #ifdef DEBUG
918 fprintf(stderr,
919 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
920 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
921 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
922 #endif
923 schedule_smooth = 1;
924 }
925 }
926 }
928 rra_current = ftell(rrd_file);
929 } /* if cf is DEVSEASONAL or SEASONAL */
931 if (rrd_test_error()) break;
933 /* update CDP_PREP areas */
934 /* loop over data soures within each RRA */
935 for(ii = 0;
936 ii < rrd.stat_head->ds_cnt;
937 ii++)
938 {
940 /* iii indexes the CDP prep area for this data source within the RRA */
941 iii=i*rrd.stat_head->ds_cnt+ii;
943 if (rrd.rra_def[i].pdp_cnt > 1) {
945 if (rra_step_cnt[i] > 0) {
946 /* If we are in this block, as least 1 CDP value will be written to
947 * disk, this is the CDP_primary_val entry. If more than 1 value needs
948 * to be written, then the "fill in" value is the CDP_secondary_val
949 * entry. */
950 if (isnan(pdp_temp[ii]))
951 {
952 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
953 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
954 } else {
955 /* CDP_secondary value is the RRA "fill in" value for intermediary
956 * CDP data entries. No matter the CF, the value is the same because
957 * the average, max, min, and last of a list of identical values is
958 * the same, namely, the value itself. */
959 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
960 }
962 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
963 > rrd.rra_def[i].pdp_cnt*
964 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
965 {
966 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
967 /* initialize carry over */
968 if (current_cf == CF_AVERAGE) {
969 if (isnan(pdp_temp[ii])) {
970 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
971 } else {
972 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
973 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
974 }
975 } else {
976 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
977 }
978 } else {
979 rrd_value_t cum_val, cur_val;
980 switch (current_cf) {
981 case CF_AVERAGE:
982 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
983 cur_val = IFDNAN(pdp_temp[ii],0.0);
984 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
985 (cum_val + cur_val * start_pdp_offset) /
986 (rrd.rra_def[i].pdp_cnt
987 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
988 /* initialize carry over value */
989 if (isnan(pdp_temp[ii])) {
990 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
991 } else {
992 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
993 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
994 }
995 break;
996 case CF_MAXIMUM:
997 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
998 cur_val = IFDNAN(pdp_temp[ii],-DINF);
999 #ifdef DEBUG
1000 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1001 isnan(pdp_temp[ii])) {
1002 fprintf(stderr,
1003 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1004 i,ii);
1005 exit(-1);
1006 }
1007 #endif
1008 if (cur_val > cum_val)
1009 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1010 else
1011 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1012 /* initialize carry over value */
1013 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1014 break;
1015 case CF_MINIMUM:
1016 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1017 cur_val = IFDNAN(pdp_temp[ii],DINF);
1018 #ifdef DEBUG
1019 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1020 isnan(pdp_temp[ii])) {
1021 fprintf(stderr,
1022 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1023 i,ii);
1024 exit(-1);
1025 }
1026 #endif
1027 if (cur_val < cum_val)
1028 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1029 else
1030 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1031 /* initialize carry over value */
1032 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1033 break;
1034 case CF_LAST:
1035 default:
1036 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1037 /* initialize carry over value */
1038 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1039 break;
1040 }
1041 } /* endif meets xff value requirement for a valid value */
1042 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1043 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1044 if (isnan(pdp_temp[ii]))
1045 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1046 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1047 else
1048 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1049 } else /* rra_step_cnt[i] == 0 */
1050 {
1051 #ifdef DEBUG
1052 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1053 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1054 i,ii);
1055 } else {
1056 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1057 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1058 }
1059 #endif
1060 if (isnan(pdp_temp[ii])) {
1061 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1062 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1063 {
1064 if (current_cf == CF_AVERAGE) {
1065 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1066 elapsed_pdp_st;
1067 } else {
1068 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1069 }
1070 #ifdef DEBUG
1071 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1072 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1073 #endif
1074 } else {
1075 switch (current_cf) {
1076 case CF_AVERAGE:
1077 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1078 elapsed_pdp_st;
1079 break;
1080 case CF_MINIMUM:
1081 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1082 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1083 break;
1084 case CF_MAXIMUM:
1085 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1086 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1087 break;
1088 case CF_LAST:
1089 default:
1090 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1091 break;
1092 }
1093 }
1094 }
1095 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1096 if (elapsed_pdp_st > 2)
1097 {
1098 switch (current_cf) {
1099 case CF_AVERAGE:
1100 default:
1101 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1102 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1103 break;
1104 case CF_SEASONAL:
1105 case CF_DEVSEASONAL:
1106 /* need to update cached seasonal values, so they are consistent
1107 * with the bulk update */
1108 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1109 * CDP_last_deviation are the same. */
1110 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1111 last_seasonal_coef[ii];
1112 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1113 seasonal_coef[ii];
1114 break;
1115 case CF_HWPREDICT:
1116 /* need to update the null_count and last_null_count.
1117 * even do this for non-DNAN pdp_temp because the
1118 * algorithm is not learning from batch updates. */
1119 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1120 elapsed_pdp_st;
1121 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1122 elapsed_pdp_st - 1;
1123 /* fall through */
1124 case CF_DEVPREDICT:
1125 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1126 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1127 break;
1128 case CF_FAILURES:
1129 /* do not count missed bulk values as failures */
1130 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1131 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1132 /* need to reset violations buffer.
1133 * could do this more carefully, but for now, just
1134 * assume a bulk update wipes away all violations. */
1135 erase_violations(&rrd, iii, i);
1136 break;
1137 }
1138 }
1139 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1141 if (rrd_test_error()) break;
1143 } /* endif data sources loop */
1144 } /* end RRA Loop */
1146 /* this loop is only entered if elapsed_pdp_st < 3 */
1147 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1148 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1149 {
1150 for(i = 0, rra_start = rra_begin;
1151 i < rrd.stat_head->rra_cnt;
1152 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1153 i++)
1154 {
1155 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1157 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1158 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1159 {
1160 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1161 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1162 &seasonal_coef);
1163 rra_current = ftell(rrd_file);
1164 }
1165 if (rrd_test_error()) break;
1166 /* loop over data soures within each RRA */
1167 for(ii = 0;
1168 ii < rrd.stat_head->ds_cnt;
1169 ii++)
1170 {
1171 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1172 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1173 scratch_idx, seasonal_coef);
1174 }
1175 } /* end RRA Loop */
1176 if (rrd_test_error()) break;
1177 } /* end elapsed_pdp_st loop */
1179 if (rrd_test_error()) break;
1181 /* Ready to write to disk */
1182 /* Move sequentially through the file, writing one RRA at a time.
1183 * Note this architecture divorces the computation of CDP with
1184 * flushing updated RRA entries to disk. */
1185 for(i = 0, rra_start = rra_begin;
1186 i < rrd.stat_head->rra_cnt;
1187 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1188 i++) {
1189 /* is there anything to write for this RRA? If not, continue. */
1190 if (rra_step_cnt[i] == 0) continue;
1192 /* write the first row */
1193 #ifdef DEBUG
1194 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1195 #endif
1196 rrd.rra_ptr[i].cur_row++;
1197 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1198 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1199 /* positition on the first row */
1200 rra_pos_tmp = rra_start +
1201 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1202 if(rra_pos_tmp != rra_current) {
1203 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1204 rrd_set_error("seek error in rrd");
1205 break;
1206 }
1207 rra_current = rra_pos_tmp;
1208 }
1210 #ifdef DEBUG
1211 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1212 #endif
1213 scratch_idx = CDP_primary_val;
1214 if (pcdp_summary != NULL)
1215 {
1216 rra_time = (current_time - current_time
1217 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1218 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1219 }
1220 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1221 pcdp_summary, &rra_time);
1222 if (rrd_test_error()) break;
1224 /* write other rows of the bulk update, if any */
1225 scratch_idx = CDP_secondary_val;
1226 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1227 {
1228 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1229 {
1230 #ifdef DEBUG
1231 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1232 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1233 #endif
1234 /* wrap */
1235 rrd.rra_ptr[i].cur_row = 0;
1236 /* seek back to beginning of current rra */
1237 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1238 {
1239 rrd_set_error("seek error in rrd");
1240 break;
1241 }
1242 #ifdef DEBUG
1243 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1244 #endif
1245 rra_current = rra_start;
1246 }
1247 if (pcdp_summary != NULL)
1248 {
1249 rra_time = (current_time - current_time
1250 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1251 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1252 }
1253 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1254 pcdp_summary, &rra_time);
1255 }
1257 if (rrd_test_error())
1258 break;
1259 } /* RRA LOOP */
1261 /* break out of the argument parsing loop if error_string is set */
1262 if (rrd_test_error()){
1263 free(step_start);
1264 break;
1265 }
1267 } /* endif a pdp_st has occurred */
1268 rrd.live_head->last_up = current_time;
1269 rrd.live_head->last_up_usec = current_time_usec;
1270 free(step_start);
1271 } /* function argument loop */
1273 if (seasonal_coef != NULL) free(seasonal_coef);
1274 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1275 if (rra_step_cnt != NULL) free(rra_step_cnt);
1276 rpnstack_free(&rpnstack);
1278 /* if we got here and if there is an error and if the file has not been
1279 * written to, then close things up and return. */
1280 if (rrd_test_error()) {
1281 free(updvals);
1282 free(tmpl_idx);
1283 rrd_free(&rrd);
1284 free(pdp_temp);
1285 free(pdp_new);
1286 fclose(rrd_file);
1287 return(-1);
1288 }
1290 /* aargh ... that was tough ... so many loops ... anyway, its done.
1291 * we just need to write back the live header portion now*/
1293 if (fseek(rrd_file, (sizeof(stat_head_t)
1294 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1295 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1296 SEEK_SET) != 0) {
1297 rrd_set_error("seek rrd for live header writeback");
1298 free(updvals);
1299 free(tmpl_idx);
1300 rrd_free(&rrd);
1301 free(pdp_temp);
1302 free(pdp_new);
1303 fclose(rrd_file);
1304 return(-1);
1305 }
1307 if(version >= 3) {
1308 if(fwrite( rrd.live_head,
1309 sizeof(live_head_t), 1, rrd_file) != 1){
1310 rrd_set_error("fwrite live_head to rrd");
1311 free(updvals);
1312 rrd_free(&rrd);
1313 free(tmpl_idx);
1314 free(pdp_temp);
1315 free(pdp_new);
1316 fclose(rrd_file);
1317 return(-1);
1318 }
1319 }
1320 else {
1321 if(fwrite( &rrd.live_head->last_up,
1322 sizeof(time_t), 1, rrd_file) != 1){
1323 rrd_set_error("fwrite live_head to rrd");
1324 free(updvals);
1325 rrd_free(&rrd);
1326 free(tmpl_idx);
1327 free(pdp_temp);
1328 free(pdp_new);
1329 fclose(rrd_file);
1330 return(-1);
1331 }
1332 }
1335 if(fwrite( rrd.pdp_prep,
1336 sizeof(pdp_prep_t),
1337 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1338 rrd_set_error("ftwrite pdp_prep to rrd");
1339 free(updvals);
1340 rrd_free(&rrd);
1341 free(tmpl_idx);
1342 free(pdp_temp);
1343 free(pdp_new);
1344 fclose(rrd_file);
1345 return(-1);
1346 }
1348 if(fwrite( rrd.cdp_prep,
1349 sizeof(cdp_prep_t),
1350 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1351 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1353 rrd_set_error("ftwrite cdp_prep to rrd");
1354 free(updvals);
1355 free(tmpl_idx);
1356 rrd_free(&rrd);
1357 free(pdp_temp);
1358 free(pdp_new);
1359 fclose(rrd_file);
1360 return(-1);
1361 }
1363 if(fwrite( rrd.rra_ptr,
1364 sizeof(rra_ptr_t),
1365 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1366 rrd_set_error("fwrite rra_ptr to rrd");
1367 free(updvals);
1368 free(tmpl_idx);
1369 rrd_free(&rrd);
1370 free(pdp_temp);
1371 free(pdp_new);
1372 fclose(rrd_file);
1373 return(-1);
1374 }
1376 /* OK now close the files and free the memory */
1377 if(fclose(rrd_file) != 0){
1378 rrd_set_error("closing rrd");
1379 free(updvals);
1380 free(tmpl_idx);
1381 rrd_free(&rrd);
1382 free(pdp_temp);
1383 free(pdp_new);
1384 return(-1);
1385 }
1387 /* calling the smoothing code here guarantees at most
1388 * one smoothing operation per rrd_update call. Unfortunately,
1389 * it is possible with bulk updates, or a long-delayed update
1390 * for smoothing to occur off-schedule. This really isn't
1391 * critical except during the burning cycles. */
1392 if (schedule_smooth)
1393 {
1394 #ifndef WIN32
1395 rrd_file = fopen(filename,"r+");
1396 #else
1397 rrd_file = fopen(filename,"rb+");
1398 #endif
1399 rra_start = rra_begin;
1400 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1401 {
1402 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1403 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1404 {
1405 #ifdef DEBUG
1406 fprintf(stderr,"Running smoother for rra %ld\n",i);
1407 #endif
1408 apply_smoother(&rrd,i,rra_start,rrd_file);
1409 if (rrd_test_error())
1410 break;
1411 }
1412 rra_start += rrd.rra_def[i].row_cnt
1413 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1414 }
1415 fclose(rrd_file);
1416 }
1417 rrd_free(&rrd);
1418 free(updvals);
1419 free(tmpl_idx);
1420 free(pdp_new);
1421 free(pdp_temp);
1422 return(0);
1423 }
1425 /*
1426 * get exclusive lock to whole file.
1427 * lock gets removed when we close the file
1428 *
1429 * returns 0 on success
1430 */
1431 int
1432 LockRRD(FILE *rrdfile)
1433 {
1434 int rrd_fd; /* File descriptor for RRD */
1435 int stat;
1437 rrd_fd = fileno(rrdfile);
1439 {
1440 #ifndef WIN32
1441 struct flock lock;
1442 lock.l_type = F_WRLCK; /* exclusive write lock */
1443 lock.l_len = 0; /* whole file */
1444 lock.l_start = 0; /* start of file */
1445 lock.l_whence = SEEK_SET; /* end of file */
1447 stat = fcntl(rrd_fd, F_SETLK, &lock);
1448 #else
1449 struct _stat st;
1451 if ( _fstat( rrd_fd, &st ) == 0 ) {
1452 stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1453 } else {
1454 stat = -1;
1455 }
1456 #endif
1457 }
1459 return(stat);
1460 }
1463 info_t
1464 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1465 unsigned short CDP_scratch_idx, FILE *rrd_file,
1466 info_t *pcdp_summary, time_t *rra_time)
1467 {
1468 unsigned long ds_idx, cdp_idx;
1469 infoval iv;
1471 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1472 {
1473 /* compute the cdp index */
1474 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1475 #ifdef DEBUG
1476 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1477 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1478 rrd -> rra_def[rra_idx].cf_nam);
1479 #endif
1480 if (pcdp_summary != NULL)
1481 {
1482 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1483 /* append info to the return hash */
1484 pcdp_summary = info_push(pcdp_summary,
1485 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1486 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1487 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1488 RD_I_VAL, iv);
1489 }
1490 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1491 sizeof(rrd_value_t),1,rrd_file) != 1)
1492 {
1493 rrd_set_error("writing rrd");
1494 return 0;
1495 }
1496 *rra_current += sizeof(rrd_value_t);
1497 }
1498 return (pcdp_summary);
1499 }