1 /*****************************************************************************
2 * RRDtool 1.1.x Copyright Tobias Oetiker, 1997 - 2002
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 * $Log$
8 * Revision 1.11 2003/09/02 21:58:35 oetiker
9 * be pickier about what we accept in rrd_update. Complain if things do not work out
10 *
11 * Revision 1.10 2003/04/29 19:14:12 jake
12 * Change updatev RRA return from index_number to cf_nam, pdp_cnt.
13 * Also revert accidental addition of -I to aclocal MakeMakefile.
14 *
15 * Revision 1.9 2003/04/25 18:35:08 jake
16 * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
17 *
18 * Revision 1.8 2003/03/31 21:22:12 oetiker
19 * enables RRDtool updates with microsecond or in case of windows millisecond
20 * precision. This is needed to reduce time measurement error when archive step
21 * is small. (<30s) -- Sasha Mikheev <sasha@avalon-net.co.il>
22 *
23 * Revision 1.7 2003/02/13 07:05:27 oetiker
24 * Find attached the patch I promised to send to you. Please note that there
25 * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
26 * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
27 * library is identical to librrd, but it contains support code for per-thread
28 * global variables currently used for error information only. This is similar
29 * to how errno per-thread variables are implemented. librrd_th must be linked
30 * alongside of libpthred
31 *
32 * There is also a new file "THREADS", holding some documentation.
33 *
34 * -- Peter Stamfest <peter@stamfest.at>
35 *
36 * Revision 1.6 2002/02/01 20:34:49 oetiker
37 * fixed version number and date/time
38 *
39 * Revision 1.5 2001/05/09 05:31:01 oetiker
40 * Bug fix: when update of multiple PDP/CDP RRAs coincided
41 * with interpolation of multiple PDPs an incorrect value was
42 * stored as the CDP. Especially evident for GAUGE data sources.
43 * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
44 *
45 * Revision 1.4 2001/03/10 23:54:41 oetiker
46 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
47 * parser and calculator from rrd_graph and puts then in a new file,
48 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
49 * clean-up of aberrant behavior stuff, including a bug fix.
50 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
51 * -- Jake Brutlag <jakeb@corp.webtv.net>
52 *
53 * Revision 1.3 2001/03/04 13:01:55 oetiker
54 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
55 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
56 * This is backwards compatible! But new files using the Aberrant stuff are not readable
57 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
58 * -- Jake Brutlag <jakeb@corp.webtv.net>
59 *
60 * Revision 1.2 2001/03/04 11:14:25 oetiker
61 * added at-style-time@value:value syntax to rrd_update
62 * -- Dave Bodenstab <imdave@mcs.net>
63 *
64 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
65 * checkin
66 *
67 *****************************************************************************/
69 #include "rrd_tool.h"
70 #include <sys/types.h>
71 #include <fcntl.h>
73 #ifdef WIN32
74 #include <sys/locking.h>
75 #include <sys/stat.h>
76 #include <io.h>
77 #endif
79 #include "rrd_hw.h"
80 #include "rrd_rpncalc.h"
82 #include "rrd_is_thread_safe.h"
84 #ifdef WIN32
85 /*
86 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
87 * replacement.
88 */
89 #include <sys/timeb.h>
/* Seconds/microseconds pair standing in for the struct timeval that WIN32
 * does not provide. */
struct timeval {
    time_t tv_sec;  /* seconds */
    long tv_usec;   /* microseconds */
};

/* Stand-in for the timezone argument of gettimeofday(); the replacement
 * below never reads or writes it. */
struct __timezone {
    int tz_minuteswest; /* minutes W of Greenwich */
    int tz_dsttime;     /* type of dst correction */
};

/*
 * Quick and dirty gettimeofday() replacement built on _ftime().
 *
 * t  - receives the current time. tv_usec is derived from the millisecond
 *      field of struct timeb, so it only changes in steps of 1000.
 * tz - accepted so existing calls keep compiling; ignored.
 *
 * Returns 0.
 */
static int gettimeofday(struct timeval *t, struct __timezone *tz) {
    struct timeb current_time;

    (void)tz;   /* unused */
    _ftime(&current_time);

    t->tv_sec = current_time.time;
    t->tv_usec = current_time.millitm * 1000;   /* milliseconds -> microseconds */
    return 0;
}
111 #endif
/*
 * Normalize a time as returned by gettimeofday: the usec part must
 * always be >= 0. When tv_usec is negative, one second is borrowed
 * from tv_sec and added to tv_usec as 1000000 microseconds.
 */
static void normalize_time(struct timeval *t)
{
    if (t->tv_usec >= 0) {
        return;     /* already in the expected form */
    }

    t->tv_sec -= 1;
    t->tv_usec += 1000000L;
}
124 /* Local prototypes */
125 int LockRRD(FILE *rrd_file);
126 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
127 unsigned long *rra_current,
128 unsigned short CDP_scratch_idx, FILE *rrd_file,
129 info_t *pcdp_summary, time_t *rra_time);
130 int rrd_update_r(char *filename, char *template, int argc, char **argv);
131 int _rrd_update(char *filename, char *template, int argc, char **argv,
132 info_t*);
/* Yield X unless it is NaN, in which case yield the fallback Y.
 * X appears twice in the expansion, so pass an expression without side
 * effects. The expansion is a plain parenthesized expression with no
 * trailing semicolon, so it can be used inside larger expressions and in
 * unbraced if/else statements; callers supply their own ';'. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
137 #ifdef STANDALONE
/*
 * Entry point of the standalone rrdupdate tool (compiled only when
 * STANDALONE is defined). Passes the command line to rrd_update(); if
 * rrd_test_error() then reports an error, prints the usage text followed
 * by the error message and clears the error.
 *
 * Returns 0 when no error is pending after the update, 1 otherwise.
 */
int main(int argc, char **argv)
{
    static const char usage_text[] =
        "RRDtool 1.1.x  Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
        "Usage: rrdupdate filename\n"
        "\t\t\t[--template|-t ds-name:ds-name:...]\n"
        "\t\t\ttime|N:value[:value...]\n\n"
        "\t\t\tat-time@value[:value...]\n\n"
        "\t\t\t[ time:value[:value...] ..]\n\n";

    rrd_update(argc, argv);

    if (!rrd_test_error()) {
        return 0;
    }

    printf("%s", usage_text);
    printf("ERROR: %s\n", rrd_get_error());
    rrd_clear_error();
    return 1;
}
155 #endif
157 info_t *rrd_update_v(int argc, char **argv)
158 {
159 char *template = NULL;
160 info_t *result = NULL;
161 infoval rc;
163 while (1) {
164 static struct option long_options[] =
165 {
166 {"template", required_argument, 0, 't'},
167 {0,0,0,0}
168 };
169 int option_index = 0;
170 int opt;
171 opt = getopt_long(argc, argv, "t:",
172 long_options, &option_index);
174 if (opt == EOF)
175 break;
177 switch(opt) {
178 case 't':
179 template = optarg;
180 break;
182 case '?':
183 rrd_set_error("unknown option '%s'",argv[optind-1]);
184 rc.u_int = -1;
185 goto end_tag;
186 }
187 }
189 /* need at least 2 arguments: filename, data. */
190 if (argc-optind < 2) {
191 rrd_set_error("Not enough arguments");
192 rc.u_int = -1;
193 goto end_tag;
194 }
195 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
196 rc.u_int = _rrd_update(argv[optind], template,
197 argc - optind - 1, argv + optind + 1, result);
198 result->value.u_int = rc.u_int;
199 end_tag:
200 return result;
201 }
/*
 * Command-line front end for updating an RRD.
 *
 * argc/argv - command line: optional --template|-t <ds-name:ds-name:...>,
 *             then the RRD file name, then at least one data argument.
 *
 * Returns -1, with the error set through rrd_set_error(), when an unknown
 * option is seen or fewer than two non-option arguments remain. Otherwise
 * returns whatever rrd_update_r() returns for the file name, the template
 * (NULL when none was given) and the remaining data arguments.
 */
int rrd_update(int argc, char **argv)
{
    static struct option known_options[] = {
        {"template", required_argument, 0, 't'},
        {0, 0, 0, 0}
    };
    char *tmpl = NULL;

    for (;;) {
        int long_idx = 0;
        const int choice = getopt_long(argc, argv, "t:",
                                       known_options, &long_idx);

        if (choice == EOF) {
            break;
        }

        if (choice == 't') {
            tmpl = optarg;
        } else if (choice == '?') {
            rrd_set_error("unknown option '%s'", argv[optind-1]);
            return -1;
        }
    }

    /* filename plus at least one data argument must be left over */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], tmpl,
                        argc - optind - 1, argv + optind + 1);
}
/*
 * Update an RRD from already-separated arguments (no option parsing here).
 *
 * filename - RRD file to update
 * template - template string, or NULL when the caller has none
 * argc     - number of entries in argv
 * argv     - data arguments
 *
 * Forwards everything to _rrd_update() with a NULL pcdp_summary and returns
 * its result unchanged.
 */
int rrd_update_r(char *filename, char *template, int argc, char **argv)
{
    int status;

    status = _rrd_update(filename, template, argc, argv, NULL);
    return status;
}
252 int
253 _rrd_update(char *filename, char *template, int argc, char **argv,
254 info_t *pcdp_summary)
255 {
257 int arg_i = 2;
258 short j;
259 unsigned long i,ii,iii=1;
261 unsigned long rra_begin; /* byte pointer to the rra
262 * area in the rrd file. this
263 * pointer never changes value */
264 unsigned long rra_start; /* byte pointer to the rra
265 * area in the rrd file. this
266 * pointer changes as each rrd is
267 * processed. */
268 unsigned long rra_current; /* byte pointer to the current write
269 * spot in the rrd file. */
270 unsigned long rra_pos_tmp; /* temporary byte pointer. */
271 double interval,
272 pre_int,post_int; /* interval between this and
273 * the last run */
274 unsigned long proc_pdp_st; /* which pdp_st was the last
275 * to be processed */
276 unsigned long occu_pdp_st; /* when was the pdp_st
277 * before the last update
278 * time */
279 unsigned long proc_pdp_age; /* how old was the data in
280 * the pdp prep area when it
281 * was last updated */
282 unsigned long occu_pdp_age; /* how long ago was the last
283 * pdp_step time */
284 rrd_value_t *pdp_new; /* prepare the incoming data
285 * to be added the the
286 * existing entry */
287 rrd_value_t *pdp_temp; /* prepare the pdp values
288 * to be added the the
289 * cdp values */
291 long *tmpl_idx; /* index representing the settings
292 transported by the template index */
293 unsigned long tmpl_cnt = 2; /* time and data */
295 FILE *rrd_file;
296 rrd_t rrd;
297 time_t current_time;
298 time_t rra_time; /* time of update for a RRA */
299 unsigned long current_time_usec; /* microseconds part of current time */
300 struct timeval tmp_time; /* used for time conversion */
302 char **updvals;
303 int schedule_smooth = 0;
304 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
305 /* a vector of future Holt-Winters seasonal coefs */
306 unsigned long elapsed_pdp_st;
307 /* number of elapsed PDP steps since last update */
308 unsigned long *rra_step_cnt = NULL;
309 /* number of rows to be updated in an RRA for a data
310 * value. */
311 unsigned long start_pdp_offset;
312 /* number of PDP steps since the last update that
313 * are assigned to the first CDP to be generated
314 * since the last update. */
315 unsigned short scratch_idx;
316 /* index into the CDP scratch array */
317 enum cf_en current_cf;
318 /* numeric id of the current consolidation function */
319 rpnstack_t rpnstack; /* used for COMPUTE DS */
320 int version; /* rrd version */
321 char *endptr; /* used in the conversion */
323 rpnstack_init(&rpnstack);
325 /* need at least 1 arguments: data. */
326 if (argc < 1) {
327 rrd_set_error("Not enough arguments");
328 return -1;
329 }
333 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
334 return -1;
335 }
336 /* initialize time */
337 version = atoi(rrd.stat_head->version);
338 gettimeofday(&tmp_time, 0);
339 normalize_time(&tmp_time);
340 current_time = tmp_time.tv_sec;
341 if(version >= 3) {
342 current_time_usec = tmp_time.tv_usec;
343 }
344 else {
345 current_time_usec = 0;
346 }
348 rra_current = rra_start = rra_begin = ftell(rrd_file);
349 /* This is defined in the ANSI C standard, section 7.9.5.3:
351 When a file is opened with udpate mode ('+' as the second
352 or third character in the ... list of mode argument
353 variables), both input and ouptut may be performed on the
354 associated stream. However, ... input may not be directly
355 followed by output without an intervening call to a file
356 positioning function, unless the input oepration encounters
357 end-of-file. */
358 fseek(rrd_file, 0, SEEK_CUR);
361 /* get exclusive lock to whole file.
362 * lock gets removed when we close the file.
363 */
364 if (LockRRD(rrd_file) != 0) {
365 rrd_set_error("could not lock RRD");
366 rrd_free(&rrd);
367 fclose(rrd_file);
368 return(-1);
369 }
371 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
372 rrd_set_error("allocating updvals pointer array");
373 rrd_free(&rrd);
374 fclose(rrd_file);
375 return(-1);
376 }
378 if ((pdp_temp = malloc(sizeof(rrd_value_t)
379 *rrd.stat_head->ds_cnt))==NULL){
380 rrd_set_error("allocating pdp_temp ...");
381 free(updvals);
382 rrd_free(&rrd);
383 fclose(rrd_file);
384 return(-1);
385 }
387 if ((tmpl_idx = malloc(sizeof(unsigned long)
388 *(rrd.stat_head->ds_cnt+1)))==NULL){
389 rrd_set_error("allocating tmpl_idx ...");
390 free(pdp_temp);
391 free(updvals);
392 rrd_free(&rrd);
393 fclose(rrd_file);
394 return(-1);
395 }
396 /* initialize template redirector */
397 /* default config example (assume DS 1 is a CDEF DS)
398 tmpl_idx[0] -> 0; (time)
399 tmpl_idx[1] -> 1; (DS 0)
400 tmpl_idx[2] -> 3; (DS 2)
401 tmpl_idx[3] -> 4; (DS 3) */
402 tmpl_idx[0] = 0; /* time */
403 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
404 {
405 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
406 tmpl_idx[ii++]=i;
407 }
408 tmpl_cnt= ii;
410 if (template) {
411 char *dsname;
412 unsigned int tmpl_len;
413 dsname = template;
414 tmpl_cnt = 1; /* the first entry is the time */
415 tmpl_len = strlen(template);
416 for(i=0;i<=tmpl_len ;i++) {
417 if (template[i] == ':' || template[i] == '\0') {
418 template[i] = '\0';
419 if (tmpl_cnt>rrd.stat_head->ds_cnt){
420 rrd_set_error("Template contains more DS definitions than RRD");
421 free(updvals); free(pdp_temp);
422 free(tmpl_idx); rrd_free(&rrd);
423 fclose(rrd_file); return(-1);
424 }
425 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
426 rrd_set_error("unknown DS name '%s'",dsname);
427 free(updvals); free(pdp_temp);
428 free(tmpl_idx); rrd_free(&rrd);
429 fclose(rrd_file); return(-1);
430 } else {
431 /* the first element is always the time */
432 tmpl_idx[tmpl_cnt-1]++;
433 /* go to the next entry on the template */
434 dsname = &template[i+1];
435 /* fix the damage we did before */
436 if (i<tmpl_len) {
437 template[i]=':';
438 }
440 }
441 }
442 }
443 }
444 if ((pdp_new = malloc(sizeof(rrd_value_t)
445 *rrd.stat_head->ds_cnt))==NULL){
446 rrd_set_error("allocating pdp_new ...");
447 free(updvals);
448 free(pdp_temp);
449 free(tmpl_idx);
450 rrd_free(&rrd);
451 fclose(rrd_file);
452 return(-1);
453 }
455 /* loop through the arguments. */
456 for(arg_i=0; arg_i<argc;arg_i++) {
457 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
458 char *step_start = stepper;
459 char *p;
460 char *parsetime_error = NULL;
461 enum {atstyle, normal} timesyntax;
462 struct time_value ds_tv;
463 if (stepper == NULL){
464 rrd_set_error("failed duplication argv entry");
465 free(updvals);
466 free(pdp_temp);
467 free(tmpl_idx);
468 rrd_free(&rrd);
469 fclose(rrd_file);
470 return(-1);
471 }
472 /* initialize all ds input to unknown except the first one
473 which has always got to be set */
474 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
475 strcpy(stepper,argv[arg_i]);
476 updvals[0]=stepper;
477 /* separate all ds elements; first must be examined separately
478 due to alternate time syntax */
479 if ((p=strchr(stepper,'@'))!=NULL) {
480 timesyntax = atstyle;
481 *p = '\0';
482 stepper = p+1;
483 } else if ((p=strchr(stepper,':'))!=NULL) {
484 timesyntax = normal;
485 *p = '\0';
486 stepper = p+1;
487 } else {
488 rrd_set_error("expected timestamp not found in data source from %s:...",
489 argv[arg_i]);
490 free(step_start);
491 break;
492 }
493 ii=1;
494 updvals[tmpl_idx[ii]] = stepper;
495 while (*stepper) {
496 if (*stepper == ':') {
497 *stepper = '\0';
498 ii++;
499 if (ii<tmpl_cnt){
500 updvals[tmpl_idx[ii]] = stepper+1;
501 }
502 }
503 stepper++;
504 }
506 if (ii != tmpl_cnt-1) {
507 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
508 tmpl_cnt-1, ii, argv[arg_i]);
509 free(step_start);
510 break;
511 }
513 /* get the time from the reading ... handle N */
514 if (timesyntax == atstyle) {
515 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
516 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
517 free(step_start);
518 break;
519 }
520 if (ds_tv.type == RELATIVE_TO_END_TIME ||
521 ds_tv.type == RELATIVE_TO_START_TIME) {
522 rrd_set_error("specifying time relative to the 'start' "
523 "or 'end' makes no sense here: %s",
524 updvals[0]);
525 free(step_start);
526 break;
527 }
529 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
530 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
532 } else if (strcmp(updvals[0],"N")==0){
533 gettimeofday(&tmp_time, 0);
534 normalize_time(&tmp_time);
535 current_time = tmp_time.tv_sec;
536 current_time_usec = tmp_time.tv_usec;
537 } else {
538 double tmp;
539 tmp = strtod(updvals[0], 0);
540 current_time = floor(tmp);
541 current_time_usec = (long)((tmp - current_time) * 1000000L);
542 }
543 /* dont do any correction for old version RRDs */
544 if(version < 3)
545 current_time_usec = 0;
547 if(current_time <= rrd.live_head->last_up){
548 rrd_set_error("illegal attempt to update using time %ld when "
549 "last update time is %ld (minimum one second step)",
550 current_time, rrd.live_head->last_up);
551 free(step_start);
552 break;
553 }
556 /* seek to the beginning of the rra's */
557 if (rra_current != rra_begin) {
558 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
559 rrd_set_error("seek error in rrd");
560 free(step_start);
561 break;
562 }
563 rra_current = rra_begin;
564 }
565 rra_start = rra_begin;
567 /* when was the current pdp started */
568 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
569 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
571 /* when did the last pdp_st occur */
572 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
573 occu_pdp_st = current_time - occu_pdp_age;
574 /* interval = current_time - rrd.live_head->last_up; */
575 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
577 if (occu_pdp_st > proc_pdp_st){
578 /* OK we passed the pdp_st moment*/
579 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
580 * occurred before the latest
581 * pdp_st moment*/
582 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
583 post_int = occu_pdp_age; /* how much after it */
584 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
585 } else {
586 pre_int = interval;
587 post_int = 0;
588 }
590 #ifdef DEBUG
591 printf(
592 "proc_pdp_age %lu\t"
593 "proc_pdp_st %lu\t"
594 "occu_pfp_age %lu\t"
595 "occu_pdp_st %lu\t"
596 "int %lf\t"
597 "pre_int %lf\t"
598 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
599 occu_pdp_age, occu_pdp_st,
600 interval, pre_int, post_int);
601 #endif
603 /* process the data sources and update the pdp_prep
604 * area accordingly */
605 for(i=0;i<rrd.stat_head->ds_cnt;i++){
606 enum dst_en dst_idx;
607 dst_idx= dst_conv(rrd.ds_def[i].dst);
608 /* NOTE: DST_CDEF should never enter this if block, because
609 * updvals[i+1][0] is initialized to 'U'; unless the caller
610 * accidently specified a value for the DST_CDEF. To handle
611 * this case, an extra check is required. */
612 if((updvals[i+1][0] != 'U') &&
613 (dst_idx != DST_CDEF) &&
614 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
615 double rate = DNAN;
616 /* the data source type defines how to process the data */
617 /* pdp_new contains rate * time ... eg the bytes
618 * transferred during the interval. Doing it this way saves
619 * a lot of math operations */
622 switch(dst_idx){
623 case DST_COUNTER:
624 case DST_DERIVE:
625 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
626 for(ii=0;updvals[i+1][ii] != '\0';ii++){
627 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii=0 && updvals[i+1][ii] == '-')){
628 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
629 break;
630 }
631 }
632 if (rrd_test_error()){
633 break;
634 }
635 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
636 if(dst_idx == DST_COUNTER) {
637 /* simple overflow catcher sugestet by andres kroonmaa */
638 /* this will fail terribly for non 32 or 64 bit counters ... */
639 /* are there any others in SNMP land ? */
640 if (pdp_new[i] < (double)0.0 )
641 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
642 if (pdp_new[i] < (double)0.0 )
643 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
644 }
645 rate = pdp_new[i] / interval;
646 }
647 else {
648 pdp_new[i]= DNAN;
649 }
650 break;
651 case DST_ABSOLUTE:
652 errno = 0;
653 pdp_new[i] = strtod(updvals[i+1],&endptr);
654 if (errno > 0){
655 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
656 break;
657 };
658 if (endptr[0] != '\0'){
659 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
660 break;
661 }
662 rate = pdp_new[i] / interval;
663 break;
664 case DST_GAUGE:
665 errno = 0;
666 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
667 if (errno > 0){
668 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
669 break;
670 };
671 if (endptr[0] != '\0'){
672 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
673 break;
674 }
675 rate = pdp_new[i] / interval;
676 break;
677 default:
678 rrd_set_error("rrd contains unknown DS type : '%s'",
679 rrd.ds_def[i].dst);
680 break;
681 }
682 /* break out of this for loop if the error string is set */
683 if (rrd_test_error()){
684 break;
685 }
686 /* make sure pdp_temp is neither too large or too small
687 * if any of these occur it becomes unknown ...
688 * sorry folks ... */
689 if ( ! isnan(rate) &&
690 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
691 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
692 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
693 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
694 pdp_new[i] = DNAN;
695 }
696 } else {
697 /* no news is news all the same */
698 pdp_new[i] = DNAN;
699 }
701 /* make a copy of the command line argument for the next run */
702 #ifdef DEBUG
703 fprintf(stderr,
704 "prep ds[%lu]\t"
705 "last_arg '%s'\t"
706 "this_arg '%s'\t"
707 "pdp_new %10.2f\n",
708 i,
709 rrd.pdp_prep[i].last_ds,
710 updvals[i+1], pdp_new[i]);
711 #endif
712 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
713 strncpy(rrd.pdp_prep[i].last_ds,
714 updvals[i+1],LAST_DS_LEN-1);
715 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
716 }
717 }
718 /* break out of the argument parsing loop if the error_string is set */
719 if (rrd_test_error()){
720 free(step_start);
721 break;
722 }
723 /* has a pdp_st moment occurred since the last run ? */
725 if (proc_pdp_st == occu_pdp_st){
726 /* no we have not passed a pdp_st moment. therefore update is simple */
728 for(i=0;i<rrd.stat_head->ds_cnt;i++){
729 if(isnan(pdp_new[i]))
730 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
731 else
732 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
733 #ifdef DEBUG
734 fprintf(stderr,
735 "NO PDP ds[%lu]\t"
736 "value %10.2f\t"
737 "unkn_sec %5lu\n",
738 i,
739 rrd.pdp_prep[i].scratch[PDP_val].u_val,
740 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
741 #endif
742 }
743 } else {
744 /* an pdp_st has occurred. */
746 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
747 * occurred up to the last run.
748 pdp_new[] contains rate*seconds from the latest run.
749 pdp_temp[] will contain the rate for cdp */
751 for(i=0;i<rrd.stat_head->ds_cnt;i++){
752 /* update pdp_prep to the current pdp_st */
753 if(isnan(pdp_new[i]))
754 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
755 else
756 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
757 pdp_new[i]/(double)interval*(double)pre_int;
759 /* if too much of the pdp_prep is unknown we dump it */
760 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
761 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
762 (occu_pdp_st-proc_pdp_st <=
763 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
764 pdp_temp[i] = DNAN;
765 } else {
766 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
767 / (double)( occu_pdp_st
768 - proc_pdp_st
769 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
770 }
772 /* process CDEF data sources; remember each CDEF DS can
773 * only reference other DS with a lower index number */
774 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
775 rpnp_t *rpnp;
776 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
777 /* substitue data values for OP_VARIABLE nodes */
778 for (ii = 0; rpnp[ii].op != OP_END; ii++)
779 {
780 if (rpnp[ii].op == OP_VARIABLE) {
781 rpnp[ii].op = OP_NUMBER;
782 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
783 }
784 }
785 /* run the rpn calculator */
786 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
787 free(rpnp);
788 break; /* exits the data sources pdp_temp loop */
789 }
790 }
792 /* make pdp_prep ready for the next run */
793 if(isnan(pdp_new[i])){
794 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
795 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
796 } else {
797 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
798 rrd.pdp_prep[i].scratch[PDP_val].u_val =
799 pdp_new[i]/(double)interval*(double)post_int;
800 }
802 #ifdef DEBUG
803 fprintf(stderr,
804 "PDP UPD ds[%lu]\t"
805 "pdp_temp %10.2f\t"
806 "new_prep %10.2f\t"
807 "new_unkn_sec %5lu\n",
808 i, pdp_temp[i],
809 rrd.pdp_prep[i].scratch[PDP_val].u_val,
810 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
811 #endif
812 }
814 /* if there were errors during the last loop, bail out here */
815 if (rrd_test_error()){
816 free(step_start);
817 break;
818 }
820 /* compute the number of elapsed pdp_st moments */
821 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
822 #ifdef DEBUG
823 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
824 #endif
825 if (rra_step_cnt == NULL)
826 {
827 rra_step_cnt = (unsigned long *)
828 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
829 }
831 for(i = 0, rra_start = rra_begin;
832 i < rrd.stat_head->rra_cnt;
833 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
834 i++)
835 {
836 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
837 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
838 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
839 if (start_pdp_offset <= elapsed_pdp_st) {
840 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
841 rrd.rra_def[i].pdp_cnt + 1;
842 } else {
843 rra_step_cnt[i] = 0;
844 }
846 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
847 {
848 /* If this is a bulk update, we need to skip ahead in the seasonal
849 * arrays so that they will be correct for the next observed value;
850 * note that for the bulk update itself, no update will occur to
851 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
852 * be set to DNAN. */
853 if (rra_step_cnt[i] > 2)
854 {
855 /* skip update by resetting rra_step_cnt[i],
856 * note that this is not data source specific; this is due
857 * to the bulk update, not a DNAN value for the specific data
858 * source. */
859 rra_step_cnt[i] = 0;
860 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
861 &last_seasonal_coef);
862 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
863 &seasonal_coef);
864 }
866 /* periodically run a smoother for seasonal effects */
867 /* Need to use first cdp parameter buffer to track
868 * burnin (burnin requires a specific smoothing schedule).
869 * The CDP_init_seasonal parameter is really an RRA level,
870 * not a data source within RRA level parameter, but the rra_def
871 * is read only for rrd_update (not flushed to disk). */
872 iii = i*(rrd.stat_head -> ds_cnt);
873 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
874 <= BURNIN_CYCLES)
875 {
876 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
877 > rrd.rra_def[i].row_cnt - 1) {
878 /* mark off one of the burnin cycles */
879 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
880 schedule_smooth = 1;
881 }
882 } else {
883 /* someone has no doubt invented a trick to deal with this
884 * wrap around, but at least this code is clear. */
885 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
886 rrd.rra_ptr[i].cur_row)
887 {
888 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
889 * mapping between PDP and CDP */
890 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
891 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
892 {
893 #ifdef DEBUG
894 fprintf(stderr,
895 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
896 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
897 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
898 #endif
899 schedule_smooth = 1;
900 }
901 } else {
902 /* can't rely on negative numbers because we are working with
903 * unsigned values */
904 /* Don't need modulus here. If we've wrapped more than once, only
905 * one smooth is executed at the end. */
906 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
907 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
908 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
909 {
910 #ifdef DEBUG
911 fprintf(stderr,
912 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
913 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
914 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
915 #endif
916 schedule_smooth = 1;
917 }
918 }
919 }
921 rra_current = ftell(rrd_file);
922 } /* if cf is DEVSEASONAL or SEASONAL */
924 if (rrd_test_error()) break;
926 /* update CDP_PREP areas */
927 /* loop over data soures within each RRA */
928 for(ii = 0;
929 ii < rrd.stat_head->ds_cnt;
930 ii++)
931 {
933 /* iii indexes the CDP prep area for this data source within the RRA */
934 iii=i*rrd.stat_head->ds_cnt+ii;
936 if (rrd.rra_def[i].pdp_cnt > 1) {
938 if (rra_step_cnt[i] > 0) {
939 /* If we are in this block, as least 1 CDP value will be written to
940 * disk, this is the CDP_primary_val entry. If more than 1 value needs
941 * to be written, then the "fill in" value is the CDP_secondary_val
942 * entry. */
943 if (isnan(pdp_temp[ii]))
944 {
945 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
946 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
947 } else {
948 /* CDP_secondary value is the RRA "fill in" value for intermediary
949 * CDP data entries. No matter the CF, the value is the same because
950 * the average, max, min, and last of a list of identical values is
951 * the same, namely, the value itself. */
952 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
953 }
955 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
956 > rrd.rra_def[i].pdp_cnt*
957 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
958 {
959 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
960 /* initialize carry over */
961 if (current_cf == CF_AVERAGE) {
962 if (isnan(pdp_temp[ii])) {
963 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
964 } else {
965 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
966 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
967 }
968 } else {
969 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
970 }
971 } else {
972 rrd_value_t cum_val, cur_val;
973 switch (current_cf) {
974 case CF_AVERAGE:
975 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
976 cur_val = IFDNAN(pdp_temp[ii],0.0);
977 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
978 (cum_val + cur_val * start_pdp_offset) /
979 (rrd.rra_def[i].pdp_cnt
980 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
981 /* initialize carry over value */
982 if (isnan(pdp_temp[ii])) {
983 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
984 } else {
985 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
986 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
987 }
988 break;
989 case CF_MAXIMUM:
990 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
991 cur_val = IFDNAN(pdp_temp[ii],-DINF);
992 #ifdef DEBUG
993 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
994 isnan(pdp_temp[ii])) {
995 fprintf(stderr,
996 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
997 i,ii);
998 exit(-1);
999 }
1000 #endif
1001 if (cur_val > cum_val)
1002 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1003 else
1004 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1005 /* initialize carry over value */
1006 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1007 break;
1008 case CF_MINIMUM:
1009 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1010 cur_val = IFDNAN(pdp_temp[ii],DINF);
1011 #ifdef DEBUG
1012 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1013 isnan(pdp_temp[ii])) {
1014 fprintf(stderr,
1015 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1016 i,ii);
1017 exit(-1);
1018 }
1019 #endif
1020 if (cur_val < cum_val)
1021 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1022 else
1023 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1024 /* initialize carry over value */
1025 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1026 break;
1027 case CF_LAST:
1028 default:
1029 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1030 /* initialize carry over value */
1031 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1032 break;
1033 }
1034 } /* endif meets xff value requirement for a valid value */
1035 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1036 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1037 if (isnan(pdp_temp[ii]))
1038 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1039 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1040 else
1041 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1042 } else /* rra_step_cnt[i] == 0 */
1043 {
1044 #ifdef DEBUG
1045 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1046 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1047 i,ii);
1048 } else {
1049 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1050 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1051 }
1052 #endif
1053 if (isnan(pdp_temp[ii])) {
1054 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1055 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1056 {
1057 if (current_cf == CF_AVERAGE) {
1058 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1059 elapsed_pdp_st;
1060 } else {
1061 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1062 }
1063 #ifdef DEBUG
1064 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1065 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1066 #endif
1067 } else {
1068 switch (current_cf) {
1069 case CF_AVERAGE:
1070 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1071 elapsed_pdp_st;
1072 break;
1073 case CF_MINIMUM:
1074 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1075 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1076 break;
1077 case CF_MAXIMUM:
1078 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1079 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1080 break;
1081 case CF_LAST:
1082 default:
1083 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1084 break;
1085 }
1086 }
1087 }
1088 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1089 if (elapsed_pdp_st > 2)
1090 {
1091 switch (current_cf) {
1092 case CF_AVERAGE:
1093 default:
1094 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1095 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1096 break;
1097 case CF_SEASONAL:
1098 case CF_DEVSEASONAL:
1099 /* need to update cached seasonal values, so they are consistent
1100 * with the bulk update */
1101 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1102 * CDP_last_deviation are the same. */
1103 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1104 last_seasonal_coef[ii];
1105 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1106 seasonal_coef[ii];
1107 break;
1108 case CF_HWPREDICT:
1109 /* need to update the null_count and last_null_count.
1110 * even do this for non-DNAN pdp_temp because the
1111 * algorithm is not learning from batch updates. */
1112 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1113 elapsed_pdp_st;
1114 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1115 elapsed_pdp_st - 1;
1116 /* fall through */
1117 case CF_DEVPREDICT:
1118 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1119 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1120 break;
1121 case CF_FAILURES:
1122 /* do not count missed bulk values as failures */
1123 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1124 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1125 /* need to reset violations buffer.
1126 * could do this more carefully, but for now, just
1127 * assume a bulk update wipes away all violations. */
1128 erase_violations(&rrd, iii, i);
1129 break;
1130 }
1131 }
1132 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1134 if (rrd_test_error()) break;
1136 } /* endif data sources loop */
1137 } /* end RRA Loop */
1139 /* this loop is only entered if elapsed_pdp_st < 3 */
1140 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1141 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1142 {
1143 for(i = 0, rra_start = rra_begin;
1144 i < rrd.stat_head->rra_cnt;
1145 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1146 i++)
1147 {
1148 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1150 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1151 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1152 {
1153 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1154 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1155 &seasonal_coef);
1156 rra_current = ftell(rrd_file);
1157 }
1158 if (rrd_test_error()) break;
1159 /* loop over data soures within each RRA */
1160 for(ii = 0;
1161 ii < rrd.stat_head->ds_cnt;
1162 ii++)
1163 {
1164 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1165 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1166 scratch_idx, seasonal_coef);
1167 }
1168 } /* end RRA Loop */
1169 if (rrd_test_error()) break;
1170 } /* end elapsed_pdp_st loop */
1172 if (rrd_test_error()) break;
1174 /* Ready to write to disk */
1175 /* Move sequentially through the file, writing one RRA at a time.
1176 * Note this architecture divorces the computation of CDP with
1177 * flushing updated RRA entries to disk. */
1178 for(i = 0, rra_start = rra_begin;
1179 i < rrd.stat_head->rra_cnt;
1180 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1181 i++) {
1182 /* is there anything to write for this RRA? If not, continue. */
1183 if (rra_step_cnt[i] == 0) continue;
1185 /* write the first row */
1186 #ifdef DEBUG
1187 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1188 #endif
1189 rrd.rra_ptr[i].cur_row++;
1190 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1191 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1192 /* positition on the first row */
1193 rra_pos_tmp = rra_start +
1194 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1195 if(rra_pos_tmp != rra_current) {
1196 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1197 rrd_set_error("seek error in rrd");
1198 break;
1199 }
1200 rra_current = rra_pos_tmp;
1201 }
1203 #ifdef DEBUG
1204 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1205 #endif
1206 scratch_idx = CDP_primary_val;
1207 if (pcdp_summary != NULL)
1208 {
1209 rra_time = (current_time - current_time
1210 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1211 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1212 }
1213 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1214 pcdp_summary, &rra_time);
1215 if (rrd_test_error()) break;
1217 /* write other rows of the bulk update, if any */
1218 scratch_idx = CDP_secondary_val;
1219 for ( ; rra_step_cnt[i] > 1;
1220 rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
1221 {
1222 if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1223 {
1224 #ifdef DEBUG
1225 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1226 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1227 #endif
1228 /* wrap */
1229 rrd.rra_ptr[i].cur_row = 0;
1230 /* seek back to beginning of current rra */
1231 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1232 {
1233 rrd_set_error("seek error in rrd");
1234 break;
1235 }
1236 #ifdef DEBUG
1237 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1238 #endif
1239 rra_current = rra_start;
1240 }
1241 if (pcdp_summary != NULL)
1242 {
1243 rra_time = (current_time - current_time
1244 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1245 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1246 }
1247 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1248 pcdp_summary, &rra_time);
1249 }
1251 if (rrd_test_error())
1252 break;
1253 } /* RRA LOOP */
1255 /* break out of the argument parsing loop if error_string is set */
1256 if (rrd_test_error()){
1257 free(step_start);
1258 break;
1259 }
1261 } /* endif a pdp_st has occurred */
1262 rrd.live_head->last_up = current_time;
1263 rrd.live_head->last_up_usec = current_time_usec;
1264 free(step_start);
1265 } /* function argument loop */
1267 if (seasonal_coef != NULL) free(seasonal_coef);
1268 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1269 if (rra_step_cnt != NULL) free(rra_step_cnt);
1270 rpnstack_free(&rpnstack);
1272 /* if we got here and if there is an error and if the file has not been
1273 * written to, then close things up and return. */
1274 if (rrd_test_error()) {
1275 free(updvals);
1276 free(tmpl_idx);
1277 rrd_free(&rrd);
1278 free(pdp_temp);
1279 free(pdp_new);
1280 fclose(rrd_file);
1281 return(-1);
1282 }
1284 /* aargh ... that was tough ... so many loops ... anyway, its done.
1285 * we just need to write back the live header portion now*/
1287 if (fseek(rrd_file, (sizeof(stat_head_t)
1288 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1289 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1290 SEEK_SET) != 0) {
1291 rrd_set_error("seek rrd for live header writeback");
1292 free(updvals);
1293 free(tmpl_idx);
1294 rrd_free(&rrd);
1295 free(pdp_temp);
1296 free(pdp_new);
1297 fclose(rrd_file);
1298 return(-1);
1299 }
1301 if(version >= 3) {
1302 if(fwrite( rrd.live_head,
1303 sizeof(live_head_t), 1, rrd_file) != 1){
1304 rrd_set_error("fwrite live_head to rrd");
1305 free(updvals);
1306 rrd_free(&rrd);
1307 free(tmpl_idx);
1308 free(pdp_temp);
1309 free(pdp_new);
1310 fclose(rrd_file);
1311 return(-1);
1312 }
1313 }
1314 else {
1315 if(fwrite( &rrd.live_head->last_up,
1316 sizeof(time_t), 1, rrd_file) != 1){
1317 rrd_set_error("fwrite live_head to rrd");
1318 free(updvals);
1319 rrd_free(&rrd);
1320 free(tmpl_idx);
1321 free(pdp_temp);
1322 free(pdp_new);
1323 fclose(rrd_file);
1324 return(-1);
1325 }
1326 }
1329 if(fwrite( rrd.pdp_prep,
1330 sizeof(pdp_prep_t),
1331 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1332 rrd_set_error("ftwrite pdp_prep to rrd");
1333 free(updvals);
1334 rrd_free(&rrd);
1335 free(tmpl_idx);
1336 free(pdp_temp);
1337 free(pdp_new);
1338 fclose(rrd_file);
1339 return(-1);
1340 }
1342 if(fwrite( rrd.cdp_prep,
1343 sizeof(cdp_prep_t),
1344 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1345 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1347 rrd_set_error("ftwrite cdp_prep to rrd");
1348 free(updvals);
1349 free(tmpl_idx);
1350 rrd_free(&rrd);
1351 free(pdp_temp);
1352 free(pdp_new);
1353 fclose(rrd_file);
1354 return(-1);
1355 }
1357 if(fwrite( rrd.rra_ptr,
1358 sizeof(rra_ptr_t),
1359 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1360 rrd_set_error("fwrite rra_ptr to rrd");
1361 free(updvals);
1362 free(tmpl_idx);
1363 rrd_free(&rrd);
1364 free(pdp_temp);
1365 free(pdp_new);
1366 fclose(rrd_file);
1367 return(-1);
1368 }
1370 /* OK now close the files and free the memory */
1371 if(fclose(rrd_file) != 0){
1372 rrd_set_error("closing rrd");
1373 free(updvals);
1374 free(tmpl_idx);
1375 rrd_free(&rrd);
1376 free(pdp_temp);
1377 free(pdp_new);
1378 return(-1);
1379 }
1381 /* calling the smoothing code here guarantees at most
1382 * one smoothing operation per rrd_update call. Unfortunately,
1383 * it is possible with bulk updates, or a long-delayed update
1384 * for smoothing to occur off-schedule. This really isn't
1385 * critical except during the burning cycles. */
1386 if (schedule_smooth)
1387 {
1388 #ifndef WIN32
1389 rrd_file = fopen(filename,"r+");
1390 #else
1391 rrd_file = fopen(filename,"rb+");
1392 #endif
1393 rra_start = rra_begin;
1394 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1395 {
1396 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1397 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1398 {
1399 #ifdef DEBUG
1400 fprintf(stderr,"Running smoother for rra %ld\n",i);
1401 #endif
1402 apply_smoother(&rrd,i,rra_start,rrd_file);
1403 if (rrd_test_error())
1404 break;
1405 }
1406 rra_start += rrd.rra_def[i].row_cnt
1407 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1408 }
1409 fclose(rrd_file);
1410 }
1411 rrd_free(&rrd);
1412 free(updvals);
1413 free(tmpl_idx);
1414 free(pdp_new);
1415 free(pdp_temp);
1416 return(0);
1417 }
/*
 * Try to take an exclusive lock covering the whole RRD file without
 * blocking.  The lock is dropped again when the file is closed.
 *
 * rrdfile: open stdio stream of the RRD file; its descriptor is obtained
 *          with fileno().
 *
 * Returns 0 on success.  On failure the result of the underlying locking
 * call is passed through unchanged (-1 from fcntl()/_locking(), and -1 as
 * well if the file size cannot be determined on WIN32).
 */
int
LockRRD(FILE *rrdfile)
{
    const int fd = fileno(rrdfile);

#ifndef WIN32
    struct flock whole_file;

    whole_file.l_whence = SEEK_SET; /* l_start is counted from the file start */
    whole_file.l_start  = 0;        /* begin at the very first byte */
    whole_file.l_len    = 0;        /* zero length: whole file */
    whole_file.l_type   = F_WRLCK;  /* exclusive write lock */

    /* F_SETLK fails immediately instead of waiting for a conflicting lock */
    return fcntl(fd, F_SETLK, &whole_file);
#else
    struct _stat info;

    /* _locking() needs a byte count, so ask for the current file size */
    if (_fstat(fd, &info) != 0) {
        return -1;
    }
    return _locking(fd, _LK_NBLCK, info.st_size);
#endif
}
1457 info_t
1458 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1459 unsigned short CDP_scratch_idx, FILE *rrd_file,
1460 info_t *pcdp_summary, time_t *rra_time)
1461 {
1462 unsigned long ds_idx, cdp_idx;
1463 infoval iv;
1465 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1466 {
1467 /* compute the cdp index */
1468 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1469 #ifdef DEBUG
1470 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1471 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1472 rrd -> rra_def[rra_idx].cf_nam);
1473 #endif
1474 if (pcdp_summary != NULL)
1475 {
1476 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1477 /* append info to the return hash */
1478 pcdp_summary = info_push(pcdp_summary,
1479 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1480 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1481 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1482 RD_I_VAL, iv);
1483 }
1484 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1485 sizeof(rrd_value_t),1,rrd_file) != 1)
1486 {
1487 rrd_set_error("writing rrd");
1488 return 0;
1489 }
1490 *rra_current += sizeof(rrd_value_t);
1491 }
1492 return (pcdp_summary);
1493 }